Merge pull request #2621 from G8XSU/dont-persist-erroneous-update
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Fri, 29 Sep 2023 23:45:57 +0000 (23:45 +0000)
committerGitHub <noreply@github.com>
Fri, 29 Sep 2023 23:45:57 +0000 (23:45 +0000)
Persist entire monitor if there is an error while applying monitor_update

1  2 
lightning/src/chain/chainmonitor.rs
lightning/src/util/test_utils.rs

index cf51dbab72019d9eb87748e0817171c1c0abfb35,d70ebd40d667ad840c012197484d4f94d1f2ced4..261c0471ca4089dfaa1d21ca0e8c8f810f90c7a5
@@@ -47,23 -47,35 +47,35 @@@ use core::ops::Deref
  use core::sync::atomic::{AtomicUsize, Ordering};
  use bitcoin::secp256k1::PublicKey;
  
- #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
- /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
- /// entirely opaque.
- enum UpdateOrigin {
-       /// An update that was generated by the `ChannelManager` (via our `chain::Watch`
-       /// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field
-       /// and [`ChannelMonitor::get_latest_update_id`].
-       OffChain(u64),
-       /// An update that was generated during blockchain processing. The ID here is specific to the
-       /// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs.
-       ChainSync(u64),
+ mod update_origin {
+       #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+       /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
+       /// entirely opaque.
+       pub(crate) enum UpdateOrigin {
+               /// An update that was generated by the `ChannelManager` (via our [`crate::chain::Watch`]
+               /// implementation). This corresponds to an actual [ChannelMonitorUpdate::update_id] field
+               /// and [ChannelMonitor::get_latest_update_id].
+               ///
+               /// [ChannelMonitor::get_latest_update_id]: crate::chain::channelmonitor::ChannelMonitor::get_latest_update_id
+               /// [ChannelMonitorUpdate::update_id]: crate::chain::channelmonitor::ChannelMonitorUpdate::update_id
+               OffChain(u64),
+               /// An update that was generated during blockchain processing. The ID here is specific to the
+               /// generating [ChannelMonitor] and does *not* correspond to any on-disk IDs.
+               ///
+               /// [ChannelMonitor]: crate::chain::channelmonitor::ChannelMonitor
+               ChainSync(u64),
+       }
  }
  
+ #[cfg(any(feature = "_test_utils", test))]
+ pub(crate) use update_origin::UpdateOrigin;
+ #[cfg(not(any(feature = "_test_utils", test)))]
+ use update_origin::UpdateOrigin;
  /// An opaque identifier describing a specific [`Persist`] method call.
  #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
  pub struct MonitorUpdateId {
-       contents: UpdateOrigin,
+       pub(crate) contents: UpdateOrigin,
  }
  
  impl MonitorUpdateId {
  ///    If at some point no further progress can be made towards persisting the pending updates, the
  ///    node should simply shut down.
  ///
 -///  * If the persistence has failed and cannot be retried further (e.g. because of some timeout),
 +///  * If the persistence has failed and cannot be retried further (e.g. because of an outage),
  ///    [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in
  ///    an immediate panic and future operations in LDK generally failing.
  ///
  ///  [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
  ///
  ///  If at some point no further progress can be made towards persisting a pending update, the node
 -///  should simply shut down.
 +///  should simply shut down. Until then, the background task should either loop indefinitely, or
 +///  persistence should be regularly retried with [`ChainMonitor::list_pending_monitor_updates`]
 +///  and [`ChainMonitor::get_monitor`] (note that if a full monitor is persisted all pending
 +///  monitor updates may be marked completed).
  ///
  /// # Using remote watchtowers
  ///
@@@ -155,8 -164,8 +167,8 @@@ pub trait Persist<ChannelSigner: Writea
        /// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more
        /// details.
        ///
-       /// During blockchain synchronization operations, this may be called with no
-       /// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
+       /// During blockchain synchronization operations, and in some rare cases, this may be called with
+       /// no [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
        /// Note that after the full [`ChannelMonitor`] is persisted any previous
        /// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be
        /// applied to the persisted [`ChannelMonitor`] as they were already applied.
@@@ -423,8 -432,7 +435,8 @@@ where C::Target: chain::Filter
        /// claims which are awaiting confirmation.
        ///
        /// Includes the balances from each [`ChannelMonitor`] *except* those included in
 -      /// `ignored_channels`.
 +      /// `ignored_channels`, allowing you to filter out balances from channels which are still open
 +      /// (and whose balance should likely be pulled from the [`ChannelDetails`]).
        ///
        /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for
        /// inclusion in the return value.
@@@ -756,14 -764,20 +768,20 @@@ where C::Target: chain::Filter
                                let monitor = &monitor_state.monitor;
                                log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
                                let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger);
-                               if update_res.is_err() {
-                                       log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor));
-                               }
-                               // Even if updating the monitor returns an error, the monitor's state will
-                               // still be changed. So, persist the updated monitor despite the error.
                                let update_id = MonitorUpdateId::from_monitor_update(update);
                                let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-                               let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id);
+                               let persist_res = if update_res.is_err() {
+                                       // Even if updating the monitor returns an error, the monitor's state will
+                                       // still be changed. Therefore, we should persist the updated monitor despite the error.
+                                       // We don't want to persist a `monitor_update` which results in a failure to apply later
+                                       // while reading `channel_monitor` with updates from storage. Instead, we should persist
+                                       // the entire `channel_monitor` here.
+                                       log_warn!(self.logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
+                                       self.persister.update_persisted_channel(funding_txo, None, monitor, update_id)
+                               } else {
+                                       self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id)
+                               };
                                match persist_res {
                                        ChannelMonitorUpdateStatus::InProgress => {
                                                pending_monitor_updates.push(update_id);
index b912221fac372da5247879ff62fd7d506d655a6b,bc9c0a32dd05c82019ff7cb21218e3a27a1001b1..4bcaca57f81ca61cd2af4d499d6ffbfa4f1b51e0
@@@ -13,7 -13,7 +13,7 @@@ use crate::chain::chaininterface
  use crate::chain::chaininterface::ConfirmationTarget;
  use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW;
  use crate::chain::chainmonitor;
- use crate::chain::chainmonitor::MonitorUpdateId;
+ use crate::chain::chainmonitor::{MonitorUpdateId, UpdateOrigin};
  use crate::chain::channelmonitor;
  use crate::chain::channelmonitor::MonitorEvent;
  use crate::chain::transaction::OutPoint;
@@@ -207,9 -207,6 +207,9 @@@ pub struct TestChainMonitor<'a> 
        /// ChannelForceClosed event for the given channel_id with should_broadcast set to the given
        /// boolean.
        pub expect_channel_force_closed: Mutex<Option<(ChannelId, bool)>>,
 +      /// If this is set to Some(), the next round trip serialization check will not hold after an
 +      /// update_channel call (not watch_channel) for the given channel_id.
 +      pub expect_monitor_round_trip_fail: Mutex<Option<ChannelId>>,
  }
  impl<'a> TestChainMonitor<'a> {
        pub fn new(chain_source: Option<&'a TestChainSource>, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a chainmonitor::Persist<TestChannelSigner>, keys_manager: &'a TestKeysInterface) -> Self {
                        chain_monitor: chainmonitor::ChainMonitor::new(chain_source, broadcaster, logger, fee_estimator, persister),
                        keys_manager,
                        expect_channel_force_closed: Mutex::new(None),
 +                      expect_monitor_round_trip_fail: Mutex::new(None),
                }
        }
  
@@@ -271,12 -267,7 +271,12 @@@ impl<'a> chain::Watch<TestChannelSigner
                monitor.write(&mut w).unwrap();
                let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
                        &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager)).unwrap().1;
 -              assert!(new_monitor == *monitor);
 +              if let Some(chan_id) = self.expect_monitor_round_trip_fail.lock().unwrap().take() {
 +                      assert_eq!(chan_id, funding_txo.to_channel_id());
 +                      assert!(new_monitor != *monitor);
 +              } else {
 +                      assert!(new_monitor == *monitor);
 +              }
                self.added_monitors.lock().unwrap().push((funding_txo, new_monitor));
                update_res
        }
@@@ -428,7 -419,8 +428,8 @@@ impl<Signer: sign::WriteableEcdsaChanne
                if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
                        ret = update_ret;
                }
-               if update.is_none() {
+               let is_chain_sync = if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false };
+               if is_chain_sync {
                        self.chain_sync_monitor_persistences.lock().unwrap().entry(funding_txo).or_insert(HashSet::new()).insert(update_id);
                } else {
                        self.offchain_monitor_updates.lock().unwrap().entry(funding_txo).or_insert(HashSet::new()).insert(update_id);