X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=b71f10f58d455cc0fccff487884dd65f12c8d5cc;hb=7c9463668a4f71663746e57bdb09ee6b91797d5a;hp=39fa3a237a62f02085e904349dc9555daee2c232;hpb=e21a500668179c7084e2df5cb75019810eb03cbe;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 39fa3a23..b71f10f5 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -31,11 +31,12 @@ use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS}; use crate::chain::transaction::{OutPoint, TransactionData}; +use crate::ln::ChannelId; use crate::sign::ecdsa::WriteableEcdsaChannelSigner; use crate::events; use crate::events::{Event, EventHandler}; use crate::util::atomic_counter::AtomicCounter; -use crate::util::logger::Logger; +use crate::util::logger::{Logger, WithContext}; use crate::util::errors::APIError; use crate::util::wakers::{Future, Notifier}; use crate::ln::channelmanager::ChannelDetails; @@ -158,7 +159,7 @@ pub trait Persist { /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given /// update. @@ -193,7 +194,7 @@ pub trait Persist { /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; } struct MonitorHolder { @@ -287,7 +288,7 @@ pub struct ChainMonitor, Option)>>, + pending_monitor_events: Mutex, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, @@ -402,7 +403,8 @@ where C::Target: chain::Filter, outpoint: OutPoint { txid, index: idx as u16 }, script_pubkey: output.script_pubkey, }; - chain_source.register_output(output) + log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint); + chain_source.register_output(output); } } } @@ -470,12 +472,15 @@ where C::Target: chain::Filter, } } - /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored. + /// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored. /// /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always /// monitoring for on-chain state resolutions. - pub fn list_monitors(&self) -> Vec { - self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() + pub fn list_monitors(&self) -> Vec<(OutPoint, ChannelId)> { + self.monitors.read().unwrap().iter().map(|(outpoint, monitor_holder)| { + let channel_id = monitor_holder.monitor.channel_id(); + (*outpoint, channel_id) + }).collect() } #[cfg(not(c_bindings))] @@ -541,8 +546,9 @@ where C::Target: chain::Filter, // Completed event. return Ok(()); } - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { - funding_txo, + let channel_id = monitor_data.monitor.channel_id(); + self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed { + funding_txo, channel_id, monitor_update_id: monitor_data.monitor.get_latest_update_id(), }], monitor_data.monitor.get_counterparty_node_id())); }, @@ -564,9 +570,14 @@ where C::Target: chain::Filter, #[cfg(any(test, fuzzing))] pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { let monitors = self.monitors.read().unwrap(); - let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id()); - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { + let (counterparty_node_id, channel_id) = if let Some(m) = monitors.get(&funding_txo) { + (m.monitor.get_counterparty_node_id(), m.monitor.channel_id()) + } else { + (None, ChannelId::v1_from_funding_outpoint(funding_txo)) + }; + self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed { funding_txo, + channel_id, monitor_update_id, }], counterparty_node_id)); self.event_notifier.notify(); @@ -620,9 +631,8 @@ where C::Target: chain::Filter, pub fn rebroadcast_pending_claims(&self) { let monitors = self.monitors.read().unwrap(); for (_, monitor_holder) in &*monitors { - let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor); monitor_holder.monitor.rebroadcast_pending_claims( - &*self.broadcaster, &*self.fee_estimator, &logger + &*self.broadcaster, &*self.fee_estimator, &self.logger ) } } @@ -640,9 +650,8 @@ where fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height); self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| { - let logger = WithChannelMonitor::from(&self.logger, &monitor); monitor.block_connected( - header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &logger) + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger) }); } @@ -650,9 +659,8 @@ where let monitor_states = self.monitors.read().unwrap(); log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height); for monitor_state in monitor_states.values() { - let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor); monitor_state.monitor.block_disconnected( - header, height, &*self.broadcaster, &*self.fee_estimator, &logger); + header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger); } } } @@ -669,9 +677,8 @@ where fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash()); self.process_chain_data(header, None, txdata, |monitor, txdata| { - let logger = WithChannelMonitor::from(&self.logger, &monitor); monitor.transactions_confirmed( - header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &logger) + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger) }); } @@ -679,20 +686,19 @@ where log_debug!(self.logger, "Transaction {} reorganized out of chain", txid); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { - let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor); - monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &logger); + monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &self.logger); } } fn best_block_updated(&self, header: &Header, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height); self.process_chain_data(header, Some(height), &[], |monitor, txdata| { - let logger = WithChannelMonitor::from(&self.logger, &monitor); // While in practice there shouldn't be any recursive calls when given empty txdata, // it's still possible if a chain::Filter implementation returns a transaction. debug_assert!(txdata.is_empty()); monitor.best_block_updated( - header, height, &*self.broadcaster, &*self.fee_estimator, &logger) + header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger + ) }); } @@ -746,7 +752,7 @@ where C::Target: chain::Filter, }, } if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source); + monitor.load_outputs_to_watch(chain_source , &self.logger); } entry.insert(MonitorHolder { monitor, @@ -757,12 +763,15 @@ where C::Target: chain::Filter, } fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo)); // Update the monitor that watches the channel referred to by the given outpoint. - let monitors_lock = self.monitors.read().unwrap(); - let monitors = monitors_lock.deref(); + let monitors = self.monitors.read().unwrap(); match monitors.get(&funding_txo) { None => { - log_error!(self.logger, "Failed to update channel monitor: no such monitor registered"); + let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(channel_id)); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); // We should never ever trigger this from within ChannelManager. Technically a // user could use this object with some proxying in between which makes this @@ -802,6 +811,7 @@ where C::Target: chain::Filter, ChannelMonitorUpdateStatus::UnrecoverableError => { // Take the monitors lock for writing so that we poison it and any future // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); core::mem::drop(monitors); let _poison = self.monitors.write().unwrap(); let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; @@ -818,7 +828,7 @@ where C::Target: chain::Filter, } } - fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { + fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec, Option)> { let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor); @@ -832,8 +842,9 @@ where C::Target: chain::Filter, let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); if monitor_events.len() > 0 { let monitor_outpoint = monitor_state.monitor.get_funding_txo().0; + let monitor_channel_id = monitor_state.monitor.channel_id(); let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id(); - pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id)); + pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id)); } } }