X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=6e824d1e111e5b288277f008547a30afb7bcf3e8;hb=refs%2Fheads%2F2024-04-log-in-flights;hp=261c0471ca4089dfaa1d21ca0e8c8f810f90c7a5;hpb=cff2061335ff973100ea77b388da1628d4fd5982;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 261c0471..6e824d1e 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -23,19 +23,19 @@ //! events. The remote server would make use of [`ChainMonitor`] for block processing and for //! servicing [`ChannelMonitor`] updates from the client. -use bitcoin::blockdata::block::BlockHeader; +use bitcoin::blockdata::block::Header; use bitcoin::hash_types::{Txid, BlockHash}; use crate::chain; use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; +use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS}; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::sign::WriteableEcdsaChannelSigner; +use crate::sign::ecdsa::WriteableEcdsaChannelSigner; use crate::events; use crate::events::{Event, EventHandler}; use crate::util::atomic_counter::AtomicCounter; -use crate::util::logger::Logger; +use crate::util::logger::{Logger, WithContext}; use crate::util::errors::APIError; use crate::util::wakers::{Future, Notifier}; use crate::ln::channelmanager::ChannelDetails; @@ -312,7 +312,7 @@ where C::Target: chain::Filter, /// updated `txdata`. /// /// Calls which represent a new blockchain tip height should set `best_height`. - fn process_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN) + fn process_chain_data(&self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { @@ -324,7 +324,6 @@ where C::Target: chain::Filter, if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() { // Take the monitors lock for writing so that we poison it and any future // operations going forward fail immediately. - core::mem::drop(monitor_state); core::mem::drop(monitor_lock); let _poison = self.monitors.write().unwrap(); log_error!(self.logger, "{}", err_str); @@ -356,15 +355,17 @@ where C::Target: chain::Filter, } fn update_monitor_with_chain_data( - &self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, + &self, header: &Header, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder ) -> Result<(), ()> where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor); let mut txn_outputs; { txn_outputs = process(monitor, txdata); + let chain_sync_update_id = self.sync_persistence_id.get_increment(); let update_id = MonitorUpdateId { - contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + contents: UpdateOrigin::ChainSync(chain_sync_update_id), }; let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); if let Some(height) = best_height { @@ -376,12 +377,18 @@ where C::Target: chain::Filter, } } - log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + log_trace!(logger, "Syncing Channel Monitor for channel {} for block-data update_id {}", + log_funding_info!(monitor), + chain_sync_update_id + ); match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { ChannelMonitorUpdateStatus::Completed => - log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data update_id {}", + log_funding_info!(monitor), + chain_sync_update_id + ), ChannelMonitorUpdateStatus::InProgress => { - log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); }, ChannelMonitorUpdateStatus::UnrecoverableError => { @@ -402,7 +409,8 @@ where C::Target: chain::Filter, outpoint: OutPoint { txid, index: idx as u16 }, script_pubkey: output.script_pubkey, }; - chain_source.register_output(output) + log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint); + chain_source.register_output(output); } } } @@ -525,7 +533,7 @@ where C::Target: chain::Filter, pending_monitor_updates.retain(|update_id| *update_id != completed_update_id); match completed_update_id { - MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => { + MonitorUpdateId { contents: UpdateOrigin::OffChain(completed_update_id) } => { // Note that we only check for `UpdateOrigin::OffChain` failures here - if // we're being told that a `UpdateOrigin::OffChain` monitor update completed, // we only care about ensuring we don't tell the `ChannelManager` to restore @@ -536,6 +544,14 @@ where C::Target: chain::Filter, // `MonitorEvent`s from the monitor back to the `ChannelManager` until they // complete. let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates); + log_debug!(self.logger, "Completed off-chain monitor update {} for channel with funding outpoint {:?}, {}", + completed_update_id, + funding_txo, + if monitor_is_pending_updates { + "still have pending off-chain updates" + } else { + "all off-chain updates complete, returning a MonitorEvent" + }); if monitor_is_pending_updates { // If there are still monitor updates pending, we cannot yet construct a // Completed event. @@ -546,8 +562,18 @@ where C::Target: chain::Filter, monitor_update_id: monitor_data.monitor.get_latest_update_id(), }], monitor_data.monitor.get_counterparty_node_id())); }, - MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => { - if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) { + MonitorUpdateId { contents: UpdateOrigin::ChainSync(completed_update_id) } => { + let monitor_has_pending_updates = + monitor_data.has_pending_chainsync_updates(&pending_monitor_updates); + log_debug!(self.logger, "Completed chain sync monitor update {} for channel with funding outpoint {:?}, {}", + completed_update_id, + funding_txo, + if monitor_has_pending_updates { + "still have pending chain sync updates" + } else { + "all chain sync updates complete, releasing pending MonitorEvents" + }); + if !monitor_has_pending_updates { monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release); // The next time release_pending_monitor_events is called, any events for this // ChannelMonitor will be returned. @@ -621,7 +647,7 @@ where C::Target: chain::Filter, let monitors = self.monitors.read().unwrap(); for (_, monitor_holder) in &*monitors { monitor_holder.monitor.rebroadcast_pending_claims( - &*self.broadcaster, &*self.fee_estimator, &*self.logger + &*self.broadcaster, &*self.fee_estimator, &self.logger ) } } @@ -636,20 +662,20 @@ where L::Target: Logger, P::Target: Persist, { - fn filtered_block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height); self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| { monitor.block_connected( - header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger) }); } - fn block_disconnected(&self, header: &BlockHeader, height: u32) { + fn block_disconnected(&self, header: &Header, height: u32) { let monitor_states = self.monitors.read().unwrap(); log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height); for monitor_state in monitor_states.values() { monitor_state.monitor.block_disconnected( - header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger); } } } @@ -663,11 +689,11 @@ where L::Target: Logger, P::Target: Persist, { - fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash()); self.process_chain_data(header, None, txdata, |monitor, txdata| { monitor.transactions_confirmed( - header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger) }); } @@ -675,30 +701,31 @@ where log_debug!(self.logger, "Transaction {} reorganized out of chain", txid); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { - monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &self.logger); } } - fn best_block_updated(&self, header: &BlockHeader, height: u32) { + fn best_block_updated(&self, header: &Header, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height); self.process_chain_data(header, Some(height), &[], |monitor, txdata| { // While in practice there shouldn't be any recursive calls when given empty txdata, // it's still possible if a chain::Filter implementation returns a transaction. debug_assert!(txdata.is_empty()); monitor.best_block_updated( - header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger + ) }); } - fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { + fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { let mut txids = Vec::new(); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { txids.append(&mut monitor_state.monitor.get_relevant_txids()); } - txids.sort_unstable(); - txids.dedup(); + txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1))); + txids.dedup_by_key(|(txid, _, _)| *txid); txids } } @@ -712,34 +739,35 @@ where C::Target: chain::Filter, P::Target: Persist, { fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor); let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { hash_map::Entry::Occupied(_) => { - log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); + log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); return Err(()); }, hash_map::Entry::Vacant(e) => e, }; - log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor)); + log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor)); let update_id = MonitorUpdateId::from_new_monitor(&monitor); let mut pending_monitor_updates = Vec::new(); let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id); match persist_res { ChannelMonitorUpdateStatus::InProgress => { - log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); + log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); pending_monitor_updates.push(update_id); }, ChannelMonitorUpdateStatus::Completed => { - log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); + log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); }, ChannelMonitorUpdateStatus::UnrecoverableError => { let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(self.logger, "{}", err_str); + log_error!(logger, "{}", err_str); panic!("{}", err_str); }, } if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source); + monitor.load_outputs_to_watch(chain_source , &self.logger); } entry.insert(MonitorHolder { monitor, @@ -752,9 +780,10 @@ where C::Target: chain::Filter, fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); - let ret = match monitors.get(&funding_txo) { + match monitors.get(&funding_txo) { None => { - log_error!(self.logger, "Failed to update channel monitor: no such monitor registered"); + let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(funding_txo.to_channel_id())); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); // We should never ever trigger this from within ChannelManager. Technically a // user could use this object with some proxying in between which makes this @@ -766,8 +795,9 @@ where C::Target: chain::Filter, }, Some(monitor_state) => { let monitor = &monitor_state.monitor; - log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); - let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger); + let logger = WithChannelMonitor::from(&self.logger, &monitor); + log_trace!(logger, "Updating ChannelMonitor to id {} for channel {}", update.update_id, log_funding_info!(monitor)); + let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger); let update_id = MonitorUpdateId::from_monitor_update(update); let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); @@ -777,7 +807,7 @@ where C::Target: chain::Filter, // We don't want to persist a `monitor_update` which results in a failure to apply later // while reading `channel_monitor` with updates from storage. Instead, we should persist // the entire `channel_monitor` here. - log_warn!(self.logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor)); + log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor)); self.persister.update_persisted_channel(funding_txo, None, monitor, update_id) } else { self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id) @@ -785,12 +815,29 @@ where C::Target: chain::Filter, match persist_res { ChannelMonitorUpdateStatus::InProgress => { pending_monitor_updates.push(update_id); - log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor)); + log_debug!(logger, + "Persistence of ChannelMonitorUpdate id {:?} for channel {} in progress", + update_id, + log_funding_info!(monitor) + ); }, ChannelMonitorUpdateStatus::Completed => { - log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); + log_debug!(logger, + "Persistence of ChannelMonitorUpdate id {:?} for channel {} completed", + update_id, + log_funding_info!(monitor) + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); }, - ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ }, } if update_res.is_err() { ChannelMonitorUpdateStatus::InProgress @@ -798,33 +845,19 @@ where C::Target: chain::Filter, persist_res } } - }; - if let ChannelMonitorUpdateStatus::UnrecoverableError = ret { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(self.logger, "{}", err_str); - panic!("{}", err_str); } - ret } fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { + let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor); let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap()); - if is_pending_monitor_update && - monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize - > self.highest_chain_height.load(Ordering::Acquire) - { - log_debug!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); - } else { + if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) { if is_pending_monitor_update { - log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); - log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released."); - log_error!(self.logger, " This may cause duplicate payment events to be generated."); + log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); + log_error!(logger, " To avoid funds-loss, we are allowing monitor updates to be released."); + log_error!(logger, " This may cause duplicate payment events to be generated."); } let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); if monitor_events.len() > 0 {