X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=cf51dbab72019d9eb87748e0817171c1c0abfb35;hb=efbaa19a054ca1bbe1832c080154b72baf65b480;hp=2cc71a2ecc7ce7a77e99abee6fc0a135a058c7bc;hpb=c383f06538ac664fe3312daf765595ba106d5b98;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 2cc71a2e..cf51dbab 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -42,11 +42,12 @@ use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; +use core::iter::FromIterator; use core::ops::Deref; -use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; -#[derive(Clone, Copy, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents /// entirely opaque. enum UpdateOrigin { @@ -60,7 +61,7 @@ enum UpdateOrigin { } /// An opaque identifier describing a specific [`Persist`] method call. -#[derive(Clone, Copy, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct MonitorUpdateId { contents: UpdateOrigin, } @@ -77,23 +78,58 @@ impl MonitorUpdateId { /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// -/// Each method can return three possible values: -/// * If persistence (including any relevant `fsync()` calls) happens immediately, the -/// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal -/// channel operation should continue. 
-/// * If persistence happens asynchronously, implementations should first ensure the -/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return -/// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background. -/// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with -/// the corresponding [`MonitorUpdateId`]. +/// Persistence can happen in one of two ways - synchronously completing before the trait method +/// calls return or asynchronously in the background. /// -/// Note that unlike the direct [`chain::Watch`] interface, -/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// # For those implementing synchronous persistence /// -/// * If persistence fails for some reason, implementations should return -/// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be -/// closed without broadcasting the latest state. See -/// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details. +/// * If persistence completes fully (including any relevant `fsync()` calls), the implementation +/// should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal channel operation +/// should continue. +/// +/// * If persistence fails for some reason, implementations should consider returning +/// [`ChannelMonitorUpdateStatus::InProgress`] and retry all pending persistence operations in +/// the background with [`ChainMonitor::list_pending_monitor_updates`] and +/// [`ChainMonitor::get_monitor`]. +/// +/// Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can +/// be marked as complete via [`ChainMonitor::channel_monitor_updated`]. +/// +/// If at some point no further progress can be made towards persisting the pending updates, the +/// node should simply shut down. 
+/// +/// * If the persistence has failed and cannot be retried further (e.g. because of an outage), +/// [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in +/// an immediate panic and future operations in LDK generally failing. +/// +/// # For those implementing asynchronous persistence +/// +/// All calls should generally spawn a background task and immediately return +/// [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes, +/// [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding +/// [`MonitorUpdateId`]. +/// +/// Note that unlike the direct [`chain::Watch`] interface, +/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// +/// If at some point no further progress can be made towards persisting a pending update, the node +/// should simply shut down. Until then, the background task should either loop indefinitely, or +/// persistence should be regularly retried with [`ChainMonitor::list_pending_monitor_updates`] +/// and [`ChainMonitor::get_monitor`] (note that if a full monitor is persisted all pending +/// monitor updates may be marked completed). +/// +/// # Using remote watchtowers +/// +/// Watchtowers may be updated as a part of an implementation of this trait, utilizing the async +/// update process described above while the watchtower is being updated. The following methods are +/// provided for building transactions for a watchtower: +/// [`ChannelMonitor::initial_counterparty_commitment_tx`], +/// [`ChannelMonitor::counterparty_commitment_txs_from_update`], +/// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`], +/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`]. 
+/// +/// [`TrustedCommitmentTransaction::revokeable_output_index`]: crate::ln::chan_utils::TrustedCommitmentTransaction::revokeable_output_index +/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`]: crate::ln::chan_utils::TrustedCommitmentTransaction::build_to_local_justice_tx pub trait Persist { /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup. @@ -168,12 +204,6 @@ struct MonitorHolder { /// the ChannelManager re-adding the same payment entry, before the same block is replayed, /// resulting in a duplicate PaymentSent event. pending_monitor_updates: Mutex>, - /// When the user returns a PermanentFailure error from an update_persisted_channel call during - /// block processing, we inform the ChannelManager that the channel should be closed - /// asynchronously. In order to ensure no further changes happen before the ChannelManager has - /// processed the closure event, we set this to true and return PermanentFailure for any other - /// chain::Watch events. - channel_perm_failed: AtomicBool, /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present /// in `pending_monitor_updates`. /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain @@ -274,7 +304,34 @@ where C::Target: chain::Filter, where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; + let funding_outpoints: HashSet = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned()); + for funding_outpoint in funding_outpoints.iter() { + let monitor_lock = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { + if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(monitor_state); + core::mem::drop(monitor_lock); + let _poison = self.monitors.write().unwrap(); + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + } + } + } + + // do some followup cleanup if any funding outpoints were added in between iterations let monitor_states = self.monitors.write().unwrap(); + for (funding_outpoint, monitor_state) in monitor_states.iter() { + if !funding_outpoints.contains(funding_outpoint) { + if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + } + } + } + if let Some(height) = best_height { // If the best block height is being updated, update highest_chain_height under the // monitors write lock. 
@@ -284,58 +341,60 @@ where C::Target: chain::Filter, self.highest_chain_height.store(new_height, Ordering::Release); } } + } - for (funding_outpoint, monitor_state) in monitor_states.iter() { - let monitor = &monitor_state.monitor; - let mut txn_outputs; - { - txn_outputs = process(monitor, txdata); - let update_id = MonitorUpdateId { - contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), - }; - let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - if let Some(height) = best_height { - if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { - // If there are not ChainSync persists awaiting completion, go ahead and - // set last_chain_persist_height here - we wouldn't want the first - // InProgress to always immediately be considered "overly delayed". - monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); - } + fn update_monitor_with_chain_data( + &self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, + process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder + ) -> Result<(), ()> where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let monitor = &monitor_state.monitor; + let mut txn_outputs; + { + txn_outputs = process(monitor, txdata); + let update_id = MonitorUpdateId { + contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + }; + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + if let Some(height) = best_height { + if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { + // If there are not ChainSync persists awaiting completion, go ahead and + // set last_chain_persist_height here - we wouldn't want the first + // InProgress to always immediately be considered "overly delayed". 
+ monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); } + } - log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); - match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { - ChannelMonitorUpdateStatus::Completed => - log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), - ChannelMonitorUpdateStatus::PermanentFailure => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); - self.event_notifier.notify(); - }, - ChannelMonitorUpdateStatus::InProgress => { - log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); - pending_monitor_updates.push(update_id); - }, - } + log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { + ChannelMonitorUpdateStatus::Completed => + log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + ChannelMonitorUpdateStatus::InProgress => { + log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + return Err(()); + }, } + } - // Register any new outputs with the chain source for filtering, storing any dependent - // transactions from within the block that previously had not been included in txdata. - if let Some(ref chain_source) = self.chain_source { - let block_hash = header.block_hash(); - for (txid, mut outputs) in txn_outputs.drain(..) 
{ - for (idx, output) in outputs.drain(..) { - // Register any new outputs with the chain source for filtering - let output = WatchedOutput { - block_hash: Some(block_hash), - outpoint: OutPoint { txid, index: idx as u16 }, - script_pubkey: output.script_pubkey, - }; - chain_source.register_output(output) - } + // Register any new outputs with the chain source for filtering, storing any dependent + // transactions from within the block that previously had not been included in txdata. + if let Some(ref chain_source) = self.chain_source { + let block_hash = header.block_hash(); + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { + // Register any new outputs with the chain source for filtering + let output = WatchedOutput { + block_hash: Some(block_hash), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, + }; + chain_source.register_output(output) } } } + Ok(()) } /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. @@ -465,9 +524,8 @@ where C::Target: chain::Filter, // `MonitorEvent`s from the monitor back to the `ChannelManager` until they // complete. let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates); - if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) { - // If there are still monitor updates pending (or an old monitor update - // finished after a later one perm-failed), we cannot yet construct an + if monitor_is_pending_updates { + // If there are still monitor updates pending, we cannot yet construct a // Completed event. return Ok(()); } @@ -641,18 +699,12 @@ where C::Target: chain::Filter, L::Target: Logger, P::Target: Persist, { - /// Adds the monitor that watches the channel referred to by the given outpoint. - /// - /// Calls back to [`chain::Filter`] with the funding transaction and outputs to watch. 
- /// - /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor` - /// monitors lock. - fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> ChannelMonitorUpdateStatus { + fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result { let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { hash_map::Entry::Occupied(_) => { log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); - return ChannelMonitorUpdateStatus::PermanentFailure + return Err(()); }, hash_map::Entry::Vacant(e) => e, }; @@ -665,13 +717,14 @@ where C::Target: chain::Filter, log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); }, - ChannelMonitorUpdateStatus::PermanentFailure => { - log_error!(self.logger, "Persistence of new ChannelMonitor for channel {} failed", log_funding_info!(monitor)); - return persist_res; - }, ChannelMonitorUpdateStatus::Completed => { log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); - } + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + }, } if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source); @@ -679,28 +732,25 @@ where C::Target: chain::Filter, entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates), - channel_perm_failed: AtomicBool::new(false), last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)), }); - persist_res + Ok(persist_res) } - /// Note that we persist the given `ChannelMonitor` update while holding the - /// `ChainMonitor` monitors lock. fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); - match monitors.get(&funding_txo) { + let ret = match monitors.get(&funding_txo) { None => { log_error!(self.logger, "Failed to update channel monitor: no such monitor registered"); // We should never ever trigger this from within ChannelManager. Technically a // user could use this object with some proxying in between which makes this // possible, but in tests and fuzzing, this should be a panic. 
- #[cfg(any(test, fuzzing))] + #[cfg(debug_assertions)] panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(any(test, fuzzing)))] - ChannelMonitorUpdateStatus::PermanentFailure + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress }, Some(monitor_state) => { let monitor = &monitor_state.monitor; @@ -719,23 +769,28 @@ where C::Target: chain::Filter, pending_monitor_updates.push(update_id); log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor)); }, - ChannelMonitorUpdateStatus::PermanentFailure => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - log_error!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} failed", log_funding_info!(monitor)); - }, ChannelMonitorUpdateStatus::Completed => { log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); }, + ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ }, } if update_res.is_err() { - ChannelMonitorUpdateStatus::PermanentFailure - } else if monitor_state.channel_perm_failed.load(Ordering::Acquire) { - ChannelMonitorUpdateStatus::PermanentFailure + ChannelMonitorUpdateStatus::InProgress } else { persist_res } } + }; + if let ChannelMonitorUpdateStatus::UnrecoverableError = ret { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. 
This indicates we cannot continue normal operation and must shut down."; + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); } + ret } fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { @@ -746,19 +801,8 @@ where C::Target: chain::Filter, monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize > self.highest_chain_height.load(Ordering::Acquire) { - log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); + log_debug!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); } else { - if monitor_state.channel_perm_failed.load(Ordering::Acquire) { - // If a `UpdateOrigin::ChainSync` persistence failed with `PermanantFailure`, - // we don't really know if the latest `ChannelMonitor` state is on disk or not. - // We're supposed to hold monitor updates until the latest state is on disk to - // avoid duplicate events, but the user told us persistence is screw-y and may - // not complete. We can't hold events forever because we may learn some payment - // preimage, so instead we just log and hope the user complied with the - // `PermanentFailure` requirements of having at least the local-disk copy - // updated. - log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!"); - } if is_pending_monitor_update { log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released."); @@ -805,12 +849,12 @@ impl