X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=6d003df720f6680bdfde4fd6766a064edb8f05c2;hb=a8fa5a1685d84d3b07732e4ecc04e2dc7ba06a00;hp=09f72c5948933f606cb82663e144fcd14a658f60;hpb=f24502e9868fa7fc37541896bd169fa10d13636c;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 09f72c59..6d003df7 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -44,10 +44,10 @@ use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::iter::FromIterator; use core::ops::Deref; -use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; -#[derive(Clone, Copy, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents /// entirely opaque. enum UpdateOrigin { @@ -61,7 +61,7 @@ enum UpdateOrigin { } /// An opaque identifier describing a specific [`Persist`] method call. -#[derive(Clone, Copy, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct MonitorUpdateId { contents: UpdateOrigin, } @@ -78,26 +78,51 @@ impl MonitorUpdateId { /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// -/// Each method can return two possible values: -/// * If persistence (including any relevant `fsync()` calls) happens immediately, the -/// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal -/// channel operation should continue. -/// * If persistence happens asynchronously, implementations should first ensure the -/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return -/// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background. -/// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with -/// the corresponding [`MonitorUpdateId`]. +/// Persistence can happen in one of two ways - synchronously completing before the trait method +/// calls return or asynchronously in the background. /// -/// Note that unlike the direct [`chain::Watch`] interface, -/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// # For those implementing synchronous persistence /// -/// If persistence fails for some reason, implementations should still return -/// [`ChannelMonitorUpdateStatus::InProgress`] and attempt to shut down or otherwise resolve the -/// situation ASAP. +/// * If persistence completes fully (including any relevant `fsync()` calls), the implementation +/// should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal channel operation +/// should continue. /// -/// Third-party watchtowers may be built as a part of an implementation of this trait, with the -/// advantage that you can control whether to resume channel operation depending on if an update -/// has been persisted to a watchtower. For this, you may find the following methods useful: +/// * If persistence fails for some reason, implementations should consider returning +/// [`ChannelMonitorUpdateStatus::InProgress`] and retry all pending persistence operations in +/// the background with [`ChainMonitor::list_pending_monitor_updates`] and +/// [`ChainMonitor::get_monitor`]. +/// +/// Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can +/// be marked as complete via [`ChainMonitor::channel_monitor_updated`]. +/// +/// If at some point no further progress can be made towards persisting the pending updates, the +/// node should simply shut down. +/// +/// * If the persistence has failed and cannot be retried further (e.g. because of an outage), +/// [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in +/// an immediate panic and future operations in LDK generally failing. +/// +/// # For those implementing asynchronous persistence +/// +/// All calls should generally spawn a background task and immediately return +/// [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes, +/// [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding +/// [`MonitorUpdateId`]. +/// +/// Note that unlike the direct [`chain::Watch`] interface, +/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// +/// If at some point no further progress can be made towards persisting a pending update, the node +/// should simply shut down. Until then, the background task should either loop indefinitely, or +/// persistence should be regularly retried with [`ChainMonitor::list_pending_monitor_updates`] +/// and [`ChainMonitor::get_monitor`] (note that if a full monitor is persisted all pending +/// monitor updates may be marked completed). +/// +/// # Using remote watchtowers +/// +/// Watchtowers may be updated as a part of an implementation of this trait, utilizing the async +/// update process described above while the watchtower is being updated. The following methods are +/// provided for bulding transactions for a watchtower: /// [`ChannelMonitor::initial_counterparty_commitment_tx`], /// [`ChannelMonitor::counterparty_commitment_txs_from_update`], /// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`], @@ -279,11 +304,20 @@ where C::Target: chain::Filter, where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; let funding_outpoints: HashSet = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned()); for funding_outpoint in funding_outpoints.iter() { let monitor_lock = self.monitors.read().unwrap(); if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { - self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(monitor_state); + core::mem::drop(monitor_lock); + let _poison = self.monitors.write().unwrap(); + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + } } } @@ -291,7 +325,10 @@ where C::Target: chain::Filter, let monitor_states = self.monitors.write().unwrap(); for (funding_outpoint, monitor_state) in monitor_states.iter() { if !funding_outpoints.contains(funding_outpoint) { - self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + } } } @@ -306,7 +343,10 @@ where C::Target: chain::Filter, } } - fn update_monitor_with_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + fn update_monitor_with_chain_data( + &self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, + process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder + ) -> Result<(), ()> where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let monitor = &monitor_state.monitor; let mut txn_outputs; { @@ -331,7 +371,10 @@ where C::Target: chain::Filter, ChannelMonitorUpdateStatus::InProgress => { log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); - } + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + return Err(()); + }, } } @@ -351,6 +394,7 @@ where C::Target: chain::Filter, } } } + Ok(()) } /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. @@ -654,12 +698,6 @@ where C::Target: chain::Filter, L::Target: Logger, P::Target: Persist, { - /// Adds the monitor that watches the channel referred to by the given outpoint. - /// - /// Calls back to [`chain::Filter`] with the funding transaction and outputs to watch. - /// - /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor` - /// monitors lock. fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result { let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { @@ -680,7 +718,12 @@ where C::Target: chain::Filter, }, ChannelMonitorUpdateStatus::Completed => { log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); - } + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + }, } if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source); @@ -693,12 +736,10 @@ where C::Target: chain::Filter, Ok(persist_res) } - /// Note that we persist the given `ChannelMonitor` update while holding the - /// `ChainMonitor` monitors lock. fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); - match monitors.get(&funding_txo) { + let ret = match monitors.get(&funding_txo) { None => { log_error!(self.logger, "Failed to update channel monitor: no such monitor registered"); @@ -730,6 +771,7 @@ where C::Target: chain::Filter, ChannelMonitorUpdateStatus::Completed => { log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); }, + ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ }, } if update_res.is_err() { ChannelMonitorUpdateStatus::InProgress @@ -737,7 +779,17 @@ where C::Target: chain::Filter, persist_res } } + }; + if let ChannelMonitorUpdateStatus::UnrecoverableError = ret { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); } + ret } fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { @@ -981,4 +1033,26 @@ mod tests { do_chainsync_pauses_events(false); do_chainsync_pauses_events(true); } + + #[test] + #[cfg(feature = "std")] + fn update_during_chainsync_poisons_channel() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + create_announced_chan_between_nodes(&nodes, 0, 1); + + chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear(); + chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError); + + assert!(std::panic::catch_unwind(|| { + // Returning an UnrecoverableError should always panic immediately + connect_blocks(&nodes[0], 1); + }).is_err()); + assert!(std::panic::catch_unwind(|| { + // ...and also poison our locks causing later use to panic as well + core::mem::drop(nodes); + }).is_err()); + } }