Rework `chain::Watch` return types to make async updates less scary
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index 9f15a2a27996ba1e40faf27a97f80c27fb434659..2479e8f7e7b5efd00582d2e9aaac7ecd9cfa9f31 100644 (file)
@@ -27,7 +27,7 @@ use bitcoin::blockdata::block::BlockHeader;
 use bitcoin::hash_types::Txid;
 
 use chain;
-use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
+use chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use chain::transaction::{OutPoint, TransactionData};
@@ -78,20 +78,21 @@ impl MonitorUpdateId {
 ///
 /// Each method can return three possible values:
 ///  * If persistence (including any relevant `fsync()` calls) happens immediately, the
-///    implementation should return `Ok(())`, indicating normal channel operation should continue.
+///    implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal
+///    channel operation should continue.
 ///  * If persistence happens asynchronously, implementations should first ensure the
 ///    [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return
-///    `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the
-///    background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be
-///    called with the corresponding [`MonitorUpdateId`].
+///    [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background.
+///    Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with
+///    the corresponding [`MonitorUpdateId`].
 ///
 ///    Note that unlike the direct [`chain::Watch`] interface,
 ///    [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
 ///
 ///  * If persistence fails for some reason, implementations should return
-///    `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be
+///    [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be
 ///    closed without broadcasting the latest state. See
-///    [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
+///    [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details.
 pub trait Persist<ChannelSigner: Sign> {
        /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
        /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
@@ -101,14 +102,14 @@ pub trait Persist<ChannelSigner: Sign> {
        /// and the stored channel data). Note that you **must** persist every new monitor to disk.
        ///
        /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
-       /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+       /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
        ///
        /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
-       /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+       /// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
        ///
        /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+       fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
 
        /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
        /// update.
@@ -136,14 +137,14 @@ pub trait Persist<ChannelSigner: Sign> {
        /// whereas updates are small and `O(1)`.
        ///
        /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
-       /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+       /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
        ///
        /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
        /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
-       /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+       /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
        ///
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+       fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
 }
 
 struct MonitorHolder<ChannelSigner: Sign> {
@@ -151,9 +152,9 @@ struct MonitorHolder<ChannelSigner: Sign> {
        /// The full set of pending monitor updates for this Channel.
        ///
        /// Note that this lock must be held during updates to prevent a race where we call
-       /// update_persisted_channel, the user returns a TemporaryFailure, and then calls
-       /// channel_monitor_updated immediately, racing our insertion of the pending update into the
-       /// contained Vec.
+       /// update_persisted_channel, the user returns a
+       /// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
+       /// immediately, racing our insertion of the pending update into the contained Vec.
        ///
        /// Beyond the synchronization of updates themselves, we cannot handle user events until after
        /// any chain updates have been stored on disk. Thus, we scan this list when returning updates
@@ -287,20 +288,20 @@ where C::Target: chain::Filter,
                                        if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
                                                // If there are not ChainSync persists awaiting completion, go ahead and
                                                // set last_chain_persist_height here - we wouldn't want the first
-                                               // TemporaryFailure to always immediately be considered "overly delayed".
+                                               // InProgress to always immediately be considered "overly delayed".
                                                monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
                                        }
                                }
 
                                log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
                                match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
-                                       Ok(()) =>
+                                       ChannelMonitorUpdateStatus::Completed =>
                                                log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-                                       Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+                                       ChannelMonitorUpdateStatus::PermanentFailure => {
                                                monitor_state.channel_perm_failed.store(true, Ordering::Release);
                                                self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
                                        },
-                                       Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+                                       ChannelMonitorUpdateStatus::InProgress => {
                                                log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
                                                pending_monitor_updates.push(update_id);
                                        },
@@ -400,12 +401,12 @@ where C::Target: chain::Filter,
        }
 
        /// Indicates the persistence of a [`ChannelMonitor`] has completed after
-       /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
+       /// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation.
        ///
        /// Thus, the anticipated use is, at a high level:
        ///  1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
        ///     update to disk and begins updating any remote (e.g. watchtower/backup) copies,
-       ///     returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
+       ///     returning [`ChannelMonitorUpdateStatus::InProgress`],
        ///  2) once all remote copies are updated, you call this function with the
        ///     `completed_update_id` that completed, and once all pending updates have completed the
        ///     channel will be re-enabled.
@@ -438,10 +439,10 @@ where C::Target: chain::Filter,
                                if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
                                        // If there are still monitor updates pending (or an old monitor update
                                        // finished after a later one perm-failed), we cannot yet construct an
-                                       // UpdateCompleted event.
+                                       // Completed event.
                                        return Ok(());
                                }
-                               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
+                               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
                                        funding_txo,
                                        monitor_update_id: monitor_data.monitor.get_latest_update_id(),
                                }], monitor_data.monitor.get_counterparty_node_id()));
@@ -464,7 +465,7 @@ where C::Target: chain::Filter,
        pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
                let monitors = self.monitors.read().unwrap();
                let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
-               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
+               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
                        funding_txo,
                        monitor_update_id,
                }], counterparty_node_id));
@@ -570,27 +571,31 @@ where C::Target: chain::Filter,
        ///
        /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor`
        /// monitors lock.
-       fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
+       fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus {
                let mut monitors = self.monitors.write().unwrap();
                let entry = match monitors.entry(funding_outpoint) {
                        hash_map::Entry::Occupied(_) => {
                                log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
-                               return Err(ChannelMonitorUpdateErr::PermanentFailure)},
+                               return ChannelMonitorUpdateStatus::PermanentFailure
+                       },
                        hash_map::Entry::Vacant(e) => e,
                };
                log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
                let update_id = MonitorUpdateId::from_new_monitor(&monitor);
                let mut pending_monitor_updates = Vec::new();
                let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
-               if persist_res.is_err() {
-                       log_error!(self.logger, "Failed to persist new ChannelMonitor for channel {}: {:?}", log_funding_info!(monitor), persist_res);
-               } else {
-                       log_trace!(self.logger, "Finished persisting new ChannelMonitor for channel {}", log_funding_info!(monitor));
-               }
-               if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) {
-                       return persist_res;
-               } else if persist_res.is_err() {
-                       pending_monitor_updates.push(update_id);
+               match persist_res {
+                       ChannelMonitorUpdateStatus::InProgress => {
+                               log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
+                               pending_monitor_updates.push(update_id);
+                       },
+                       ChannelMonitorUpdateStatus::PermanentFailure => {
+                               log_error!(self.logger, "Persistence of new ChannelMonitor for channel {} failed", log_funding_info!(monitor));
+                               return persist_res;
+                       },
+                       ChannelMonitorUpdateStatus::Completed => {
+                               log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
+                       }
                }
                if let Some(ref chain_source) = self.chain_source {
                        monitor.load_outputs_to_watch(chain_source);
@@ -606,7 +611,7 @@ where C::Target: chain::Filter,
 
        /// Note that we persist the given `ChannelMonitor` update while holding the
        /// `ChainMonitor` monitors lock.
-       fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
+       fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
                // Update the monitor that watches the channel referred to by the given outpoint.
                let monitors = self.monitors.read().unwrap();
                match monitors.get(&funding_txo) {
@@ -619,7 +624,7 @@ where C::Target: chain::Filter,
                                #[cfg(any(test, fuzzing))]
                                panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
                                #[cfg(not(any(test, fuzzing)))]
-                               Err(ChannelMonitorUpdateErr::PermanentFailure)
+                               ChannelMonitorUpdateStatus::PermanentFailure
                        },
                        Some(monitor_state) => {
                                let monitor = &monitor_state.monitor;
@@ -633,20 +638,23 @@ where C::Target: chain::Filter,
                                let update_id = MonitorUpdateId::from_monitor_update(&update);
                                let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
                                let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
-                               if let Err(e) = persist_res {
-                                       if e == ChannelMonitorUpdateErr::TemporaryFailure {
+                               match persist_res {
+                                       ChannelMonitorUpdateStatus::InProgress => {
                                                pending_monitor_updates.push(update_id);
-                                       } else {
+                                               log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
+                                       },
+                                       ChannelMonitorUpdateStatus::PermanentFailure => {
                                                monitor_state.channel_perm_failed.store(true, Ordering::Release);
-                                       }
-                                       log_error!(self.logger, "Failed to persist ChannelMonitor update for channel {}: {:?}", log_funding_info!(monitor), e);
-                               } else {
-                                       log_trace!(self.logger, "Finished persisting ChannelMonitor update for channel {}", log_funding_info!(monitor));
+                                               log_error!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} failed", log_funding_info!(monitor));
+                                       },
+                                       ChannelMonitorUpdateStatus::Completed => {
+                                               log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+                                       },
                                }
                                if update_res.is_err() {
-                                       Err(ChannelMonitorUpdateErr::PermanentFailure)
+                                       ChannelMonitorUpdateStatus::PermanentFailure
                                } else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
-                                       Err(ChannelMonitorUpdateErr::PermanentFailure)
+                                       ChannelMonitorUpdateStatus::PermanentFailure
                                } else {
                                        persist_res
                                }
@@ -723,7 +731,7 @@ mod tests {
        use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
        use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
        use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
-       use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
+       use chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
        use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
        use ln::channelmanager::{self, PaymentSendFailure};
        use ln::functional_test_utils::*;
@@ -747,7 +755,7 @@ mod tests {
                let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
 
                chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
-               chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+               chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
 
                nodes[1].node.claim_funds(payment_preimage_1);
                check_added_monitors!(nodes[1], 1);
@@ -756,7 +764,7 @@ mod tests {
                check_added_monitors!(nodes[1], 1);
                expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
 
-               chanmon_cfgs[1].persister.set_update_ret(Ok(()));
+               chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
 
                let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
                assert_eq!(persistences.len(), 1);
@@ -834,7 +842,7 @@ mod tests {
 
                // Temp-fail the block connection which will hold the channel-closed event
                chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
-               chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
 
                // Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
                // channel is now closed, but the ChannelManager doesn't know that yet.
@@ -850,7 +858,7 @@ mod tests {
 
                // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
                // the update through to the ChannelMonitor which will refuse it (as the channel is closed).
-               chanmon_cfgs[0].persister.set_update_ret(Ok(()));
+               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
                unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)),
                        true, APIError::ChannelUnavailable { ref err },
                        assert!(err.contains("ChannelMonitor storage failure")));
@@ -896,7 +904,7 @@ mod tests {
                create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
 
                chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
-               chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::PermanentFailure));
+               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
 
                connect_blocks(&nodes[0], 1);
                // Before processing events, the ChannelManager will still think the Channel is open and