Add `ChainMonitor::archive_fully_resolved_channel_monitors`
diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 0b7e13f24b0218235cf690bb8652322d7f7db520..efea0cf03f7d9a250675fde8bc6d6ee653edc9c1 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -29,20 +29,20 @@ use bitcoin::hash_types::{Txid, BlockHash};
 use crate::chain;
 use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
 use crate::chain::transaction::{OutPoint, TransactionData};
+use crate::ln::ChannelId;
 use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
 use crate::events;
 use crate::events::{Event, EventHandler};
 use crate::util::atomic_counter::AtomicCounter;
-use crate::util::logger::Logger;
+use crate::util::logger::{Logger, WithContext};
 use crate::util::errors::APIError;
 use crate::util::wakers::{Future, Notifier};
 use crate::ln::channelmanager::ChannelDetails;
 
 use crate::prelude::*;
 use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
-use core::iter::FromIterator;
 use core::ops::Deref;
 use core::sync::atomic::{AtomicUsize, Ordering};
 use bitcoin::secp256k1::PublicKey;
@@ -158,7 +158,7 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
        ///
        /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+       fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
 
        /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
        /// update.
@@ -193,7 +193,12 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
        /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
        ///
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+       fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+       /// Prevents the channel monitor from being loaded on startup.
+       ///
+       /// Archiving the data in a backup location (rather than deleting it fully) is useful for
+       /// hedging against data loss in case of unexpected failure.
+       fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
 }
 
 struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
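
Since `archive_persisted_channel` returns nothing, archiving is best-effort by design. Below is a minimal sketch of how a file-backed `Persist` implementation might satisfy the new method; the `FileStore` type and its one-file-per-outpoint path scheme are hypothetical illustrations, not LDK APIs:

use std::fs;
use std::path::PathBuf;

use lightning::chain::transaction::OutPoint;

/// Hypothetical file-backed store: one serialized monitor per funding outpoint.
struct FileStore {
	data_dir: PathBuf,
}

impl FileStore {
	fn monitor_path(&self, funding_outpoint: &OutPoint) -> PathBuf {
		self.data_dir.join(format!("{}_{}", funding_outpoint.txid, funding_outpoint.index))
	}

	/// Sketch of `Persist::archive_persisted_channel`: move the monitor into an
	/// `archive/` subdirectory rather than deleting it, per the hedging rationale
	/// in the trait docs above.
	fn archive_persisted_channel(&self, funding_outpoint: OutPoint) {
		let src = self.monitor_path(&funding_outpoint);
		let archive_dir = self.data_dir.join("archive");
		if fs::create_dir_all(&archive_dir).is_err() {
			return; // Best-effort: the monitor simply stays in primary storage.
		}
		if let Some(name) = src.file_name() {
			// A failed rename likewise leaves the monitor in primary storage.
			let _ = fs::rename(&src, archive_dir.join(name));
		}
	}
}

The only hard contract is that an archived monitor must no longer be loaded on startup; whether that means a rename, a move to cold storage, or outright deletion is left to the implementation.
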
@@ -287,7 +292,7 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T:
        persister: P,
        /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
        /// from the user and not from a [`ChannelMonitor`].
-       pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
+       pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)>>,
        /// The best block height seen, used as a proxy for the passage of time.
        highest_chain_height: AtomicUsize,
 
@@ -317,7 +322,7 @@ where C::Target: chain::Filter,
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
                let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
-               let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
+               let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
                for funding_outpoint in funding_outpoints.iter() {
                        let monitor_lock = self.monitors.read().unwrap();
                        if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
@@ -359,6 +364,7 @@ where C::Target: chain::Filter,
                process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
        ) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
                let monitor = &monitor_state.monitor;
+               let logger = WithChannelMonitor::from(&self.logger, &monitor);
                let mut txn_outputs;
                {
                        txn_outputs = process(monitor, txdata);
@@ -375,12 +381,12 @@ where C::Target: chain::Filter,
                                }
                        }
 
-                       log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+                       log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
                        match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
                                ChannelMonitorUpdateStatus::Completed =>
-                                       log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+                                       log_trace!(logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
                                ChannelMonitorUpdateStatus::InProgress => {
-                                       log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+                                       log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
                                        pending_monitor_updates.push(update_id);
                                },
                                ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -401,7 +407,8 @@ where C::Target: chain::Filter,
                                                outpoint: OutPoint { txid, index: idx as u16 },
                                                script_pubkey: output.script_pubkey,
                                        };
-                                       chain_source.register_output(output)
+                                       log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
+                                       chain_source.register_output(output);
                                }
                        }
                }
@@ -417,7 +424,7 @@ where C::Target: chain::Filter,
        /// transactions relevant to the watched channels.
        pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
                Self {
-                       monitors: RwLock::new(HashMap::new()),
+                       monitors: RwLock::new(new_hash_map()),
                        sync_persistence_id: AtomicCounter::new(),
                        chain_source,
                        broadcaster,
@@ -469,20 +476,23 @@ where C::Target: chain::Filter,
                }
        }
 
-       /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored.
+       /// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored.
        ///
        /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
        /// monitoring for on-chain state resolutions.
-       pub fn list_monitors(&self) -> Vec<OutPoint> {
-               self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
+       pub fn list_monitors(&self) -> Vec<(OutPoint, ChannelId)> {
+               self.monitors.read().unwrap().iter().map(|(outpoint, monitor_holder)| {
+                       let channel_id = monitor_holder.monitor.channel_id();
+                       (*outpoint, channel_id)
+               }).collect()
        }
 
        #[cfg(not(c_bindings))]
        /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
        pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<MonitorUpdateId>> {
-               self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+               hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
                        (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
-               }).collect()
+               }))
        }
 
        #[cfg(c_bindings)]
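
A brief usage sketch of the widened `list_monitors` return type; `chain_monitor` is assumed to be a `ChainMonitor` instance in scope:

// Each monitored channel is now reported with both of its identifiers.
for (funding_outpoint, channel_id) in chain_monitor.list_monitors() {
	println!(
		"monitoring channel {} (funding tx {} vout {})",
		channel_id, funding_outpoint.txid, funding_outpoint.index,
	);
}

Returning the pair matters because a V2-established channel's `ChannelId` is not derivable from its funding outpoint, so the outpoint alone no longer identifies the channel to callers.
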
@@ -540,8 +550,9 @@ where C::Target: chain::Filter,
                                        // Completed event.
                                        return Ok(());
                                }
-                               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
-                                       funding_txo,
+                               let channel_id = monitor_data.monitor.channel_id();
+                               self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
+                                       funding_txo, channel_id,
                                        monitor_update_id: monitor_data.monitor.get_latest_update_id(),
                                }], monitor_data.monitor.get_counterparty_node_id()));
                        },
@@ -563,9 +574,14 @@ where C::Target: chain::Filter,
        #[cfg(any(test, fuzzing))]
        pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
                let monitors = self.monitors.read().unwrap();
-               let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
-               self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
+               let (counterparty_node_id, channel_id) = if let Some(m) = monitors.get(&funding_txo) {
+                       (m.monitor.get_counterparty_node_id(), m.monitor.channel_id())
+               } else {
+                       (None, ChannelId::v1_from_funding_outpoint(funding_txo))
+               };
+               self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
                        funding_txo,
+                       channel_id,
                        monitor_update_id,
                }], counterparty_node_id));
                self.event_notifier.notify();
@@ -620,10 +636,66 @@ where C::Target: chain::Filter,
                let monitors = self.monitors.read().unwrap();
                for (_, monitor_holder) in &*monitors {
                        monitor_holder.monitor.rebroadcast_pending_claims(
-                               &*self.broadcaster, &*self.fee_estimator, &*self.logger
+                               &*self.broadcaster, &*self.fee_estimator, &self.logger
                        )
                }
        }
+
+       /// Triggers rebroadcasts of pending claims from force-closed channels after a transaction
+       /// signature generation failure.
+       ///
+       /// `monitor_opt` can be used as a filter to only trigger them for a specific channel monitor.
+       pub fn signer_unblocked(&self, monitor_opt: Option<OutPoint>) {
+               let monitors = self.monitors.read().unwrap();
+               if let Some(funding_txo) = monitor_opt {
+                       if let Some(monitor_holder) = monitors.get(&funding_txo) {
+                               monitor_holder.monitor.signer_unblocked(
+                                       &*self.broadcaster, &*self.fee_estimator, &self.logger
+                               )
+                       }
+               } else {
+                       for (_, monitor_holder) in &*monitors {
+                               monitor_holder.monitor.signer_unblocked(
+                                       &*self.broadcaster, &*self.fee_estimator, &self.logger
+                               )
+                       }
+               }
+       }
+
+       /// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
+       ///
+       /// This is useful for pruning fully resolved monitors from the monitor set and primary
+       /// storage so they are not kept in memory and reloaded on restart.
+       ///
+       /// Should be called occasionally (once every handful of blocks or on startup).
+       ///
+       /// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor
+       /// data could be moved to an archive location or removed entirely.
+       pub fn archive_fully_resolved_channel_monitors(&self) {
+               let mut have_monitors_to_prune = false;
+               for (_, monitor_holder) in self.monitors.read().unwrap().iter() {
+                       let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
+                       if monitor_holder.monitor.is_fully_resolved(&logger) {
+                               have_monitors_to_prune = true;
+                       }
+               }
+               if have_monitors_to_prune {
+                       let mut monitors = self.monitors.write().unwrap();
+                       monitors.retain(|funding_txo, monitor_holder| {
+                               let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
+                               if monitor_holder.monitor.is_fully_resolved(&logger) {
+                                       log_info!(logger,
+                                               "Archiving fully resolved ChannelMonitor for funding txo {}",
+                                               funding_txo
+                                       );
+                                       self.persister.archive_persisted_channel(*funding_txo);
+                                       false
+                               } else {
+                                       true
+                               }
+                       });
+               }
+       }
 }
 
 impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
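
The docs above leave the cadence of `archive_fully_resolved_channel_monitors` to the caller ("once every handful of blocks or on startup"). One way to wire that up, sketched as a free function so it stays independent of the `ChainMonitor` generics; the 6-block interval is an arbitrary assumption:

/// Sketch: invoke `archive` (standing in for
/// `ChainMonitor::archive_fully_resolved_channel_monitors`) at most once per
/// `ARCHIVE_INTERVAL` blocks as new best blocks arrive.
fn maybe_archive_on_block(height: u32, last_archive_height: &mut u32, archive: impl Fn()) {
	const ARCHIVE_INTERVAL: u32 = 6; // "a handful of blocks"; tune as needed.
	if *last_archive_height == 0 || height >= *last_archive_height + ARCHIVE_INTERVAL {
		// Fully resolved monitors are handed to `Persist::archive_persisted_channel`
		// and dropped from the in-memory monitor set.
		archive();
		*last_archive_height = height;
	}
}

A caller might invoke this from its `best_block_updated` handler, passing `|| chain_monitor.archive_fully_resolved_channel_monitors()` as the closure.
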
@@ -639,7 +711,7 @@ where
                log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
                self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
                        monitor.block_connected(
-                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
                });
        }
 
@@ -648,7 +720,7 @@ where
                log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
                for monitor_state in monitor_states.values() {
                        monitor_state.monitor.block_disconnected(
-                               header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+                               header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger);
                }
        }
 }
@@ -666,7 +738,7 @@ where
                log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
                self.process_chain_data(header, None, txdata, |monitor, txdata| {
                        monitor.transactions_confirmed(
-                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
                });
        }
 
@@ -674,7 +746,7 @@ where
                log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
                let monitor_states = self.monitors.read().unwrap();
                for monitor_state in monitor_states.values() {
-                       monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+                       monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &self.logger);
                }
        }
 
@@ -685,7 +757,8 @@ where
                        // it's still possible if a chain::Filter implementation returns a transaction.
                        debug_assert!(txdata.is_empty());
                        monitor.best_block_updated(
-                               header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+                               header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
+                       )
                });
        }
 
@@ -711,34 +784,35 @@ where C::Target: chain::Filter,
            P::Target: Persist<ChannelSigner>,
 {
        fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
+               let logger = WithChannelMonitor::from(&self.logger, &monitor);
                let mut monitors = self.monitors.write().unwrap();
                let entry = match monitors.entry(funding_outpoint) {
                        hash_map::Entry::Occupied(_) => {
-                               log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
+                               log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
                                return Err(());
                        },
                        hash_map::Entry::Vacant(e) => e,
                };
-               log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
+               log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
                let update_id = MonitorUpdateId::from_new_monitor(&monitor);
                let mut pending_monitor_updates = Vec::new();
                let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
                match persist_res {
                        ChannelMonitorUpdateStatus::InProgress => {
-                               log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
+                               log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
                                pending_monitor_updates.push(update_id);
                        },
                        ChannelMonitorUpdateStatus::Completed => {
-                               log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
+                               log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
                        },
                        ChannelMonitorUpdateStatus::UnrecoverableError => {
                                let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
-                               log_error!(self.logger, "{}", err_str);
+                               log_error!(logger, "{}", err_str);
                                panic!("{}", err_str);
                        },
                }
                if let Some(ref chain_source) = self.chain_source {
-                       monitor.load_outputs_to_watch(chain_source);
+                       monitor.load_outputs_to_watch(chain_source, &self.logger);
                }
                entry.insert(MonitorHolder {
                        monitor,
@@ -749,11 +823,15 @@ where C::Target: chain::Filter,
        }
 
        fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
+               // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
+               // versions are V1-established. For 0.0.121+ the `channel_id` field is always `Some`.
+               let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo));
                // Update the monitor that watches the channel referred to by the given outpoint.
                let monitors = self.monitors.read().unwrap();
-               let ret = match monitors.get(&funding_txo) {
+               match monitors.get(&funding_txo) {
                        None => {
-                               log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
+                               let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(channel_id));
+                               log_error!(logger, "Failed to update channel monitor: no such monitor registered");
 
                                // We should never ever trigger this from within ChannelManager. Technically a
                                // user could use this object with some proxying in between which makes this
@@ -765,7 +843,8 @@ where C::Target: chain::Filter,
                        },
                        Some(monitor_state) => {
                                let monitor = &monitor_state.monitor;
-                               log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
+                               let logger = WithChannelMonitor::from(&self.logger, &monitor);
+                               log_trace!(logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
                                let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger);
 
                                let update_id = MonitorUpdateId::from_monitor_update(update);
@@ -776,7 +855,7 @@ where C::Target: chain::Filter,
                                        // We don't want to persist a `monitor_update` which results in a failure to apply later
                                        // while reading `channel_monitor` with updates from storage. Instead, we should persist
                                        // the entire `channel_monitor` here.
-                                       log_warn!(self.logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
+                                       log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
                                        self.persister.update_persisted_channel(funding_txo, None, monitor, update_id)
                                } else {
                                        self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id)
@@ -784,12 +863,21 @@ where C::Target: chain::Filter,
                                match persist_res {
                                        ChannelMonitorUpdateStatus::InProgress => {
                                                pending_monitor_updates.push(update_id);
-                                               log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
+                                               log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
                                        },
                                        ChannelMonitorUpdateStatus::Completed => {
-                                               log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+                                               log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+                                       },
+                                       ChannelMonitorUpdateStatus::UnrecoverableError => {
+                                               // Take the monitors lock for writing so that we poison it and any future
+                                               // operations going forward fail immediately.
+                                               core::mem::drop(pending_monitor_updates);
+                                               core::mem::drop(monitors);
+                                               let _poison = self.monitors.write().unwrap();
+                                               let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+                                               log_error!(logger, "{}", err_str);
+                                               panic!("{}", err_str);
                                        },
-                                       ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ },
                                }
                                if update_res.is_err() {
                                        ChannelMonitorUpdateStatus::InProgress
@@ -797,34 +885,26 @@ where C::Target: chain::Filter,
                                        persist_res
                                }
                        }
-               };
-               if let ChannelMonitorUpdateStatus::UnrecoverableError = ret {
-                       // Take the monitors lock for writing so that we poison it and any future
-                       // operations going forward fail immediately.
-                       core::mem::drop(monitors);
-                       let _poison = self.monitors.write().unwrap();
-                       let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
-                       log_error!(self.logger, "{}", err_str);
-                       panic!("{}", err_str);
                }
-               ret
        }
 
-       fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
+       fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
                let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
+                       let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
                        let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
                        if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) {
                                if is_pending_monitor_update {
-                                       log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
-                                       log_error!(self.logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
-                                       log_error!(self.logger, "   This may cause duplicate payment events to be generated.");
+                                       log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+                                       log_error!(logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
+                                       log_error!(logger, "   This may cause duplicate payment events to be generated.");
                                }
                                let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
                                if monitor_events.len() > 0 {
                                        let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
+                                       let monitor_channel_id = monitor_state.monitor.channel_id();
                                        let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
-                                       pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id));
+                                       pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id));
                                }
                        }
                }
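
Consumers of `release_pending_monitor_events` (normally the `ChannelManager`) now destructure a 4-tuple. A hedged sketch of such a consumer, fed from `chain_monitor.release_pending_monitor_events()`:

use bitcoin::secp256k1::PublicKey;
use lightning::chain::channelmonitor::MonitorEvent;
use lightning::chain::transaction::OutPoint;
use lightning::ln::ChannelId;

/// Sketch: handle released monitor events, now keyed by outpoint *and* channel ID.
fn handle_monitor_events(
	pending: Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)>,
) {
	for (funding_txo, channel_id, events, _counterparty_node_id) in pending {
		for event in events {
			if let MonitorEvent::Completed { monitor_update_id, .. } = event {
				// `channel_id` now travels with `funding_txo`; for V2-established
				// channels it cannot be re-derived from the outpoint alone.
				println!(
					"update {} completed for channel {} (funding txo {}:{})",
					monitor_update_id, channel_id, funding_txo.txid, funding_txo.index,
				);
			}
		}
	}
}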