Add an `UnrecoverableError` variant to `ChannelMonitorUpdateStatus`
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index 7dbf308a568c11c4abf45bc07ef1653aff6c4b51..b6909cb3e416890895367532455edb127fdc038c 100644 (file)
@@ -78,26 +78,48 @@ impl MonitorUpdateId {
 /// `Persist` defines behavior for persisting channel monitors: this could mean
 /// writing once to disk, and/or uploading to one or more backup services.
 ///
-/// Each method can return two possible values:
-///  * If persistence (including any relevant `fsync()` calls) happens immediately, the
-///    implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal
-///    channel operation should continue.
+/// Persistence can happen in one of two ways - synchronously completing before the trait method
+/// calls return or asynchronously in the background.
 ///
-///  * If persistence happens asynchronously, implementations can return
-///    [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background.
-///    Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with
-///    the corresponding [`MonitorUpdateId`].
+/// # For those implementing synchronous persistence
 ///
-///    Note that unlike the direct [`chain::Watch`] interface,
-///    [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
+///  * If persistence completes fully (including any relevant `fsync()` calls), the implementation
+///    should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal channel operation
+///    should continue.
 ///
-///    If persistence fails for some reason, implementations should still return
-///    [`ChannelMonitorUpdateStatus::InProgress`] and attempt to shut down or otherwise resolve the
-///    situation ASAP.
+///  * If persistence fails for some reason, implementations should consider returning
+///    [`ChannelMonitorUpdateStatus::InProgress`] and retry all pending persistence operations in
+///    the background with [`ChainMonitor::list_pending_monitor_updates`] and
+///    [`ChainMonitor::get_monitor`].
 ///
-/// Third-party watchtowers may be built as a part of an implementation of this trait, with the
-/// advantage that you can control whether to resume channel operation depending on if an update
-/// has been persisted to a watchtower. For this, you may find the following methods useful:
+///    Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can
+///    be marked as complete via [`ChainMonitor::channel_monitor_updated`].
+///
+///    If at some point no further progress can be made towards persisting the pending updates, the
+///    node should simply shut down.
+///
+///  * If the persistence has failed and cannot be retried further (e.g. because of some timeout),
+///    [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in
+///    an immediate panic and future operations in LDK generally failing.
+///
+/// # For those implementing asynchronous persistence
+///
+///  All calls should generally spawn a background task and immediately return
+///  [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes,
+///  [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding
+///  [`MonitorUpdateId`].
+///
+///  Note that unlike the direct [`chain::Watch`] interface,
+///  [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
+///
+///  If at some point no further progress can be made towards persisting a pending update, the node
+///  should simply shut down.
+///
+/// # Using remote watchtowers
+///
+/// Watchtowers may be updated as a part of an implementation of this trait, utilizing the async
+/// update process described above while the watchtower is being updated. The following methods are
+/// provided for bulding transactions for a watchtower:
 /// [`ChannelMonitor::initial_counterparty_commitment_tx`],
 /// [`ChannelMonitor::counterparty_commitment_txs_from_update`],
 /// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`],
@@ -279,11 +301,20 @@ where C::Target: chain::Filter,
        where
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
+               let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
                let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
                for funding_outpoint in funding_outpoints.iter() {
                        let monitor_lock = self.monitors.read().unwrap();
                        if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
-                               self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+                               if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+                                       // Take the monitors lock for writing so that we poison it and any future
+                                       // operations going forward fail immediately.
+                                       core::mem::drop(monitor_state);
+                                       core::mem::drop(monitor_lock);
+                                       let _poison = self.monitors.write().unwrap();
+                                       log_error!(self.logger, "{}", err_str);
+                                       panic!("{}", err_str);
+                               }
                        }
                }
 
@@ -291,7 +322,10 @@ where C::Target: chain::Filter,
                let monitor_states = self.monitors.write().unwrap();
                for (funding_outpoint, monitor_state) in monitor_states.iter() {
                        if !funding_outpoints.contains(funding_outpoint) {
-                               self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+                               if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+                                       log_error!(self.logger, "{}", err_str);
+                                       panic!("{}", err_str);
+                               }
                        }
                }
 
@@ -306,7 +340,10 @@ where C::Target: chain::Filter,
                }
        }
 
-       fn update_monitor_with_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>) where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
+       fn update_monitor_with_chain_data<FN>(
+               &self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData,
+               process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
+       ) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
                let monitor = &monitor_state.monitor;
                let mut txn_outputs;
                {
@@ -331,7 +368,10 @@ where C::Target: chain::Filter,
                                ChannelMonitorUpdateStatus::InProgress => {
                                        log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
                                        pending_monitor_updates.push(update_id);
-                               }
+                               },
+                               ChannelMonitorUpdateStatus::UnrecoverableError => {
+                                       return Err(());
+                               },
                        }
                }
 
@@ -351,6 +391,7 @@ where C::Target: chain::Filter,
                                }
                        }
                }
+               Ok(())
        }
 
        /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
@@ -674,7 +715,12 @@ where C::Target: chain::Filter,
                        },
                        ChannelMonitorUpdateStatus::Completed => {
                                log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
-                       }
+                       },
+                       ChannelMonitorUpdateStatus::UnrecoverableError => {
+                               let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+                               log_error!(self.logger, "{}", err_str);
+                               panic!("{}", err_str);
+                       },
                }
                if let Some(ref chain_source) = self.chain_source {
                        monitor.load_outputs_to_watch(chain_source);
@@ -690,7 +736,7 @@ where C::Target: chain::Filter,
        fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
                // Update the monitor that watches the channel referred to by the given outpoint.
                let monitors = self.monitors.read().unwrap();
-               match monitors.get(&funding_txo) {
+               let ret = match monitors.get(&funding_txo) {
                        None => {
                                log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
 
@@ -722,6 +768,7 @@ where C::Target: chain::Filter,
                                        ChannelMonitorUpdateStatus::Completed => {
                                                log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
                                        },
+                                       ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ },
                                }
                                if update_res.is_err() {
                                        ChannelMonitorUpdateStatus::InProgress
@@ -729,7 +776,17 @@ where C::Target: chain::Filter,
                                        persist_res
                                }
                        }
+               };
+               if let ChannelMonitorUpdateStatus::UnrecoverableError = ret {
+                       // Take the monitors lock for writing so that we poison it and any future
+                       // operations going forward fail immediately.
+                       core::mem::drop(monitors);
+                       let _poison = self.monitors.write().unwrap();
+                       let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+                       log_error!(self.logger, "{}", err_str);
+                       panic!("{}", err_str);
                }
+               ret
        }
 
        fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
@@ -973,4 +1030,26 @@ mod tests {
                do_chainsync_pauses_events(false);
                do_chainsync_pauses_events(true);
        }
+
+       #[test]
+       #[cfg(feature = "std")]
+       fn update_during_chainsync_poisons_channel() {
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+               create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);
+
+               assert!(std::panic::catch_unwind(|| {
+                       // Returning an UnrecoverableError should always panic immediately
+                       connect_blocks(&nodes[0], 1);
+               }).is_err());
+               assert!(std::panic::catch_unwind(|| {
+                       // ...and also poison our locks causing later use to panic as well
+                       core::mem::drop(nodes);
+               }).is_err());
+       }
 }