X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=e87d082d9a7bb9061894ce26fb15da0087a7c173;hb=3f416bc24e0e804e0cae1c8c5650b19500122b6d;hp=fef1e3bf14fc7b8c051e084f3c492084293562b7;hpb=b5e959460f1473022321c8b8868fb0f7da6906b5;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index fef1e3bf..e87d082d 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -44,26 +44,38 @@ use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::iter::FromIterator; use core::ops::Deref; -use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; -#[derive(Clone, Copy, Hash, PartialEq, Eq)] -/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents -/// entirely opaque. -enum UpdateOrigin { - /// An update that was generated by the `ChannelManager` (via our `chain::Watch` - /// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field - /// and [`ChannelMonitor::get_latest_update_id`]. - OffChain(u64), - /// An update that was generated during blockchain processing. The ID here is specific to the - /// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs. - ChainSync(u64), +mod update_origin { + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] + /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents + /// entirely opaque. + pub(crate) enum UpdateOrigin { + /// An update that was generated by the `ChannelManager` (via our [`crate::chain::Watch`] + /// implementation). This corresponds to an actual [ChannelMonitorUpdate::update_id] field + /// and [ChannelMonitor::get_latest_update_id]. + /// + /// [ChannelMonitor::get_latest_update_id]: crate::chain::channelmonitor::ChannelMonitor::get_latest_update_id + /// [ChannelMonitorUpdate::update_id]: crate::chain::channelmonitor::ChannelMonitorUpdate::update_id + OffChain(u64), + /// An update that was generated during blockchain processing. The ID here is specific to the + /// generating [ChannelMonitor] and does *not* correspond to any on-disk IDs. + /// + /// [ChannelMonitor]: crate::chain::channelmonitor::ChannelMonitor + ChainSync(u64), + } } +#[cfg(any(feature = "_test_utils", test))] +pub(crate) use update_origin::UpdateOrigin; +#[cfg(not(any(feature = "_test_utils", test)))] +use update_origin::UpdateOrigin; + /// An opaque identifier describing a specific [`Persist`] method call. -#[derive(Clone, Copy, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct MonitorUpdateId { - contents: UpdateOrigin, + pub(crate) contents: UpdateOrigin, } impl MonitorUpdateId { @@ -78,27 +90,51 @@ impl MonitorUpdateId { /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// -/// Each method can return three possible values: -/// * If persistence (including any relevant `fsync()` calls) happens immediately, the -/// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal -/// channel operation should continue. -/// * If persistence happens asynchronously, implementations should first ensure the -/// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return -/// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background. -/// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with -/// the corresponding [`MonitorUpdateId`]. +/// Persistence can happen in one of two ways - synchronously completing before the trait method +/// calls return or asynchronously in the background. +/// +/// # For those implementing synchronous persistence +/// +/// * If persistence completes fully (including any relevant `fsync()` calls), the implementation +/// should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal channel operation +/// should continue. +/// +/// * If persistence fails for some reason, implementations should consider returning +/// [`ChannelMonitorUpdateStatus::InProgress`] and retry all pending persistence operations in +/// the background with [`ChainMonitor::list_pending_monitor_updates`] and +/// [`ChainMonitor::get_monitor`]. +/// +/// Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can +/// be marked as complete via [`ChainMonitor::channel_monitor_updated`]. +/// +/// If at some point no further progress can be made towards persisting the pending updates, the +/// node should simply shut down. +/// +/// * If the persistence has failed and cannot be retried further (e.g. because of an outage), +/// [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in +/// an immediate panic and future operations in LDK generally failing. /// -/// Note that unlike the direct [`chain::Watch`] interface, -/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// # For those implementing asynchronous persistence /// -/// * If persistence fails for some reason, implementations should return -/// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be -/// closed without broadcasting the latest state. See -/// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details. +/// All calls should generally spawn a background task and immediately return +/// [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes, +/// [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding +/// [`MonitorUpdateId`]. /// -/// Third-party watchtowers may be built as a part of an implementation of this trait, with the -/// advantage that you can control whether to resume channel operation depending on if an update -/// has been persisted to a watchtower. For this, you may find the following methods useful: +/// Note that unlike the direct [`chain::Watch`] interface, +/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. +/// +/// If at some point no further progress can be made towards persisting a pending update, the node +/// should simply shut down. Until then, the background task should either loop indefinitely, or +/// persistence should be regularly retried with [`ChainMonitor::list_pending_monitor_updates`] +/// and [`ChainMonitor::get_monitor`] (note that if a full monitor is persisted all pending +/// monitor updates may be marked completed). +/// +/// # Using remote watchtowers +/// +/// Watchtowers may be updated as a part of an implementation of this trait, utilizing the async +/// update process described above while the watchtower is being updated. The following methods are +/// provided for bulding transactions for a watchtower: /// [`ChannelMonitor::initial_counterparty_commitment_tx`], /// [`ChannelMonitor::counterparty_commitment_txs_from_update`], /// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`], @@ -131,8 +167,8 @@ pub trait Persist { /// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more /// details. /// - /// During blockchain synchronization operations, this may be called with no - /// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted. + /// During blockchain synchronization operations, and in some rare cases, this may be called with + /// no [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted. /// Note that after the full [`ChannelMonitor`] is persisted any previous /// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be /// applied to the persisted [`ChannelMonitor`] as they were already applied. @@ -180,12 +216,6 @@ struct MonitorHolder { /// the ChannelManager re-adding the same payment entry, before the same block is replayed, /// resulting in a duplicate PaymentSent event. pending_monitor_updates: Mutex>, - /// When the user returns a PermanentFailure error from an update_persisted_channel call during - /// block processing, we inform the ChannelManager that the channel should be closed - /// asynchronously. In order to ensure no further changes happen before the ChannelManager has - /// processed the closure event, we set this to true and return PermanentFailure for any other - /// chain::Watch events. - channel_perm_failed: AtomicBool, /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present /// in `pending_monitor_updates`. /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain @@ -286,11 +316,19 @@ where C::Target: chain::Filter, where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; let funding_outpoints: HashSet = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned()); for funding_outpoint in funding_outpoints.iter() { let monitor_lock = self.monitors.read().unwrap(); if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { - self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(monitor_lock); + let _poison = self.monitors.write().unwrap(); + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + } } } @@ -298,7 +336,10 @@ where C::Target: chain::Filter, let monitor_states = self.monitors.write().unwrap(); for (funding_outpoint, monitor_state) in monitor_states.iter() { if !funding_outpoints.contains(funding_outpoint) { - self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + } } } @@ -313,7 +354,10 @@ where C::Target: chain::Filter, } } - fn update_monitor_with_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + fn update_monitor_with_chain_data( + &self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, + process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder + ) -> Result<(), ()> where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let monitor = &monitor_state.monitor; let mut txn_outputs; { @@ -335,15 +379,13 @@ where C::Target: chain::Filter, match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { ChannelMonitorUpdateStatus::Completed => log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), - ChannelMonitorUpdateStatus::PermanentFailure => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); - self.event_notifier.notify(); - } ChannelMonitorUpdateStatus::InProgress => { log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); - } + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + return Err(()); + }, } } @@ -363,6 +405,7 @@ where C::Target: chain::Filter, } } } + Ok(()) } /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. @@ -391,7 +434,8 @@ where C::Target: chain::Filter, /// claims which are awaiting confirmation. /// /// Includes the balances from each [`ChannelMonitor`] *except* those included in - /// `ignored_channels`. + /// `ignored_channels`, allowing you to filter out balances from channels which are still open + /// (and whose balance should likely be pulled from the [`ChannelDetails`]). /// /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for /// inclusion in the return value. @@ -491,9 +535,8 @@ where C::Target: chain::Filter, // `MonitorEvent`s from the monitor back to the `ChannelManager` until they // complete. let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates); - if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) { - // If there are still monitor updates pending (or an old monitor update - // finished after a later one perm-failed), we cannot yet construct an + if monitor_is_pending_updates { + // If there are still monitor updates pending, we cannot yet construct a // Completed event. return Ok(()); } @@ -667,18 +710,12 @@ where C::Target: chain::Filter, L::Target: Logger, P::Target: Persist, { - /// Adds the monitor that watches the channel referred to by the given outpoint. - /// - /// Calls back to [`chain::Filter`] with the funding transaction and outputs to watch. - /// - /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor` - /// monitors lock. - fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> ChannelMonitorUpdateStatus { + fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result { let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { hash_map::Entry::Occupied(_) => { log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); - return ChannelMonitorUpdateStatus::PermanentFailure + return Err(()); }, hash_map::Entry::Vacant(e) => e, }; @@ -691,13 +728,14 @@ where C::Target: chain::Filter, log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); }, - ChannelMonitorUpdateStatus::PermanentFailure => { - log_error!(self.logger, "Persistence of new ChannelMonitor for channel {} failed", log_funding_info!(monitor)); - return persist_res; - }, ChannelMonitorUpdateStatus::Completed => { log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); - } + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); + }, } if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source); @@ -705,63 +743,71 @@ where C::Target: chain::Filter, entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates), - channel_perm_failed: AtomicBool::new(false), last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)), }); - persist_res + Ok(persist_res) } - /// Note that we persist the given `ChannelMonitor` update while holding the - /// `ChainMonitor` monitors lock. fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); - match monitors.get(&funding_txo) { + let ret = match monitors.get(&funding_txo) { None => { log_error!(self.logger, "Failed to update channel monitor: no such monitor registered"); // We should never ever trigger this from within ChannelManager. Technically a // user could use this object with some proxying in between which makes this // possible, but in tests and fuzzing, this should be a panic. - #[cfg(any(test, fuzzing))] + #[cfg(debug_assertions)] panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(any(test, fuzzing)))] - ChannelMonitorUpdateStatus::PermanentFailure + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress }, Some(monitor_state) => { let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); - let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger); - if update_res.is_err() { - log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor)); - } - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. So, persist the updated monitor despite the error. + let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger); + let update_id = MonitorUpdateId::from_monitor_update(update); let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id); + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(self.logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor)); + self.persister.update_persisted_channel(funding_txo, None, monitor, update_id) + } else { + self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id) + }; match persist_res { ChannelMonitorUpdateStatus::InProgress => { pending_monitor_updates.push(update_id); log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor)); }, - ChannelMonitorUpdateStatus::PermanentFailure => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - log_error!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} failed", log_funding_info!(monitor)); - }, ChannelMonitorUpdateStatus::Completed => { log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); }, + ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ }, } if update_res.is_err() { - ChannelMonitorUpdateStatus::PermanentFailure - } else if monitor_state.channel_perm_failed.load(Ordering::Acquire) { - ChannelMonitorUpdateStatus::PermanentFailure + ChannelMonitorUpdateStatus::InProgress } else { persist_res } } + }; + if let ChannelMonitorUpdateStatus::UnrecoverableError = ret { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(self.logger, "{}", err_str); + panic!("{}", err_str); } + ret } fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { @@ -774,17 +820,6 @@ where C::Target: chain::Filter, { log_debug!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); } else { - if monitor_state.channel_perm_failed.load(Ordering::Acquire) { - // If a `UpdateOrigin::ChainSync` persistence failed with `PermanantFailure`, - // we don't really know if the latest `ChannelMonitor` state is on disk or not. - // We're supposed to hold monitor updates until the latest state is on disk to - // avoid duplicate events, but the user told us persistence is screw-y and may - // not complete. We can't hold events forever because we may learn some payment - // preimage, so instead we just log and hope the user complied with the - // `PermanentFailure` requirements of having at least the local-disk copy - // updated. - log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!"); - } if is_pending_monitor_update { log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released."); @@ -831,12 +866,12 @@ impl