X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=81ba30ffb83ed964ca4fe13901be979073e2e6ad;hb=eebab4015d9c84b1b605c498c9b6c166369cdb8b;hp=b71f10f58d455cc0fccff487884dd65f12c8d5cc;hpb=e594021052e251659e33c0f5e82c7ec2b9e99c18;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index b71f10f5..81ba30ff 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -43,7 +43,6 @@ use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; -use core::iter::FromIterator; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; @@ -195,6 +194,11 @@ pub trait Persist { /// /// [`Writeable::write`]: crate::util::ser::Writeable::write fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + /// Prevents the channel monitor from being loaded on startup. + /// + /// Archiving the data in a backup location (rather than deleting it fully) is useful for + /// hedging against data loss in case of unexpected failure. + fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint); } struct MonitorHolder { @@ -318,7 +322,7 @@ where C::Target: chain::Filter, FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - let funding_outpoints: HashSet = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned()); + let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned()); for funding_outpoint in funding_outpoints.iter() { let monitor_lock = self.monitors.read().unwrap(); if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { @@ -364,8 +368,9 @@ where C::Target: chain::Filter, let mut txn_outputs; { txn_outputs = process(monitor, txdata); + let chain_sync_update_id = self.sync_persistence_id.get_increment(); let update_id = MonitorUpdateId { - contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + contents: UpdateOrigin::ChainSync(chain_sync_update_id), }; let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); if let Some(height) = best_height { @@ -377,10 +382,16 @@ where C::Target: chain::Filter, } } - log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + log_trace!(logger, "Syncing Channel Monitor for channel {} for block-data update_id {}", + log_funding_info!(monitor), + chain_sync_update_id + ); match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { ChannelMonitorUpdateStatus::Completed => - log_trace!(logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data update_id {}", + log_funding_info!(monitor), + chain_sync_update_id + ), ChannelMonitorUpdateStatus::InProgress => { log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); @@ -420,7 +431,7 @@ where C::Target: chain::Filter, /// transactions relevant to the watched channels. pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P) -> Self { Self { - monitors: RwLock::new(HashMap::new()), + monitors: RwLock::new(new_hash_map()), sync_persistence_id: AtomicCounter::new(), chain_source, broadcaster, @@ -486,9 +497,9 @@ where C::Target: chain::Filter, #[cfg(not(c_bindings))] /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). pub fn list_pending_monitor_updates(&self) -> HashMap> { - self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) - }).collect() + })) } #[cfg(c_bindings)] @@ -530,7 +541,7 @@ where C::Target: chain::Filter, pending_monitor_updates.retain(|update_id| *update_id != completed_update_id); match completed_update_id { - MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => { + MonitorUpdateId { contents: UpdateOrigin::OffChain(completed_update_id) } => { // Note that we only check for `UpdateOrigin::OffChain` failures here - if // we're being told that a `UpdateOrigin::OffChain` monitor update completed, // we only care about ensuring we don't tell the `ChannelManager` to restore @@ -541,6 +552,14 @@ where C::Target: chain::Filter, // `MonitorEvent`s from the monitor back to the `ChannelManager` until they // complete. let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates); + log_debug!(self.logger, "Completed off-chain monitor update {} for channel with funding outpoint {:?}, {}", + completed_update_id, + funding_txo, + if monitor_is_pending_updates { + "still have pending off-chain updates" + } else { + "all off-chain updates complete, returning a MonitorEvent" + }); if monitor_is_pending_updates { // If there are still monitor updates pending, we cannot yet construct a // Completed event. @@ -552,8 +571,18 @@ where C::Target: chain::Filter, monitor_update_id: monitor_data.monitor.get_latest_update_id(), }], monitor_data.monitor.get_counterparty_node_id())); }, - MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => { - if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) { + MonitorUpdateId { contents: UpdateOrigin::ChainSync(completed_update_id) } => { + let monitor_has_pending_updates = + monitor_data.has_pending_chainsync_updates(&pending_monitor_updates); + log_debug!(self.logger, "Completed chain sync monitor update {} for channel with funding outpoint {:?}, {}", + completed_update_id, + funding_txo, + if monitor_has_pending_updates { + "still have pending chain sync updates" + } else { + "all chain sync updates complete, releasing pending MonitorEvents" + }); + if !monitor_has_pending_updates { monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release); // The next time release_pending_monitor_events is called, any events for this // ChannelMonitor will be returned. @@ -636,6 +665,62 @@ where C::Target: chain::Filter, ) } } + + /// Triggers rebroadcasts of pending claims from force-closed channels after a transaction + /// signature generation failure. + /// + /// `monitor_opt` can be used as a filter to only trigger them for a specific channel monitor. + pub fn signer_unblocked(&self, monitor_opt: Option) { + let monitors = self.monitors.read().unwrap(); + if let Some(funding_txo) = monitor_opt { + if let Some(monitor_holder) = monitors.get(&funding_txo) { + monitor_holder.monitor.signer_unblocked( + &*self.broadcaster, &*self.fee_estimator, &self.logger + ) + } + } else { + for (_, monitor_holder) in &*monitors { + monitor_holder.monitor.signer_unblocked( + &*self.broadcaster, &*self.fee_estimator, &self.logger + ) + } + } + } + + /// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`]. + /// + /// This is useful for pruning fully resolved monitors from the monitor set and primary + /// storage so they are not kept in memory and reloaded on restart. + /// + /// Should be called occasionally (once every handful of blocks or on startup). + /// + /// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor + /// data could be moved to an archive location or removed entirely. + pub fn archive_fully_resolved_channel_monitors(&self) { + let mut have_monitors_to_prune = false; + for (_, monitor_holder) in self.monitors.read().unwrap().iter() { + let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor); + if monitor_holder.monitor.is_fully_resolved(&logger) { + have_monitors_to_prune = true; + } + } + if have_monitors_to_prune { + let mut monitors = self.monitors.write().unwrap(); + monitors.retain(|funding_txo, monitor_holder| { + let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor); + if monitor_holder.monitor.is_fully_resolved(&logger) { + log_info!(logger, + "Archiving fully resolved ChannelMonitor for funding txo {}", + funding_txo + ); + self.persister.archive_persisted_channel(*funding_txo); + false + } else { + true + } + }); + } + } } impl @@ -784,7 +869,7 @@ where C::Target: chain::Filter, Some(monitor_state) => { let monitor = &monitor_state.monitor; let logger = WithChannelMonitor::from(&self.logger, &monitor); - log_trace!(logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); + log_trace!(logger, "Updating ChannelMonitor to id {} for channel {}", update.update_id, log_funding_info!(monitor)); let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger); let update_id = MonitorUpdateId::from_monitor_update(update); @@ -803,10 +888,18 @@ where C::Target: chain::Filter, match persist_res { ChannelMonitorUpdateStatus::InProgress => { pending_monitor_updates.push(update_id); - log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor)); + log_debug!(logger, + "Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress", + update_id, + log_funding_info!(monitor) + ); }, ChannelMonitorUpdateStatus::Completed => { - log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); + log_debug!(logger, + "Persistence of ChannelMonitorUpdate id {:?} for channel {} completed", + update_id, + log_funding_info!(monitor) + ); }, ChannelMonitorUpdateStatus::UnrecoverableError => { // Take the monitors lock for writing so that we poison it and any future