X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=6051f00b90a8327026ff89b2a6a758fd5c6fadc4;hb=543674f1dac007586be8c94ae73af1ca82b5e095;hp=261e5593b5b4600ad661142a4579308b68903316;hpb=6775b957bc0e738afff46eb819c69f45410f1843;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 261e5593..6051f00b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; +use core::iter::FromIterator; use core::ops::Deref; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; @@ -94,6 +95,17 @@ impl MonitorUpdateId { /// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be /// closed without broadcasting the latest state. See /// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details. +/// +/// Third-party watchtowers may be built as a part of an implementation of this trait, with the +/// advantage that you can control whether to resume channel operation depending on if an update +/// has been persisted to a watchtower. For this, you may find the following methods useful: +/// [`ChannelMonitor::initial_counterparty_commitment_tx`], +/// [`ChannelMonitor::counterparty_commitment_txs_from_update`], +/// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`], +/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`]. +/// +/// [`TrustedCommitmentTransaction::revokeable_output_index`]: crate::ln::chan_utils::TrustedCommitmentTransaction::revokeable_output_index +/// [`TrustedCommitmentTransaction::build_to_local_justice_tx`]: crate::ln::chan_utils::TrustedCommitmentTransaction::build_to_local_justice_tx pub trait Persist { /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup. @@ -274,7 +286,22 @@ where C::Target: chain::Filter, where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let funding_outpoints: HashSet = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned()); + for funding_outpoint in funding_outpoints.iter() { + let monitor_lock = self.monitors.read().unwrap(); + if let Some(monitor_state) = monitor_lock.get(funding_outpoint) { + self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + } + } + + // do some followup cleanup if any funding outpoints were added in between iterations let monitor_states = self.monitors.write().unwrap(); + for (funding_outpoint, monitor_state) in monitor_states.iter() { + if !funding_outpoints.contains(funding_outpoint) { + self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state); + } + } + if let Some(height) = best_height { // If the best block height is being updated, update highest_chain_height under the // monitors write lock. @@ -284,55 +311,55 @@ where C::Target: chain::Filter, self.highest_chain_height.store(new_height, Ordering::Release); } } + } - for (funding_outpoint, monitor_state) in monitor_states.iter() { - let monitor = &monitor_state.monitor; - let mut txn_outputs; - { - txn_outputs = process(monitor, txdata); - let update_id = MonitorUpdateId { - contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), - }; - let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - if let Some(height) = best_height { - if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { - // If there are not ChainSync persists awaiting completion, go ahead and - // set last_chain_persist_height here - we wouldn't want the first - // InProgress to always immediately be considered "overly delayed". - monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); - } + fn update_monitor_with_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { + let monitor = &monitor_state.monitor; + let mut txn_outputs; + { + txn_outputs = process(monitor, txdata); + let update_id = MonitorUpdateId { + contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + }; + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + if let Some(height) = best_height { + if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { + // If there are not ChainSync persists awaiting completion, go ahead and + // set last_chain_persist_height here - we wouldn't want the first + // InProgress to always immediately be considered "overly delayed". + monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); } + } - log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); - match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { - ChannelMonitorUpdateStatus::Completed => - log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), - ChannelMonitorUpdateStatus::PermanentFailure => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); - self.event_notifier.notify(); - }, - ChannelMonitorUpdateStatus::InProgress => { - log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); - pending_monitor_updates.push(update_id); - }, + log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { + ChannelMonitorUpdateStatus::Completed => + log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + ChannelMonitorUpdateStatus::PermanentFailure => { + monitor_state.channel_perm_failed.store(true, Ordering::Release); + self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); + self.event_notifier.notify(); + } + ChannelMonitorUpdateStatus::InProgress => { + log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); } } + } - // Register any new outputs with the chain source for filtering, storing any dependent - // transactions from within the block that previously had not been included in txdata. - if let Some(ref chain_source) = self.chain_source { - let block_hash = header.block_hash(); - for (txid, mut outputs) in txn_outputs.drain(..) { - for (idx, output) in outputs.drain(..) { - // Register any new outputs with the chain source for filtering - let output = WatchedOutput { - block_hash: Some(block_hash), - outpoint: OutPoint { txid, index: idx as u16 }, - script_pubkey: output.script_pubkey, - }; - chain_source.register_output(output) - } + // Register any new outputs with the chain source for filtering, storing any dependent + // transactions from within the block that previously had not been included in txdata. + if let Some(ref chain_source) = self.chain_source { + let block_hash = header.block_hash(); + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { + // Register any new outputs with the chain source for filtering + let output = WatchedOutput { + block_hash: Some(block_hash), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, + }; + chain_source.register_output(output) } } } @@ -364,8 +391,7 @@ where C::Target: chain::Filter, /// claims which are awaiting confirmation. /// /// Includes the balances from each [`ChannelMonitor`] *except* those included in - /// `ignored_channels`, allowing you to filter out balances from channels which are still open - /// (and whose balance should likely be pulled from the [`ChannelDetails`]). + /// `ignored_channels`. /// /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for /// inclusion in the return value. @@ -502,7 +528,7 @@ where C::Target: chain::Filter, self.event_notifier.notify(); } - #[cfg(any(test, fuzzing, feature = "_test_utils"))] + #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { use crate::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); @@ -520,12 +546,13 @@ where C::Target: chain::Filter, pub async fn process_pending_events_async Future>( &self, handler: H ) { - let mut pending_events = Vec::new(); - for monitor_state in self.monitors.read().unwrap().values() { - pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); - } - for event in pending_events { - handler(event).await; + // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a + // crazy dance to process a monitor's events then only remove them once we've done so. + let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::>(); + for funding_txo in mons_to_process { + let mut ev; + super::channelmonitor::process_events_body!( + self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await); } } @@ -782,30 +809,13 @@ impl, { - #[cfg(not(anchors))] - /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. - /// - /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in - /// order to handle these events. - /// - /// [`SpendableOutputs`]: events::Event::SpendableOutputs - fn process_pending_events(&self, handler: H) where H::Target: EventHandler { - let mut pending_events = Vec::new(); - for monitor_state in self.monitors.read().unwrap().values() { - pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); - } - for event in pending_events { - handler.handle_event(event); - } - } - #[cfg(anchors)] /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain /// within each channel. As the confirmation of a commitment transaction may be critical to the - /// safety of funds, this method must be invoked frequently, ideally once for every chain tip - /// update (block connected or disconnected). + /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an + /// environment with spotty connections, like on mobile. /// /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in /// order to handle these events. @@ -813,12 +823,8 @@ impl(&self, handler: H) where H::Target: EventHandler { - let mut pending_events = Vec::new(); for monitor_state in self.monitors.read().unwrap().values() { - pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); - } - for event in pending_events { - handler.handle_event(event); + monitor_state.monitor.process_pending_events(&handler); } } } @@ -826,7 +832,7 @@ impl