X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchannelmonitor.rs;h=36e3bc46bfcc40370519a3d906d06e9e90458bea;hb=refs%2Fheads%2F2023-06-mon-event-less-race;hp=904d9941349804f99cb31127e2757fcc51bbf473;hpb=bd1206777735696c7aa5ece2f2f2bda6c5a87661;p=rust-lightning diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 904d9941..36e3bc46 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -49,7 +49,7 @@ use crate::chain::Filter; use crate::util::logger::Logger; use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48}; use crate::util::byte_utils; -use crate::events::Event; +use crate::events::{Event, EventHandler}; use crate::events::bump_transaction::{AnchorDescriptor, HTLCDescriptor, BumpTransactionEvent}; use crate::prelude::*; @@ -738,11 +738,6 @@ impl Readable for IrrevocablyResolvedHTLC { /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. /// -/// Pending Events or updated HTLCs which have not yet been read out by -/// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and -/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events -/// gotten are fully handled before re-serializing the new state. -/// /// Note that the deserializer is only implemented for (BlockHash, ChannelMonitor), which /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along /// the "reorg path" (ie disconnecting blocks until you find a common ancestor from both the @@ -752,7 +747,7 @@ pub struct ChannelMonitor { #[cfg(test)] pub(crate) inner: Mutex>, #[cfg(not(test))] - inner: Mutex>, + pub(super) inner: Mutex>, } #[derive(PartialEq)] @@ -829,7 +824,8 @@ pub(crate) struct ChannelMonitorImpl { // we further MUST NOT generate events during block/transaction-disconnection. pending_monitor_events: Vec, - pending_events: Vec, + pub(super) pending_events: Vec, + pub(super) is_processing_pending_events: bool, // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on // which to take actions once they reach enough confirmations. Each entry includes the @@ -1088,6 +1084,42 @@ impl Writeable for ChannelMonitorImpl { + loop { + let (pending_events, repeated_events); + if let Some(us) = $self_opt { + let mut inner = us.inner.lock().unwrap(); + if inner.is_processing_pending_events { + break; + } + inner.is_processing_pending_events = true; + + pending_events = inner.pending_events.clone(); + repeated_events = inner.get_repeated_events(); + } else { break; } + let num_events = pending_events.len(); + + for event in pending_events.into_iter().chain(repeated_events.into_iter()) { + $event_to_handle = event; + $handle_event; + } + + if let Some(us) = $self_opt { + let mut inner = us.inner.lock().unwrap(); + inner.pending_events.drain(..num_events); + inner.is_processing_pending_events = false; + if !inner.pending_events.is_empty() { + // If there's more events to process, go ahead and do so. + continue; + } + } + break; + } + } +} +pub(super) use _process_events_body as process_events_body; + impl ChannelMonitor { /// For lockorder enforcement purposes, we need to have a single site which constructs the /// `inner` mutex, otherwise cases where we lock two monitors at the same time (eg in our @@ -1179,6 +1211,7 @@ impl ChannelMonitor { payment_preimages: HashMap::new(), pending_monitor_events: Vec::new(), pending_events: Vec::new(), + is_processing_pending_events: false, onchain_events_awaiting_threshold_conf: Vec::new(), outputs_to_watch, @@ -1306,16 +1339,41 @@ impl ChannelMonitor { self.inner.lock().unwrap().get_and_clear_pending_monitor_events() } - /// Gets the list of pending events which were generated by previous actions, clearing the list - /// in the process. + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. + /// + /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] + /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain + /// within each channel. As the confirmation of a commitment transaction may be critical to the + /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an + /// environment with spotty connections, like on mobile. /// - /// This is called by the [`EventsProvider::process_pending_events`] implementation for - /// [`ChainMonitor`]. + /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in + /// order to handle these events. + /// + /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs + /// [`BumpTransaction`]: crate::events::Event::BumpTransaction + pub fn process_pending_events(&self, handler: &H) where H::Target: EventHandler { + let mut ev; + process_events_body!(Some(self), ev, handler.handle_event(ev)); + } + + /// Processes any events asynchronously. /// - /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events - /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor + /// See [`Self::process_pending_events`] for more information. + pub async fn process_pending_events_async Future>( + &self, handler: &H + ) { + let mut ev; + process_events_body!(Some(self), ev, { handler(ev).await }); + } + + #[cfg(test)] pub fn get_and_clear_pending_events(&self) -> Vec { - self.inner.lock().unwrap().get_and_clear_pending_events() + let mut ret = Vec::new(); + let mut lck = self.inner.lock().unwrap(); + mem::swap(&mut ret, &mut lck.pending_events); + ret.append(&mut lck.get_repeated_events()); + ret } pub(crate) fn get_min_seen_secret(&self) -> u64 { @@ -2531,10 +2589,13 @@ impl ChannelMonitorImpl { ret } - pub fn get_and_clear_pending_events(&mut self) -> Vec { - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut self.pending_events); - for (claim_id, claim_event) in self.onchain_tx_handler.get_and_clear_pending_claim_events().drain(..) { + /// Gets the set of events that are repeated regularly (e.g. those which RBF bump + /// transactions). We're okay if we lose these on restart as they'll be regenerated for us at + /// some regular interval via [`ChannelMonitor::rebroadcast_pending_claims`]. + pub(super) fn get_repeated_events(&mut self) -> Vec { + let pending_claim_events = self.onchain_tx_handler.get_and_clear_pending_claim_events(); + let mut ret = Vec::with_capacity(pending_claim_events.len()); + for (claim_id, claim_event) in pending_claim_events { match claim_event { ClaimEvent::BumpCommitment { package_target_feerate_sat_per_1000_weight, commitment_tx, anchor_output_idx, @@ -4096,6 +4157,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP payment_preimages, pending_monitor_events: pending_monitor_events.unwrap(), pending_events, + is_processing_pending_events: false, onchain_events_awaiting_threshold_conf, outputs_to_watch,