X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=e7c2b0f18ec001d703eb0249964b90a8a3f1a5ed;hb=02ae5cb9a7b66d17c23d9cdc10d7cfa659ea4450;hp=430f6bbac1d2244b7d49c37d10a51153484a5c28;hpb=31b0a13158b256b3261f32bb349569d043a65b11;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 430f6bba..e7c2b0f1 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -31,12 +31,13 @@ use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; use crate::chain::transaction::{OutPoint, TransactionData}; -use crate::chain::keysinterface::Sign; +use crate::chain::keysinterface::WriteableEcdsaChannelSigner; +use crate::events; +use crate::events::{Event, EventHandler}; use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; -use crate::util::events; -use crate::util::events::{Event, EventHandler}; +use crate::util::wakers::{Future, Notifier}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -68,7 +69,7 @@ impl MonitorUpdateId { pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self { Self { contents: UpdateOrigin::OffChain(update.update_id) } } - pub(crate) fn from_new_monitor(monitor: &ChannelMonitor) -> Self { + pub(crate) fn from_new_monitor(monitor: &ChannelMonitor) -> Self { Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) } } } @@ -93,7 +94,7 @@ impl MonitorUpdateId { /// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be /// closed without broadcasting the latest state. See /// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details. -pub trait Persist { +pub trait Persist { /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup. /// @@ -147,7 +148,7 @@ pub trait Persist { fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; } -struct MonitorHolder { +struct MonitorHolder { monitor: ChannelMonitor, /// The full set of pending monitor updates for this Channel. /// @@ -182,7 +183,7 @@ struct MonitorHolder { last_chain_persist_height: AtomicUsize, } -impl MonitorHolder { +impl MonitorHolder { fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard>) -> bool { pending_monitor_updates_lock.iter().any(|update_id| if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false }) @@ -197,12 +198,12 @@ impl MonitorHolder { /// /// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is /// released. -pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> { +pub struct LockedChannelMonitor<'a, ChannelSigner: WriteableEcdsaChannelSigner> { lock: RwLockReadGuard<'a, HashMap>>, funding_txo: OutPoint, } -impl Deref for LockedChannelMonitor<'_, ChannelSigner> { +impl Deref for LockedChannelMonitor<'_, ChannelSigner> { type Target = ChannelMonitor; fn deref(&self) -> &ChannelMonitor { &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor @@ -216,9 +217,16 @@ impl Deref for LockedChannelMonitor<'_, ChannelSigner> { /// or used independently to monitor channels remotely. See the [module-level documentation] for /// details. /// +/// Note that `ChainMonitor` should regularly trigger rebroadcasts/fee bumps of pending claims from +/// a force-closed channel. This is crucial in preventing certain classes of pinning attacks, +/// detecting substantial mempool feerate changes between blocks, and ensuring reliability if +/// broadcasting fails. We recommend invoking this every 30 seconds, or lower if running in an +/// environment with spotty connections, like on mobile. +/// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [module-level documentation]: crate::chain::chainmonitor -pub struct ChainMonitor +/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims +pub struct ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -240,9 +248,11 @@ pub struct ChainMonitor, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, + + event_notifier: Notifier, } -impl ChainMonitor +impl ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -300,6 +310,7 @@ where C::Target: chain::Filter, ChannelMonitorUpdateStatus::PermanentFailure => { monitor_state.channel_perm_failed.store(true, Ordering::Release); self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); + self.event_notifier.notify(); }, ChannelMonitorUpdateStatus::InProgress => { log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); @@ -345,6 +356,7 @@ where C::Target: chain::Filter, persister, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), + event_notifier: Notifier::new(), } } @@ -472,6 +484,7 @@ where C::Target: chain::Filter, } }, } + self.event_notifier.notify(); Ok(()) } @@ -486,11 +499,12 @@ where C::Target: chain::Filter, funding_txo, monitor_update_id, }], counterparty_node_id)); + self.event_notifier.notify(); } #[cfg(any(test, fuzzing, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { - use crate::util::events::EventsProvider; + use crate::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); @@ -502,7 +516,7 @@ where C::Target: chain::Filter, /// /// See the trait-level documentation of [`EventsProvider`] for requirements. /// - /// [`EventsProvider`]: crate::util::events::EventsProvider + /// [`EventsProvider`]: crate::events::EventsProvider pub async fn process_pending_events_async Future>( &self, handler: H ) { @@ -514,9 +528,35 @@ where C::Target: chain::Filter, handler(event).await; } } + + /// Gets a [`Future`] that completes when an event is available either via + /// [`chain::Watch::release_pending_monitor_events`] or + /// [`EventsProvider::process_pending_events`]. + /// + /// Note that callbacks registered on the [`Future`] MUST NOT call back into this + /// [`ChainMonitor`] and should instead register actions to be taken later. + /// + /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events + pub fn get_update_future(&self) -> Future { + self.event_notifier.get_future() + } + + /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is + /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool + /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend + /// invoking this every 30 seconds, or lower if running in an environment with spotty + /// connections, like on mobile. + pub fn rebroadcast_pending_claims(&self) { + let monitors = self.monitors.read().unwrap(); + for (_, monitor_holder) in &*monitors { + monitor_holder.monitor.rebroadcast_pending_claims( + &*self.broadcaster, &*self.fee_estimator, &*self.logger + ) + } + } } -impl +impl chain::Listen for ChainMonitor where C::Target: chain::Filter, @@ -543,7 +583,7 @@ where } } -impl +impl chain::Confirm for ChainMonitor where C::Target: chain::Filter, @@ -592,7 +632,7 @@ where } } -impl +impl chain::Watch for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, @@ -735,7 +775,7 @@ where C::Target: chain::Filter, } } -impl events::EventsProvider for ChainMonitor +impl events::EventsProvider for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -792,11 +832,11 @@ mod tests { use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch}; use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; - use crate::ln::channelmanager::{PaymentSendFailure, PaymentId}; + use crate::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider}; + use crate::ln::channelmanager::{PaymentSendFailure, PaymentId, RecipientOnionFields}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::ChannelMessageHandler; use crate::util::errors::APIError; - use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; #[test] fn test_async_ooo_offchain_updates() { @@ -819,10 +859,8 @@ mod tests { nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); - expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone(); assert_eq!(persistences.len(), 1); @@ -850,8 +888,24 @@ mod tests { .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let claim_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(claim_events.len(), 2); + match claim_events[0] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => { + assert_eq!(payment_hash_1, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + match claim_events[1] { + Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => { + assert_eq!(payment_hash_2, *payment_hash); + }, + _ => panic!("Unexpected event"), + } + // Now manually walk the commitment signed dance - because we claimed two payments // back-to-back it doesn't fit into the neat walk commitment_signed_dance does. @@ -931,8 +985,9 @@ mod tests { // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass // the update through to the ChannelMonitor which will refuse it (as the channel is closed). chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); - unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)), - true, APIError::ChannelUnavailable { ref err }, + unwrap_send_err!(nodes[0].node.send_payment_with_route(&route, second_payment_hash, + RecipientOnionFields::secret_only(second_payment_secret), PaymentId(second_payment_hash.0) + ), true, APIError::ChannelUnavailable { ref err }, assert!(err.contains("ChannelMonitor storage failure"))); check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update check_closed_broadcast!(nodes[0], true);