X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=81ba30ffb83ed964ca4fe13901be979073e2e6ad;hb=d25d55a682dbd4a3c7fcbaa05744fa002e571bca;hp=6e824d1e111e5b288277f008547a30afb7bcf3e8;hpb=93e77632d61481e912d2630395c30ff9ec71ead6;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 6e824d1e..81ba30ff 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -31,6 +31,7 @@ use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS}; use crate::chain::transaction::{OutPoint, TransactionData}; +use crate::ln::ChannelId; use crate::sign::ecdsa::WriteableEcdsaChannelSigner; use crate::events; use crate::events::{Event, EventHandler}; @@ -42,7 +43,6 @@ use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; -use core::iter::FromIterator; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; @@ -158,7 +158,7 @@ pub trait Persist { /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given /// update. @@ -193,7 +193,12 @@ pub trait Persist { /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + /// Prevents the channel monitor from being loaded on startup. + /// + /// Archiving the data in a backup location (rather than deleting it fully) is useful for + /// hedging against data loss in case of unexpected failure. + fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint); } struct MonitorHolder { @@ -287,7 +292,7 @@ pub struct ChainMonitor, Option)>>, + pending_monitor_events: Mutex, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, @@ -317,7 +322,7 @@ where C::Target: chain::Filter, FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - let funding_outpoints: HashSet = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned()); + let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned()); for funding_outpoint in funding_outpoints.iter() { let monitor_lock = self.monitors.read().unwrap(); if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { @@ -426,7 +431,7 @@ where C::Target: chain::Filter, /// transactions relevant to the watched channels. pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P) -> Self { Self { - monitors: RwLock::new(HashMap::new()), + monitors: RwLock::new(new_hash_map()), sync_persistence_id: AtomicCounter::new(), chain_source, broadcaster, @@ -478,20 +483,23 @@ where C::Target: chain::Filter, } } - /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored. + /// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored. /// /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always /// monitoring for on-chain state resolutions. - pub fn list_monitors(&self) -> Vec { - self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() + pub fn list_monitors(&self) -> Vec<(OutPoint, ChannelId)> { + self.monitors.read().unwrap().iter().map(|(outpoint, monitor_holder)| { + let channel_id = monitor_holder.monitor.channel_id(); + (*outpoint, channel_id) + }).collect() } #[cfg(not(c_bindings))] /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). pub fn list_pending_monitor_updates(&self) -> HashMap> { - self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) - }).collect() + })) } #[cfg(c_bindings)] @@ -557,8 +565,9 @@ where C::Target: chain::Filter, // Completed event. return Ok(()); } - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { - funding_txo, + let channel_id = monitor_data.monitor.channel_id(); + self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed { + funding_txo, channel_id, monitor_update_id: monitor_data.monitor.get_latest_update_id(), }], monitor_data.monitor.get_counterparty_node_id())); }, @@ -590,9 +599,14 @@ where C::Target: chain::Filter, #[cfg(any(test, fuzzing))] pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); - let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id()); - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { + let (counterparty_node_id, channel_id) = if let Some(m) = monitors.get(&funding_txo) { + (m.monitor.get_counterparty_node_id(), m.monitor.channel_id()) + } else { + (None, ChannelId::v1_from_funding_outpoint(funding_txo)) + }; + self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed { funding_txo, + channel_id, monitor_update_id, }], counterparty_node_id)); self.event_notifier.notify(); @@ -651,6 +665,62 @@ where C::Target: chain::Filter, ) } } + + /// Triggers rebroadcasts of pending claims from force-closed channels after a transaction + /// signature generation failure. + /// + /// `monitor_opt` can be used as a filter to only trigger them for a specific channel monitor. + pub fn signer_unblocked(&self, monitor_opt: Option) { + let monitors = self.monitors.read().unwrap(); + if let Some(funding_txo) = monitor_opt { + if let Some(monitor_holder) = monitors.get(&funding_txo) { + monitor_holder.monitor.signer_unblocked( + &*self.broadcaster, &*self.fee_estimator, &self.logger + ) + } + } else { + for (_, monitor_holder) in &*monitors { + monitor_holder.monitor.signer_unblocked( + &*self.broadcaster, &*self.fee_estimator, &self.logger + ) + } + } + } + + /// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`]. + /// + /// This is useful for pruning fully resolved monitors from the monitor set and primary + /// storage so they are not kept in memory and reloaded on restart. + /// + /// Should be called occasionally (once every handful of blocks or on startup). + /// + /// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor + /// data could be moved to an archive location or removed entirely. + pub fn archive_fully_resolved_channel_monitors(&self) { + let mut have_monitors_to_prune = false; + for (_, monitor_holder) in self.monitors.read().unwrap().iter() { + let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor); + if monitor_holder.monitor.is_fully_resolved(&logger) { + have_monitors_to_prune = true; + } + } + if have_monitors_to_prune { + let mut monitors = self.monitors.write().unwrap(); + monitors.retain(|funding_txo, monitor_holder| { + let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor); + if monitor_holder.monitor.is_fully_resolved(&logger) { + log_info!(logger, + "Archiving fully resolved ChannelMonitor for funding txo {}", + funding_txo + ); + self.persister.archive_persisted_channel(*funding_txo); + false + } else { + true + } + }); + } + } } impl @@ -778,11 +848,14 @@ where C::Target: chain::Filter, } fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo)); // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); match monitors.get(&funding_txo) { None => { - let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(funding_txo.to_channel_id())); + let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(channel_id)); log_error!(logger, "Failed to update channel monitor: no such monitor registered"); // We should never ever trigger this from within ChannelManager. Technically a @@ -848,7 +921,7 @@ where C::Target: chain::Filter, } } - fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { + fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec, Option)> { let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor); @@ -862,8 +935,9 @@ where C::Target: chain::Filter, let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); if monitor_events.len() > 0 { let monitor_outpoint = monitor_state.monitor.get_funding_txo().0; + let monitor_channel_id = monitor_state.monitor.channel_id(); let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id(); - pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id)); + pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id)); } } }