Merge pull request #1806 from arik-so/2022-10-background-processor-deparametrization
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index fc7e50a1cb26c2a8f3dff74b4d4f77f186fe148c..17fe69182ee81d060bc5c7c8c092608770104c84 100644 (file)
 //! events. The remote server would make use of [`ChainMonitor`] for block processing and for
 //! servicing [`ChannelMonitor`] updates from the client.
 
-use bitcoin::blockdata::block::{Block, BlockHeader};
-use bitcoin::hash_types::Txid;
-
-use chain;
-use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
-use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
-use chain::transaction::{OutPoint, TransactionData};
-use chain::keysinterface::Sign;
-use util::atomic_counter::AtomicCounter;
-use util::logger::Logger;
-use util::errors::APIError;
-use util::events;
-use util::events::EventHandler;
-use ln::channelmanager::ChannelDetails;
-
-use prelude::*;
-use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
+use bitcoin::blockdata::block::BlockHeader;
+use bitcoin::hash_types::{Txid, BlockHash};
+
+use crate::chain;
+use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
+use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
+use crate::chain::transaction::{OutPoint, TransactionData};
+use crate::chain::keysinterface::Sign;
+use crate::util::atomic_counter::AtomicCounter;
+use crate::util::logger::Logger;
+use crate::util::errors::APIError;
+use crate::util::events;
+use crate::util::events::{Event, EventHandler};
+use crate::ln::channelmanager::ChannelDetails;
+
+use crate::prelude::*;
+use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use bitcoin::secp256k1::PublicKey;
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
@@ -77,20 +78,21 @@ impl MonitorUpdateId {
 ///
 /// Each method can return three possible values:
 ///  * If persistence (including any relevant `fsync()` calls) happens immediately, the
-///    implementation should return `Ok(())`, indicating normal channel operation should continue.
+///    implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal
+///    channel operation should continue.
 ///  * If persistence happens asynchronously, implementations should first ensure the
 ///    [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return
-///    `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the
-///    background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be
-///    called with the corresponding [`MonitorUpdateId`].
+///    [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background.
+///    Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with
+///    the corresponding [`MonitorUpdateId`].
 ///
 ///    Note that unlike the direct [`chain::Watch`] interface,
 ///    [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
 ///
 ///  * If persistence fails for some reason, implementations should return
-///    `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be
+///    [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be
 ///    closed without broadcasting the latest state. See
-///    [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
+///    [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details.
 pub trait Persist<ChannelSigner: Sign> {
        /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
        /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
@@ -100,14 +102,14 @@ pub trait Persist<ChannelSigner: Sign> {
        /// and the stored channel data). Note that you **must** persist every new monitor to disk.
        ///
        /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
-       /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+       /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
        ///
        /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
-       /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+       /// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
        ///
        /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+       fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
 
        /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
        /// update.
@@ -135,14 +137,14 @@ pub trait Persist<ChannelSigner: Sign> {
        /// whereas updates are small and `O(1)`.
        ///
        /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
-       /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
+       /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
        ///
        /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
        /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
-       /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+       /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
        ///
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+       fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
 }
 
 struct MonitorHolder<ChannelSigner: Sign> {
@@ -150,9 +152,9 @@ struct MonitorHolder<ChannelSigner: Sign> {
        /// The full set of pending monitor updates for this Channel.
        ///
        /// Note that this lock must be held during updates to prevent a race where we call
-       /// update_persisted_channel, the user returns a TemporaryFailure, and then calls
-       /// channel_monitor_updated immediately, racing our insertion of the pending update into the
-       /// contained Vec.
+       /// update_persisted_channel, the user returns a
+       /// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
+       /// immediately, racing our insertion of the pending update into the contained Vec.
        ///
        /// Beyond the synchronization of updates themselves, we cannot handle user events until after
        /// any chain updates have been stored on disk. Thus, we scan this list when returning updates
@@ -235,7 +237,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
        persister: P,
        /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
        /// from the user and not from a [`ChannelMonitor`].
-       pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+       pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
        /// The best block height seen, used as a proxy for the passage of time.
        highest_chain_height: AtomicUsize,
 }
@@ -262,82 +264,67 @@ where C::Target: chain::Filter,
        where
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
-               let mut dependent_txdata = Vec::new();
-               {
-                       let monitor_states = self.monitors.write().unwrap();
-                       if let Some(height) = best_height {
-                               // If the best block height is being updated, update highest_chain_height under the
-                               // monitors write lock.
-                               let old_height = self.highest_chain_height.load(Ordering::Acquire);
-                               let new_height = height as usize;
-                               if new_height > old_height {
-                                       self.highest_chain_height.store(new_height, Ordering::Release);
-                               }
+               let monitor_states = self.monitors.write().unwrap();
+               if let Some(height) = best_height {
+                       // If the best block height is being updated, update highest_chain_height under the
+                       // monitors write lock.
+                       let old_height = self.highest_chain_height.load(Ordering::Acquire);
+                       let new_height = height as usize;
+                       if new_height > old_height {
+                               self.highest_chain_height.store(new_height, Ordering::Release);
                        }
+               }
 
-                       for (funding_outpoint, monitor_state) in monitor_states.iter() {
-                               let monitor = &monitor_state.monitor;
-                               let mut txn_outputs;
-                               {
-                                       txn_outputs = process(monitor, txdata);
-                                       let update_id = MonitorUpdateId {
-                                               contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
-                                       };
-                                       let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-                                       if let Some(height) = best_height {
-                                               if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-                                                       // If there are not ChainSync persists awaiting completion, go ahead and
-                                                       // set last_chain_persist_height here - we wouldn't want the first
-                                                       // TemporaryFailure to always immediately be considered "overly delayed".
-                                                       monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
-                                               }
+               for (funding_outpoint, monitor_state) in monitor_states.iter() {
+                       let monitor = &monitor_state.monitor;
+                       let mut txn_outputs;
+                       {
+                               txn_outputs = process(monitor, txdata);
+                               let update_id = MonitorUpdateId {
+                                       contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+                               };
+                               let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+                               if let Some(height) = best_height {
+                                       if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+                                               // If there are not ChainSync persists awaiting completion, go ahead and
+                                               // set last_chain_persist_height here - we wouldn't want the first
+                                               // InProgress to always immediately be considered "overly delayed".
+                                               monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
                                        }
+                               }
 
-                                       log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-                                       match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
-                                               Ok(()) =>
-                                                       log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-                                               Err(ChannelMonitorUpdateErr::PermanentFailure) => {
-                                                       monitor_state.channel_perm_failed.store(true, Ordering::Release);
-                                                       self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
-                                               },
-                                               Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
-                                                       log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
-                                                       pending_monitor_updates.push(update_id);
-                                               },
-                                       }
+                               log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+                               match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+                                       ChannelMonitorUpdateStatus::Completed =>
+                                               log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+                                       ChannelMonitorUpdateStatus::PermanentFailure => {
+                                               monitor_state.channel_perm_failed.store(true, Ordering::Release);
+                                               self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+                                       },
+                                       ChannelMonitorUpdateStatus::InProgress => {
+                                               log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+                                               pending_monitor_updates.push(update_id);
+                                       },
                                }
+                       }
 
-                               // Register any new outputs with the chain source for filtering, storing any dependent
-                               // transactions from within the block that previously had not been included in txdata.
-                               if let Some(ref chain_source) = self.chain_source {
-                                       let block_hash = header.block_hash();
-                                       for (txid, mut outputs) in txn_outputs.drain(..) {
-                                               for (idx, output) in outputs.drain(..) {
-                                                       // Register any new outputs with the chain source for filtering and recurse
-                                                       // if it indicates that there are dependent transactions within the block
-                                                       // that had not been previously included in txdata.
-                                                       let output = WatchedOutput {
-                                                               block_hash: Some(block_hash),
-                                                               outpoint: OutPoint { txid, index: idx as u16 },
-                                                               script_pubkey: output.script_pubkey,
-                                                       };
-                                                       if let Some(tx) = chain_source.register_output(output) {
-                                                               dependent_txdata.push(tx);
-                                                       }
-                                               }
+                       // Register any new outputs with the chain source for filtering, storing any dependent
+                       // transactions from within the block that previously had not been included in txdata.
+                       if let Some(ref chain_source) = self.chain_source {
+                               let block_hash = header.block_hash();
+                               for (txid, mut outputs) in txn_outputs.drain(..) {
+                                       for (idx, output) in outputs.drain(..) {
+                                               // Register any new outputs with the chain source for filtering
+                                               let output = WatchedOutput {
+                                                       block_hash: Some(block_hash),
+                                                       outpoint: OutPoint { txid, index: idx as u16 },
+                                                       script_pubkey: output.script_pubkey,
+                                               };
+                                               chain_source.register_output(output)
                                        }
                                }
                        }
                }
-
-               // Recursively call for any dependent transactions that were identified by the chain source.
-               if !dependent_txdata.is_empty() {
-                       dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
-                       dependent_txdata.dedup_by_key(|(index, _tx)| *index);
-                       let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-                       self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
-               }
        }
 
        /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
@@ -408,18 +395,35 @@ where C::Target: chain::Filter,
                self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
        }
 
+       #[cfg(not(c_bindings))]
+       /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
+       pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<MonitorUpdateId>> {
+               self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+                       (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
+               }).collect()
+       }
+
+       #[cfg(c_bindings)]
+       /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
+       pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec<MonitorUpdateId>)> {
+               self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+                       (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
+               }).collect()
+       }
+
+
        #[cfg(test)]
        pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
                self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
        }
 
        /// Indicates the persistence of a [`ChannelMonitor`] has completed after
-       /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
+       /// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation.
        ///
        /// Thus, the anticipated use is, at a high level:
        ///  1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
        ///     update to disk and begins updating any remote (e.g. watchtower/backup) copies,
-       ///     returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
+       ///     returning [`ChannelMonitorUpdateStatus::InProgress`],
        ///  2) once all remote copies are updated, you call this function with the
        ///     `completed_update_id` that completed, and once all pending updates have completed the
        ///     channel will be re-enabled.
@@ -452,13 +456,13 @@ where C::Target: chain::Filter,
                                if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
                                        // If there are still monitor updates pending (or an old monitor update
                                        // finished after a later one perm-failed), we cannot yet construct an
-                                       // UpdateCompleted event.
+                                       // Completed event.
                                        return Ok(());
                                }
-                               self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+                               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
                                        funding_txo,
                                        monitor_update_id: monitor_data.monitor.get_latest_update_id(),
-                               });
+                               }], monitor_data.monitor.get_counterparty_node_id()));
                        },
                        MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
                                if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
@@ -474,22 +478,42 @@ where C::Target: chain::Filter,
        /// This wrapper avoids having to update some of our tests for now as they assume the direct
        /// chain::Watch API wherein we mark a monitor fully-updated by just calling
        /// channel_monitor_updated once with the highest ID.
-       #[cfg(any(test, feature = "fuzztarget"))]
+       #[cfg(any(test, fuzzing))]
        pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
-               self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+               let monitors = self.monitors.read().unwrap();
+               let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
+               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
                        funding_txo,
                        monitor_update_id,
-               });
+               }], counterparty_node_id));
        }
 
-       #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+       #[cfg(any(test, fuzzing, feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
-               use util::events::EventsProvider;
+               use crate::util::events::EventsProvider;
                let events = core::cell::RefCell::new(Vec::new());
-               let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+               let event_handler = |event: events::Event| events.borrow_mut().push(event);
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
+
+       /// Processes any events asynchronously in the order they were generated since the last call
+       /// using the given event handler.
+       ///
+       /// See the trait-level documentation of [`EventsProvider`] for requirements.
+       ///
+       /// [`EventsProvider`]: crate::util::events::EventsProvider
+       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+               &self, handler: H
+       ) {
+               let mut pending_events = Vec::new();
+               for monitor_state in self.monitors.read().unwrap().values() {
+                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+               }
+               for event in pending_events {
+                       handler(event).await;
+               }
+       }
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
@@ -501,9 +525,7 @@ where
        L::Target: Logger,
        P::Target: Persist<ChannelSigner>,
 {
-       fn block_connected(&self, block: &Block, height: u32) {
-               let header = &block.header;
-               let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+       fn filtered_block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
                log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
                self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
                        monitor.block_connected(
@@ -557,7 +579,7 @@ where
                });
        }
 
-       fn get_relevant_txids(&self) -> Vec<Txid> {
+       fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
                let mut txids = Vec::new();
                let monitor_states = self.monitors.read().unwrap();
                for monitor_state in monitor_states.values() {
@@ -584,27 +606,31 @@ where C::Target: chain::Filter,
        ///
        /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor`
        /// monitors lock.
-       fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
+       fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus {
                let mut monitors = self.monitors.write().unwrap();
                let entry = match monitors.entry(funding_outpoint) {
                        hash_map::Entry::Occupied(_) => {
                                log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
-                               return Err(ChannelMonitorUpdateErr::PermanentFailure)},
+                               return ChannelMonitorUpdateStatus::PermanentFailure
+                       },
                        hash_map::Entry::Vacant(e) => e,
                };
                log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
                let update_id = MonitorUpdateId::from_new_monitor(&monitor);
                let mut pending_monitor_updates = Vec::new();
                let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
-               if persist_res.is_err() {
-                       log_error!(self.logger, "Failed to persist new ChannelMonitor for channel {}: {:?}", log_funding_info!(monitor), persist_res);
-               } else {
-                       log_trace!(self.logger, "Finished persisting new ChannelMonitor for channel {}", log_funding_info!(monitor));
-               }
-               if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) {
-                       return persist_res;
-               } else if persist_res.is_err() {
-                       pending_monitor_updates.push(update_id);
+               match persist_res {
+                       ChannelMonitorUpdateStatus::InProgress => {
+                               log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
+                               pending_monitor_updates.push(update_id);
+                       },
+                       ChannelMonitorUpdateStatus::PermanentFailure => {
+                               log_error!(self.logger, "Persistence of new ChannelMonitor for channel {} failed", log_funding_info!(monitor));
+                               return persist_res;
+                       },
+                       ChannelMonitorUpdateStatus::Completed => {
+                               log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
+                       }
                }
                if let Some(ref chain_source) = self.chain_source {
                        monitor.load_outputs_to_watch(chain_source);
@@ -620,7 +646,7 @@ where C::Target: chain::Filter,
 
        /// Note that we persist the given `ChannelMonitor` update while holding the
        /// `ChainMonitor` monitors lock.
-       fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
+       fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
                // Update the monitor that watches the channel referred to by the given outpoint.
                let monitors = self.monitors.read().unwrap();
                match monitors.get(&funding_txo) {
@@ -630,15 +656,15 @@ where C::Target: chain::Filter,
                                // We should never ever trigger this from within ChannelManager. Technically a
                                // user could use this object with some proxying in between which makes this
                                // possible, but in tests and fuzzing, this should be a panic.
-                               #[cfg(any(test, feature = "fuzztarget"))]
+                               #[cfg(any(test, fuzzing))]
                                panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
-                               #[cfg(not(any(test, feature = "fuzztarget")))]
-                               Err(ChannelMonitorUpdateErr::PermanentFailure)
+                               #[cfg(not(any(test, fuzzing)))]
+                               ChannelMonitorUpdateStatus::PermanentFailure
                        },
                        Some(monitor_state) => {
                                let monitor = &monitor_state.monitor;
                                log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
-                               let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
+                               let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger);
                                if update_res.is_err() {
                                        log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor));
                                }
@@ -647,20 +673,23 @@ where C::Target: chain::Filter,
                                let update_id = MonitorUpdateId::from_monitor_update(&update);
                                let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
                                let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
-                               if let Err(e) = persist_res {
-                                       if e == ChannelMonitorUpdateErr::TemporaryFailure {
+                               match persist_res {
+                                       ChannelMonitorUpdateStatus::InProgress => {
                                                pending_monitor_updates.push(update_id);
-                                       } else {
+                                               log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
+                                       },
+                                       ChannelMonitorUpdateStatus::PermanentFailure => {
                                                monitor_state.channel_perm_failed.store(true, Ordering::Release);
-                                       }
-                                       log_error!(self.logger, "Failed to persist ChannelMonitor update for channel {}: {:?}", log_funding_info!(monitor), e);
-                               } else {
-                                       log_trace!(self.logger, "Finished persisting ChannelMonitor update for channel {}", log_funding_info!(monitor));
+                                               log_error!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} failed", log_funding_info!(monitor));
+                                       },
+                                       ChannelMonitorUpdateStatus::Completed => {
+                                               log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+                                       },
                                }
                                if update_res.is_err() {
-                                       Err(ChannelMonitorUpdateErr::PermanentFailure)
+                                       ChannelMonitorUpdateStatus::PermanentFailure
                                } else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
-                                       Err(ChannelMonitorUpdateErr::PermanentFailure)
+                                       ChannelMonitorUpdateStatus::PermanentFailure
                                } else {
                                        persist_res
                                }
@@ -668,7 +697,7 @@ where C::Target: chain::Filter,
                }
        }
 
-       fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+       fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
                let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
                        let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
@@ -694,7 +723,12 @@ where C::Target: chain::Filter,
                                        log_error!(self.logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
                                        log_error!(self.logger, "   This may cause duplicate payment events to be generated.");
                                }
-                               pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+                               let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
+                               if monitor_events.len() > 0 {
+                                       let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
+                                       let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
+                                       pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id));
+                               }
                        }
                }
                pending_monitor_events
@@ -708,81 +742,61 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
              L::Target: Logger,
              P::Target: Persist<ChannelSigner>,
 {
+       #[cfg(not(anchors))]
+       /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+       ///
+       /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+       /// order to handle these events.
+       ///
+       /// [`SpendableOutputs`]: events::Event::SpendableOutputs
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+               let mut pending_events = Vec::new();
+               for monitor_state in self.monitors.read().unwrap().values() {
+                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+               }
+               for event in pending_events {
+                       handler.handle_event(event);
+               }
+       }
+       #[cfg(anchors)]
        /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
        ///
+       /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
+       /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
+       /// within each channel. As the confirmation of a commitment transaction may be critical to the
+       /// safety of funds, this method must be invoked frequently, ideally once for every chain tip
+       /// update (block connected or disconnected).
+       ///
        /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
        /// order to handle these events.
        ///
        /// [`SpendableOutputs`]: events::Event::SpendableOutputs
+       /// [`BumpTransaction`]: events::Event::BumpTransaction
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
                let mut pending_events = Vec::new();
                for monitor_state in self.monitors.read().unwrap().values() {
                        pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
                }
-               for event in pending_events.drain(..) {
-                       handler.handle_event(&event);
+               for event in pending_events {
+                       handler.handle_event(event);
                }
        }
 }
 
 #[cfg(test)]
 mod tests {
-       use bitcoin::BlockHeader;
-       use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
-       use ::{expect_payment_sent, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
-       use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
-       use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
-       use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
-       use ln::channelmanager::PaymentSendFailure;
-       use ln::features::InitFeatures;
-       use ln::functional_test_utils::*;
-       use ln::msgs::ChannelMessageHandler;
-       use util::errors::APIError;
-       use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
-       use util::test_utils::{OnRegisterOutput, TxOutReference};
-
-       /// Tests that in-block dependent transactions are processed by `block_connected` when not
-       /// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance,
-       /// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the
-       /// commitment transaction itself. An Electrum client may filter the commitment transaction but
-       /// needs to return the HTLC transaction so it can be processed.
-       #[test]
-       fn connect_block_checks_dependent_transactions() {
-               let chanmon_cfgs = create_chanmon_cfgs(2);
-               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               let channel = create_announced_chan_between_nodes(
-                       &nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
-
-               // Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions.
-               let (commitment_tx, htlc_tx) = {
-                       let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0;
-                       let mut txn = get_local_commitment_txn!(nodes[0], channel.2);
-                       claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage);
-
-                       assert_eq!(txn.len(), 2);
-                       (txn.remove(0), txn.remove(0))
-               };
-
-               // Set expectations on nodes[1]'s chain source to return dependent transactions.
-               let htlc_output = TxOutReference(commitment_tx.clone(), 0);
-               let to_local_output = TxOutReference(commitment_tx.clone(), 1);
-               let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
-               nodes[1].chain_source
-                       .expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
-                       .expect(OnRegisterOutput { with: to_local_output, returns: None })
-                       .expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });
-
-               // Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain
-               // source should return the dependent HTLC transaction when the HTLC output is registered.
-               mine_transaction(&nodes[1], &commitment_tx);
-
-               // Clean up so uninteresting assertions don't fail.
-               check_added_monitors!(nodes[1], 1);
-               nodes[1].node.get_and_clear_pending_msg_events();
-               nodes[1].node.get_and_clear_pending_events();
-       }
+       use bitcoin::{BlockHeader, TxMerkleNode};
+       use bitcoin::hashes::Hash;
+       use crate::{check_added_monitors, check_closed_broadcast, check_closed_event};
+       use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+       use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
+       use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
+       use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
+       use crate::ln::channelmanager::{self, PaymentSendFailure, PaymentId};
+       use crate::ln::functional_test_utils::*;
+       use crate::ln::msgs::ChannelMessageHandler;
+       use crate::util::errors::APIError;
+       use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
 
        #[test]
        fn test_async_ooo_offchain_updates() {
@@ -793,21 +807,23 @@ mod tests {
                let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+               create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
 
                // Route two payments to be claimed at the same time.
-               let payment_preimage_1 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
-               let payment_preimage_2 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+               let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
+               let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
 
                chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
-               chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+               chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
 
                nodes[1].node.claim_funds(payment_preimage_1);
                check_added_monitors!(nodes[1], 1);
+               expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
                nodes[1].node.claim_funds(payment_preimage_2);
                check_added_monitors!(nodes[1], 1);
+               expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
 
-               chanmon_cfgs[1].persister.set_update_ret(Ok(()));
+               chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
 
                let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
                assert_eq!(persistences.len(), 1);
@@ -817,7 +833,22 @@ mod tests {
                // Note that updates is a HashMap so the ordering here is actually random. This shouldn't
                // fail either way but if it fails intermittently it's depending on the ordering of updates.
                let mut update_iter = updates.iter();
-               nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
+               let next_update = update_iter.next().unwrap().clone();
+               // Should contain next_update when pending updates listed.
+               #[cfg(not(c_bindings))]
+               assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
+                       .unwrap().contains(&next_update));
+               #[cfg(c_bindings)]
+               assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
+                       .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
+               nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap();
+               // Should not contain the previously pending next_update when pending updates listed.
+               #[cfg(not(c_bindings))]
+               assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo)
+                       .unwrap().contains(&next_update));
+               #[cfg(c_bindings)]
+               assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter()
+                       .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
                assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
                assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
                nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
@@ -868,15 +899,16 @@ mod tests {
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
                let channel = create_announced_chan_between_nodes(
-                       &nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
+                       &nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
 
                // Get a route for later and rebalance the channel somewhat
                send_payment(&nodes[0], &[&nodes[1]], 10_000_000);
                let (route, second_payment_hash, _, second_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
 
                // First route a payment that we will claim on chain and give the recipient the preimage.
-               let payment_preimage = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+               let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
                nodes[1].node.claim_funds(payment_preimage);
+               expect_payment_claimed!(nodes[1], payment_hash, 1_000_000);
                nodes[1].node.get_and_clear_pending_msg_events();
                check_added_monitors!(nodes[1], 1);
                let remote_txn = get_local_commitment_txn!(nodes[1], channel.2);
@@ -884,14 +916,14 @@ mod tests {
 
                // Temp-fail the block connection which will hold the channel-closed event
                chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
-               chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
+               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
 
                // Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
                // channel is now closed, but the ChannelManager doesn't know that yet.
                let new_header = BlockHeader {
                        version: 2, time: 0, bits: 0, nonce: 0,
                        prev_blockhash: nodes[0].best_block_info().0,
-                       merkle_root: Default::default() };
+                       merkle_root: TxMerkleNode::all_zeros() };
                nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
                        &[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1);
                assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
@@ -900,8 +932,8 @@ mod tests {
 
                // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
                // the update through to the ChannelMonitor which will refuse it (as the channel is closed).
-               chanmon_cfgs[0].persister.set_update_ret(Ok(()));
-               unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)),
+               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
+               unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)),
                        true, APIError::ChannelUnavailable { ref err },
                        assert!(err.contains("ChannelMonitor storage failure")));
                check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
@@ -917,12 +949,13 @@ mod tests {
                        let latest_header = BlockHeader {
                                version: 2, time: 0, bits: 0, nonce: 0,
                                prev_blockhash: nodes[0].best_block_info().0,
-                               merkle_root: Default::default() };
+                               merkle_root: TxMerkleNode::all_zeros() };
                        nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS);
                } else {
-                       for (funding_outpoint, update_ids) in chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().iter() {
+                       let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();
+                       for (funding_outpoint, update_ids) in persistences {
                                for update_id in update_ids {
-                                       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(*funding_outpoint, *update_id).unwrap();
+                                       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_outpoint, update_id).unwrap();
                                }
                        }
                }
@@ -935,4 +968,27 @@ mod tests {
                do_chainsync_pauses_events(false);
                do_chainsync_pauses_events(true);
        }
+
+       #[test]
+       fn update_during_chainsync_fails_channel() {
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+               create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
+
+               chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
+
+               connect_blocks(&nodes[0], 1);
+               // Before processing events, the ChannelManager will still think the Channel is open and
+               // there won't be any ChannelMonitorUpdates
+               assert_eq!(nodes[0].node.list_channels().len(), 1);
+               check_added_monitors!(nodes[0], 0);
+               // ... however once we get events once, the channel will close, creating a channel-closed
+               // ChannelMonitorUpdate.
+               check_closed_broadcast!(nodes[0], true);
+               check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() });
+               check_added_monitors!(nodes[0], 1);
+       }
 }