X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=8ce59597d846b960d5bb11d843ace21b20d0ee53;hb=refs%2Fheads%2F2023-06-fix-fuzz-dep;hp=f5aa47d18d76c722d221c90ca6782a5cf1cf51dc;hpb=fc9a4c22d195a75ad5942eed271757f285452214;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index f5aa47d1..8ce59597 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -24,19 +24,20 @@ //! servicing [`ChannelMonitor`] updates from the client. use bitcoin::blockdata::block::BlockHeader; -use bitcoin::hash_types::Txid; +use bitcoin::hash_types::{Txid, BlockHash}; use crate::chain; use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::chain::keysinterface::Sign; +use crate::sign::WriteableEcdsaChannelSigner; +use crate::events; +use crate::events::{Event, EventHandler}; use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; -use crate::util::events; -use crate::util::events::EventHandler; +use crate::util::wakers::{Future, Notifier}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -68,7 +69,7 @@ impl MonitorUpdateId { pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self { Self { contents: UpdateOrigin::OffChain(update.update_id) } } - pub(crate) fn from_new_monitor(monitor: &ChannelMonitor) -> Self { + pub(crate) fn from_new_monitor(monitor: &ChannelMonitor) -> Self { Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) } } } @@ -93,7 +94,7 @@ impl MonitorUpdateId { /// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be /// closed without broadcasting the latest state. See /// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details. -pub trait Persist { +pub trait Persist { /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup. /// @@ -144,10 +145,10 @@ pub trait Persist { /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; } -struct MonitorHolder { +struct MonitorHolder { monitor: ChannelMonitor, /// The full set of pending monitor updates for this Channel. /// @@ -182,7 +183,7 @@ struct MonitorHolder { last_chain_persist_height: AtomicUsize, } -impl MonitorHolder { +impl MonitorHolder { fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard>) -> bool { pending_monitor_updates_lock.iter().any(|update_id| if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false }) @@ -197,12 +198,12 @@ impl MonitorHolder { /// /// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is /// released. -pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> { +pub struct LockedChannelMonitor<'a, ChannelSigner: WriteableEcdsaChannelSigner> { lock: RwLockReadGuard<'a, HashMap>>, funding_txo: OutPoint, } -impl Deref for LockedChannelMonitor<'_, ChannelSigner> { +impl Deref for LockedChannelMonitor<'_, ChannelSigner> { type Target = ChannelMonitor; fn deref(&self) -> &ChannelMonitor { &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor @@ -216,9 +217,16 @@ impl Deref for LockedChannelMonitor<'_, ChannelSigner> { /// or used independently to monitor channels remotely. See the [module-level documentation] for /// details. /// +/// Note that `ChainMonitor` should regularly trigger rebroadcasts/fee bumps of pending claims from +/// a force-closed channel. This is crucial in preventing certain classes of pinning attacks, +/// detecting substantial mempool feerate changes between blocks, and ensuring reliability if +/// broadcasting fails. We recommend invoking this every 30 seconds, or lower if running in an +/// environment with spotty connections, like on mobile. +/// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [module-level documentation]: crate::chain::chainmonitor -pub struct ChainMonitor +/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims +pub struct ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -240,9 +248,11 @@ pub struct ChainMonitor, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, + + event_notifier: Notifier, } -impl ChainMonitor +impl ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -294,12 +304,13 @@ where C::Target: chain::Filter, } log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); - match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { + match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { ChannelMonitorUpdateStatus::Completed => log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), ChannelMonitorUpdateStatus::PermanentFailure => { monitor_state.channel_perm_failed.store(true, Ordering::Release); self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); + self.event_notifier.notify(); }, ChannelMonitorUpdateStatus::InProgress => { log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); @@ -345,6 +356,7 @@ where C::Target: chain::Filter, persister, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), + event_notifier: Notifier::new(), } } @@ -395,6 +407,23 @@ where C::Target: chain::Filter, self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() } + #[cfg(not(c_bindings))] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> HashMap> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(c_bindings)] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec)> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(test)] pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor @@ -455,6 +484,7 @@ where C::Target: chain::Filter, } }, } + self.event_notifier.notify(); Ok(()) } @@ -469,19 +499,64 @@ where C::Target: chain::Filter, funding_txo, monitor_update_id, }], counterparty_node_id)); + self.event_notifier.notify(); } - #[cfg(any(test, fuzzing, feature = "_test_utils"))] + #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { - use crate::util::events::EventsProvider; + use crate::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } + + /// Gets a [`Future`] that completes when an event is available either via + /// [`chain::Watch::release_pending_monitor_events`] or + /// [`EventsProvider::process_pending_events`]. + /// + /// Note that callbacks registered on the [`Future`] MUST NOT call back into this + /// [`ChainMonitor`] and should instead register actions to be taken later. + /// + /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events + pub fn get_update_future(&self) -> Future { + self.event_notifier.get_future() + } + + /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is + /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool + /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend + /// invoking this every 30 seconds, or lower if running in an environment with spotty + /// connections, like on mobile. + pub fn rebroadcast_pending_claims(&self) { + let monitors = self.monitors.read().unwrap(); + for (_, monitor_holder) in &*monitors { + monitor_holder.monitor.rebroadcast_pending_claims( + &*self.broadcaster, &*self.fee_estimator, &*self.logger + ) + } + } } -impl +impl chain::Listen for ChainMonitor where C::Target: chain::Filter, @@ -508,7 +583,7 @@ where } } -impl +impl chain::Confirm for ChainMonitor where C::Target: chain::Filter, @@ -544,7 +619,7 @@ where }); } - fn get_relevant_txids(&self) -> Vec { + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { let mut txids = Vec::new(); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { @@ -557,7 +632,7 @@ where } } -impl +impl chain::Watch for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, @@ -611,7 +686,7 @@ where C::Target: chain::Filter, /// Note that we persist the given `ChannelMonitor` update while holding the /// `ChainMonitor` monitors lock. - fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { + fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); match monitors.get(&funding_txo) { @@ -629,15 +704,15 @@ where C::Target: chain::Filter, Some(monitor_state) => { let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); - let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger); + let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger); if update_res.is_err() { log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor)); } // Even if updating the monitor returns an error, the monitor's state will // still be changed. So, persist the updated monitor despite the error. - let update_id = MonitorUpdateId::from_monitor_update(&update); + let update_id = MonitorUpdateId::from_monitor_update(update); let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id); + let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id); match persist_res { ChannelMonitorUpdateStatus::InProgress => { pending_monitor_updates.push(update_id); @@ -700,7 +775,7 @@ where C::Target: chain::Filter, } } -impl events::EventsProvider for ChainMonitor +impl events::EventsProvider for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -719,8 +794,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } #[cfg(anchors)] @@ -742,26 +817,24 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } } #[cfg(test)] mod tests { - use bitcoin::{BlockHeader, TxMerkleNode}; - use bitcoin::hashes::Hash; use crate::{check_added_monitors, check_closed_broadcast, check_closed_event}; use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg}; use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch}; use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; - use crate::ln::channelmanager::{self, PaymentSendFailure}; + use crate::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider}; + use crate::ln::channelmanager::{PaymentSendFailure, PaymentId, RecipientOnionFields}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::ChannelMessageHandler; use crate::util::errors::APIError; - use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; #[test] fn test_async_ooo_offchain_updates() { @@ -772,7 +845,7 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features()); + create_announced_chan_between_nodes(&nodes, 0, 1); // Route two payments to be claimed at the same time. let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); @@ -780,15 +853,12 @@ mod tests { chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear(); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); - - chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone(); assert_eq!(persistences.len(), 1); @@ -798,11 +868,42 @@ mod tests { // Note that updates is a HashMap so the ordering here is actually random. This shouldn't // fail either way but if it fails intermittently it's depending on the ordering of updates. let mut update_iter = updates.iter(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let next_update = update_iter.next().unwrap().clone(); + // Should contain next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap(); + // Should not contain the previously pending next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let claim_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(claim_events.len(), 2); + match claim_events[0] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => { + assert_eq!(payment_hash_1, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + match claim_events[1] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => { + assert_eq!(payment_hash_2, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + // Now manually walk the commitment signed dance - because we claimed two payments // back-to-back it doesn't fit into the neat walk commitment_signed_dance does. @@ -848,8 +949,7 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let channel = create_announced_chan_between_nodes( - &nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features()); + let channel = create_announced_chan_between_nodes(&nodes, 0, 1); // Get a route for later and rebalance the channel somewhat send_payment(&nodes[0], &[&nodes[1]], 10_000_000); @@ -870,10 +970,7 @@ mod tests { // Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The // channel is now closed, but the ChannelManager doesn't know that yet. - let new_header = BlockHeader { - version: 2, time: 0, bits: 0, nonce: 0, - prev_blockhash: nodes[0].best_block_info().0, - merkle_root: TxMerkleNode::all_zeros() }; + let new_header = create_dummy_header(nodes[0].best_block_info().0, 0); nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header, &[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1); assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty()); @@ -883,8 +980,9 @@ mod tests { // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass // the update through to the ChannelMonitor which will refuse it (as the channel is closed). chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); - unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)), - true, APIError::ChannelUnavailable { ref err }, + unwrap_send_err!(nodes[0].node.send_payment_with_route(&route, second_payment_hash, + RecipientOnionFields::secret_only(second_payment_secret), PaymentId(second_payment_hash.0) + ), true, APIError::ChannelUnavailable { ref err }, assert!(err.contains("ChannelMonitor storage failure"))); check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update check_closed_broadcast!(nodes[0], true); @@ -896,10 +994,7 @@ mod tests { if block_timeout { // After three blocks, pending MontiorEvents should be released either way. - let latest_header = BlockHeader { - version: 2, time: 0, bits: 0, nonce: 0, - prev_blockhash: nodes[0].best_block_info().0, - merkle_root: TxMerkleNode::all_zeros() }; + let latest_header = create_dummy_header(nodes[0].best_block_info().0, 0); nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS); } else { let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone(); @@ -925,7 +1020,7 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features()); + create_announced_chan_between_nodes(&nodes, 0, 1); chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear(); chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);