X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=b71f10f58d455cc0fccff487884dd65f12c8d5cc;hb=7c9463668a4f71663746e57bdb09ee6b91797d5a;hp=6d003df720f6680bdfde4fd6766a064edb8f05c2;hpb=955e81086f7ba0b738eb473dcaac5ae561b0b153;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 6d003df7..b71f10f5 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -23,19 +23,20 @@ //! events. The remote server would make use of [`ChainMonitor`] for block processing and for //! servicing [`ChannelMonitor`] updates from the client. -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::Header; use bitcoin::hash_types::{Txid, BlockHash}; use crate::chain; use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; +use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS}; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::sign::WriteableEcdsaChannelSigner; +use crate::ln::ChannelId; +use crate::sign::ecdsa::WriteableEcdsaChannelSigner; use crate::events; use crate::events::{Event, EventHandler}; use crate::util::atomic_counter::AtomicCounter; -use crate::util::logger::Logger; +use crate::util::logger::{Logger, WithContext}; use crate::util::errors::APIError; use crate::util::wakers::{Future, Notifier}; use crate::ln::channelmanager::ChannelDetails; @@ -47,23 +48,35 @@ use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] -/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents -/// entirely opaque. -enum UpdateOrigin { - /// An update that was generated by the `ChannelManager` (via our `chain::Watch` - /// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field - /// and [`ChannelMonitor::get_latest_update_id`]. - OffChain(u64), - /// An update that was generated during blockchain processing. The ID here is specific to the - /// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs. - ChainSync(u64), +mod update_origin { + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] + /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents + /// entirely opaque. + pub(crate) enum UpdateOrigin { + /// An update that was generated by the `ChannelManager` (via our [`crate::chain::Watch`] + /// implementation). This corresponds to an actual [ChannelMonitorUpdate::update_id] field + /// and [ChannelMonitor::get_latest_update_id]. + /// + /// [ChannelMonitor::get_latest_update_id]: crate::chain::channelmonitor::ChannelMonitor::get_latest_update_id + /// [ChannelMonitorUpdate::update_id]: crate::chain::channelmonitor::ChannelMonitorUpdate::update_id + OffChain(u64), + /// An update that was generated during blockchain processing. The ID here is specific to the + /// generating [ChannelMonitor] and does *not* correspond to any on-disk IDs. + /// + /// [ChannelMonitor]: crate::chain::channelmonitor::ChannelMonitor + ChainSync(u64), + } } +#[cfg(any(feature = "_test_utils", test))] +pub(crate) use update_origin::UpdateOrigin; +#[cfg(not(any(feature = "_test_utils", test)))] +use update_origin::UpdateOrigin; + /// An opaque identifier describing a specific [`Persist`] method call. #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct MonitorUpdateId { - contents: UpdateOrigin, + pub(crate) contents: UpdateOrigin, } impl MonitorUpdateId { @@ -146,7 +159,7 @@ pub trait Persist { /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given /// update. @@ -155,8 +168,8 @@ pub trait Persist { /// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more /// details. /// - /// During blockchain synchronization operations, this may be called with no - /// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted. + /// During blockchain synchronization operations, and in some rare cases, this may be called with + /// no [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted. /// Note that after the full [`ChannelMonitor`] is persisted any previous /// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be /// applied to the persisted [`ChannelMonitor`] as they were already applied. @@ -181,7 +194,7 @@ pub trait Persist { /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; } struct MonitorHolder { @@ -275,7 +288,7 @@ pub struct ChainMonitor, Option)>>, + pending_monitor_events: Mutex, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, @@ -300,7 +313,7 @@ where C::Target: chain::Filter, /// updated `txdata`. /// /// Calls which represent a new blockchain tip height should set `best_height`. - fn process_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN) + fn process_chain_data(&self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { @@ -312,7 +325,6 @@ where C::Target: chain::Filter, if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { // Take the monitors lock for writing so that we poison it and any future // operations going forward fail immediately. - core::mem::drop(monitor_state); core::mem::drop(monitor_lock); let _poison = self.monitors.write().unwrap(); log_error!(self.logger, "{}", err_str); @@ -344,10 +356,11 @@ where C::Target: chain::Filter, } fn update_monitor_with_chain_data( - &self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, + &self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder ) -> Result<(), ()> where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor); let mut txn_outputs; { txn_outputs = process(monitor, txdata); @@ -364,12 +377,12 @@ where C::Target: chain::Filter, } } - log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { ChannelMonitorUpdateStatus::Completed => - log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + log_trace!(logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), ChannelMonitorUpdateStatus::InProgress => { - log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); }, ChannelMonitorUpdateStatus::UnrecoverableError => { @@ -390,7 +403,8 @@ where C::Target: chain::Filter, outpoint: OutPoint { txid, index: idx as u16 }, script_pubkey: output.script_pubkey, }; - chain_source.register_output(output) + log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint); + chain_source.register_output(output); } } } @@ -423,7 +437,8 @@ where C::Target: chain::Filter, /// claims which are awaiting confirmation. /// /// Includes the balances from each [`ChannelMonitor`] *except* those included in - /// `ignored_channels`. + /// `ignored_channels`, allowing you to filter out balances from channels which are still open + /// (and whose balance should likely be pulled from the [`ChannelDetails`]). /// /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for /// inclusion in the return value. @@ -457,12 +472,15 @@ where C::Target: chain::Filter, } } - /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored. + /// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored. /// /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always /// monitoring for on-chain state resolutions. - pub fn list_monitors(&self) -> Vec { - self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() + pub fn list_monitors(&self) -> Vec<(OutPoint, ChannelId)> { + self.monitors.read().unwrap().iter().map(|(outpoint, monitor_holder)| { + let channel_id = monitor_holder.monitor.channel_id(); + (*outpoint, channel_id) + }).collect() } #[cfg(not(c_bindings))] @@ -528,8 +546,9 @@ where C::Target: chain::Filter, // Completed event. return Ok(()); } - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { - funding_txo, + let channel_id = monitor_data.monitor.channel_id(); + self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed { + funding_txo, channel_id, monitor_update_id: monitor_data.monitor.get_latest_update_id(), }], monitor_data.monitor.get_counterparty_node_id())); }, @@ -551,9 +570,14 @@ where C::Target: chain::Filter, #[cfg(any(test, fuzzing))] pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); - let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id()); - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { + let (counterparty_node_id, channel_id) = if let Some(m) = monitors.get(&funding_txo) { + (m.monitor.get_counterparty_node_id(), m.monitor.channel_id()) + } else { + (None, ChannelId::v1_from_funding_outpoint(funding_txo)) + }; + self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed { funding_txo, + channel_id, monitor_update_id, }], counterparty_node_id)); self.event_notifier.notify(); @@ -608,7 +632,7 @@ where C::Target: chain::Filter, let monitors = self.monitors.read().unwrap(); for (_, monitor_holder) in &*monitors { monitor_holder.monitor.rebroadcast_pending_claims( - &*self.broadcaster, &*self.fee_estimator, &*self.logger + &*self.broadcaster, &*self.fee_estimator, &self.logger ) } } @@ -623,20 +647,20 @@ where L::Target: Logger, P::Target: Persist, { - fn filtered_block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height); self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| { monitor.block_connected( - header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger) }); } - fn block_disconnected(&self, header: &BlockHeader, height: u32) { + fn block_disconnected(&self, header: &Header, height: u32) { let monitor_states = self.monitors.read().unwrap(); log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height); for monitor_state in monitor_states.values() { monitor_state.monitor.block_disconnected( - header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger); } } } @@ -650,11 +674,11 @@ where L::Target: Logger, P::Target: Persist, { - fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash()); self.process_chain_data(header, None, txdata, |monitor, txdata| { monitor.transactions_confirmed( - header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger) }); } @@ -662,30 +686,31 @@ where log_debug!(self.logger, "Transaction {} reorganized out of chain", txid); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { - monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &self.logger); } } - fn best_block_updated(&self, header: &BlockHeader, height: u32) { + fn best_block_updated(&self, header: &Header, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height); self.process_chain_data(header, Some(height), &[], |monitor, txdata| { // While in practice there shouldn't be any recursive calls when given empty txdata, // it's still possible if a chain::Filter implementation returns a transaction. debug_assert!(txdata.is_empty()); monitor.best_block_updated( - header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger + ) }); } - fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { + fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { let mut txids = Vec::new(); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { txids.append(&mut monitor_state.monitor.get_relevant_txids()); } - txids.sort_unstable(); - txids.dedup(); + txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1))); + txids.dedup_by_key(|(txid, _, _)| *txid); txids } } @@ -699,34 +724,35 @@ where C::Target: chain::Filter, P::Target: Persist, { fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor); let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { hash_map::Entry::Occupied(_) => { - log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); + log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); return Err(()); }, hash_map::Entry::Vacant(e) => e, }; - log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor)); + log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor)); let update_id = MonitorUpdateId::from_new_monitor(&monitor); let mut pending_monitor_updates = Vec::new(); let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id); match persist_res { ChannelMonitorUpdateStatus::InProgress => { - log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); + log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); }, ChannelMonitorUpdateStatus::Completed => { - log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); + log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); }, ChannelMonitorUpdateStatus::UnrecoverableError => { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(self.logger, "{}", err_str); + log_error!(logger, "{}", err_str); panic!("{}", err_str); }, } if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source); + monitor.load_outputs_to_watch(chain_source , &self.logger); } entry.insert(MonitorHolder { monitor, @@ -737,11 +763,15 @@ where C::Target: chain::Filter, } fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo)); // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); - let ret = match monitors.get(&funding_txo) { + match monitors.get(&funding_txo) { None => { - log_error!(self.logger, "Failed to update channel monitor: no such monitor registered"); + let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(channel_id)); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); // We should never ever trigger this from within ChannelManager. Technically a // user could use this object with some proxying in between which makes this @@ -753,25 +783,41 @@ where C::Target: chain::Filter, }, Some(monitor_state) => { let monitor = &monitor_state.monitor; - log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); - let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger); - if update_res.is_err() { - log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor)); - } - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. So, persist the updated monitor despite the error. + let logger = WithChannelMonitor::from(&self.logger, &monitor); + log_trace!(logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); + let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger); + let update_id = MonitorUpdateId::from_monitor_update(update); let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id); + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor)); + self.persister.update_persisted_channel(funding_txo, None, monitor, update_id) + } else { + self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id) + }; match persist_res { ChannelMonitorUpdateStatus::InProgress => { pending_monitor_updates.push(update_id); - log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor)); + log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor)); }, ChannelMonitorUpdateStatus::Completed => { - log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); + log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); }, - ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ }, } if update_res.is_err() { ChannelMonitorUpdateStatus::InProgress @@ -779,39 +825,26 @@ where C::Target: chain::Filter, persist_res } } - }; - if let ChannelMonitorUpdateStatus::UnrecoverableError = ret { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(self.logger, "{}", err_str); - panic!("{}", err_str); } - ret } - fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { + fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec, Option)> { let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { + let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor); let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap()); - if is_pending_monitor_update && - monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize - > self.highest_chain_height.load(Ordering::Acquire) - { - log_debug!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); - } else { + if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) { if is_pending_monitor_update { - log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); - log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released."); - log_error!(self.logger, " This may cause duplicate payment events to be generated."); + log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); + log_error!(logger, " To avoid funds-loss, we are allowing monitor updates to be released."); + log_error!(logger, " This may cause duplicate payment events to be generated."); } let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); if monitor_events.len() > 0 { let monitor_outpoint = monitor_state.monitor.get_funding_txo().0; + let monitor_channel_id = monitor_state.monitor.channel_id(); let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id(); - pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id)); + pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id)); } } }