Don't drop `ChannelMonitor` `Event`s until they're processed
author     Matt Corallo <git@bluematt.me>
           Tue, 20 Jun 2023 22:52:27 +0000 (22:52 +0000)
committer  Matt Corallo <git@bluematt.me>
           Mon, 10 Jul 2023 16:52:04 +0000 (16:52 +0000)
We currently assume the owner of `ChannelMonitor`s won't persist
the `ChannelMonitor` while `Event`s are being processed. This is
fine, except that (a) it's generally hard to do so and (b) the
`ChainMonitor` doesn't even do this.

Thus, in rare cases, a user could begin processing events which
were generated by connecting a transaction or a new best block,
take some time to do so, and while doing so process a further
chain event, causing persistence. This could drop the event being
processed altogether, which could lose the user funds.

This race should be very rare, but it may have been made somewhat
more reachable by (a) async event processing, which makes it more
common to do networking while handling events, and (b) the new
future generation in the `ChainMonitor`, which now wakes the
`background-processor` directly when chain actions happen on the
`ChainMonitor`.
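
The fix keeps events queued in the `ChannelMonitor` until the handler has
actually run, guarded by a re-entrancy flag so a second processing pass can't
observe a half-drained queue. A minimal, self-contained sketch of that pattern,
using toy types rather than LDK's actual `ChannelMonitor`:

    use std::sync::Mutex;

    // Toy stand-ins for a monitor with a queue of pending events.
    struct MonitorInner {
        pending_events: Vec<String>,
        is_processing_pending_events: bool,
    }

    struct Monitor {
        inner: Mutex<MonitorInner>,
    }

    impl Monitor {
        fn process_pending_events<H: Fn(&str)>(&self, handler: H) {
            loop {
                let pending_events;
                {
                    let mut inner = self.inner.lock().unwrap();
                    if inner.is_processing_pending_events {
                        // Another call is already handling events; let it finish.
                        return;
                    }
                    inner.is_processing_pending_events = true;
                    // Clone rather than drain: if the monitor were persisted right
                    // now, the unhandled events would still be written out.
                    pending_events = inner.pending_events.clone();
                }
                let num_events = pending_events.len();

                for event in &pending_events {
                    handler(event.as_str());
                }

                let mut inner = self.inner.lock().unwrap();
                // Only now, after the handler has run, do we remove the events.
                inner.pending_events.drain(..num_events);
                inner.is_processing_pending_events = false;
                if inner.pending_events.is_empty() {
                    break;
                }
                // More events arrived while we were handling; loop and process them.
            }
        }
    }

    fn main() {
        let monitor = Monitor {
            inner: Mutex::new(MonitorInner {
                pending_events: vec!["SpendableOutputs".to_owned()],
                is_processing_pending_events: false,
            }),
        };
        monitor.process_pending_events(|ev| println!("handling {}", ev));
    }

Because the queue is only drained after the handler returns, persisting the
monitor mid-handling still writes the unhandled events out, so they survive a
crash or restart.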

lightning/src/chain/chainmonitor.rs
lightning/src/chain/channelmonitor.rs

diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 562c76fa3e2c996f136c54d8fb4adb36baeba802..2cc71a2ecc7ce7a77e99abee6fc0a135a058c7bc 100644
@@ -520,12 +520,13 @@ where C::Target: chain::Filter,
        pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
                &self, handler: H
        ) {
-               let mut pending_events = Vec::new();
-               for monitor_state in self.monitors.read().unwrap().values() {
-                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-               }
-               for event in pending_events {
-                       handler(event).await;
+               // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
+               // crazy dance to process a monitor's events then only remove them once we've done so.
+               let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
+               for funding_txo in mons_to_process {
+                       let mut ev;
+                       super::channelmonitor::process_events_body!(
+                               self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
                }
        }
 
@@ -796,12 +797,8 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L
        /// [`SpendableOutputs`]: events::Event::SpendableOutputs
        /// [`BumpTransaction`]: events::Event::BumpTransaction
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
-               let mut pending_events = Vec::new();
                for monitor_state in self.monitors.read().unwrap().values() {
-                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-               }
-               for event in pending_events {
-                       handler.handle_event(event);
+                       monitor_state.monitor.process_pending_events(&handler);
                }
        }
 }
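
The chainmonitor.rs hunk above can't hold the `monitors` read lock across an
`.await`, so it snapshots the monitor keys and re-acquires the lock for each
one. A self-contained sketch of that dance, again with toy types rather than
LDK's, driven by whatever async executor the application uses:

    use std::collections::HashMap;
    use std::sync::RwLock;

    struct Monitor { name: &'static str }

    async fn process_all(monitors: &RwLock<HashMap<u64, Monitor>>) {
        // Snapshot the keys so no read guard is held across an await point.
        let keys: Vec<u64> = monitors.read().unwrap().keys().cloned().collect();
        for key in keys {
            // Re-acquire the lock briefly; the monitor may have been removed
            // since the snapshot was taken, in which case it is simply skipped.
            let name = match monitors.read().unwrap().get(&key) {
                Some(mon) => mon.name,
                None => continue,
            };
            // The read guard is already dropped here, so awaiting does not
            // block other readers or writers of the monitor map.
            handle_events_for(name).await;
        }
    }

    async fn handle_events_for(name: &str) {
        println!("processing events for {}", name);
    }

A monitor removed between the snapshot and the per-key lookup is simply
skipped; since unhandled events stay queued in their monitor, anything missed
is picked up on the next processing pass.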
diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs
index 904d9941349804f99cb31127e2757fcc51bbf473..36e3bc46bfcc40370519a3d906d06e9e90458bea 100644
@@ -49,7 +49,7 @@ use crate::chain::Filter;
 use crate::util::logger::Logger;
 use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
 use crate::util::byte_utils;
-use crate::events::Event;
+use crate::events::{Event, EventHandler};
 use crate::events::bump_transaction::{AnchorDescriptor, HTLCDescriptor, BumpTransactionEvent};
 
 use crate::prelude::*;
@@ -738,11 +738,6 @@ impl Readable for IrrevocablyResolvedHTLC {
 /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
 /// information and are actively monitoring the chain.
 ///
-/// Pending Events or updated HTLCs which have not yet been read out by
-/// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and
-/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
-/// gotten are fully handled before re-serializing the new state.
-///
 /// Note that the deserializer is only implemented for (BlockHash, ChannelMonitor), which
 /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
 /// the "reorg path" (ie disconnecting blocks until you find a common ancestor from both the
@@ -752,7 +747,7 @@ pub struct ChannelMonitor<Signer: WriteableEcdsaChannelSigner> {
        #[cfg(test)]
        pub(crate) inner: Mutex<ChannelMonitorImpl<Signer>>,
        #[cfg(not(test))]
-       inner: Mutex<ChannelMonitorImpl<Signer>>,
+       pub(super) inner: Mutex<ChannelMonitorImpl<Signer>>,
 }
 
 #[derive(PartialEq)]
@@ -829,7 +824,8 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
        // we further MUST NOT generate events during block/transaction-disconnection.
        pending_monitor_events: Vec<MonitorEvent>,
 
-       pending_events: Vec<Event>,
+       pub(super) pending_events: Vec<Event>,
+       pub(super) is_processing_pending_events: bool,
 
        // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
        // which to take actions once they reach enough confirmations. Each entry includes the
@@ -1088,6 +1084,42 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
        }
 }
 
+macro_rules! _process_events_body {
+       ($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
+               loop {
+                       let (pending_events, repeated_events);
+                       if let Some(us) = $self_opt {
+                               let mut inner = us.inner.lock().unwrap();
+                               if inner.is_processing_pending_events {
+                                       break;
+                               }
+                               inner.is_processing_pending_events = true;
+
+                               pending_events = inner.pending_events.clone();
+                               repeated_events = inner.get_repeated_events();
+                       } else { break; }
+                       let num_events = pending_events.len();
+
+                       for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+                               $event_to_handle = event;
+                               $handle_event;
+                       }
+
+                       if let Some(us) = $self_opt {
+                               let mut inner = us.inner.lock().unwrap();
+                               inner.pending_events.drain(..num_events);
+                               inner.is_processing_pending_events = false;
+                               if !inner.pending_events.is_empty() {
+                                       // If there's more events to process, go ahead and do so.
+                                       continue;
+                               }
+                       }
+                       break;
+               }
+       }
+}
+pub(super) use _process_events_body as process_events_body;
+
 impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
        /// For lockorder enforcement purposes, we need to have a single site which constructs the
        /// `inner` mutex, otherwise cases where we lock two monitors at the same time (eg in our
@@ -1179,6 +1211,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
                        payment_preimages: HashMap::new(),
                        pending_monitor_events: Vec::new(),
                        pending_events: Vec::new(),
+                       is_processing_pending_events: false,
 
                        onchain_events_awaiting_threshold_conf: Vec::new(),
                        outputs_to_watch,
@@ -1306,16 +1339,41 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
                self.inner.lock().unwrap().get_and_clear_pending_monitor_events()
        }
 
-       /// Gets the list of pending events which were generated by previous actions, clearing the list
-       /// in the process.
+       /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+       ///
+       /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
+       /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
+       /// within each channel. As the confirmation of a commitment transaction may be critical to the
+       /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an
+       /// environment with spotty connections, like on mobile.
        ///
-       /// This is called by the [`EventsProvider::process_pending_events`] implementation for
-       /// [`ChainMonitor`].
+       /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+       /// order to handle these events.
+       ///
+       /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
+       /// [`BumpTransaction`]: crate::events::Event::BumpTransaction
+       pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+               let mut ev;
+               process_events_body!(Some(self), ev, handler.handle_event(ev));
+       }
+
+       /// Processes any events asynchronously.
        ///
-       /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
-       /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
+       /// See [`Self::process_pending_events`] for more information.
+       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+               &self, handler: &H
+       ) {
+               let mut ev;
+               process_events_body!(Some(self), ev, { handler(ev).await });
+       }
+
+       #[cfg(test)]
        pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
-               self.inner.lock().unwrap().get_and_clear_pending_events()
+               let mut ret = Vec::new();
+               let mut lck = self.inner.lock().unwrap();
+               mem::swap(&mut ret, &mut lck.pending_events);
+               ret.append(&mut lck.get_repeated_events());
+               ret
        }
 
        pub(crate) fn get_min_seen_secret(&self) -> u64 {
@@ -2531,10 +2589,13 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
                ret
        }
 
-       pub fn get_and_clear_pending_events(&mut self) -> Vec<Event> {
-               let mut ret = Vec::new();
-               mem::swap(&mut ret, &mut self.pending_events);
-               for (claim_id, claim_event) in self.onchain_tx_handler.get_and_clear_pending_claim_events().drain(..) {
+       /// Gets the set of events that are repeated regularly (e.g. those which RBF bump
+       /// transactions). We're okay if we lose these on restart as they'll be regenerated for us at
+       /// some regular interval via [`ChannelMonitor::rebroadcast_pending_claims`].
+       pub(super) fn get_repeated_events(&mut self) -> Vec<Event> {
+               let pending_claim_events = self.onchain_tx_handler.get_and_clear_pending_claim_events();
+               let mut ret = Vec::with_capacity(pending_claim_events.len());
+               for (claim_id, claim_event) in pending_claim_events {
                        match claim_event {
                                ClaimEvent::BumpCommitment {
                                        package_target_feerate_sat_per_1000_weight, commitment_tx, anchor_output_idx,
@@ -4096,6 +4157,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
                        payment_preimages,
                        pending_monitor_events: pending_monitor_events.unwrap(),
                        pending_events,
+                       is_processing_pending_events: false,
 
                        onchain_events_awaiting_threshold_conf,
                        outputs_to_watch,
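
For reference, a hedged usage sketch of the per-monitor API this patch
exposes. The `monitor` value is assumed to come from elsewhere (e.g.
deserialization), and the import paths match the LDK tree this commit targets;
they may differ in other releases:

    use std::sync::Arc;

    use lightning::chain::channelmonitor::ChannelMonitor;
    use lightning::events::{Event, EventHandler};
    // Signer trait path as of this LDK tree; later releases rename it.
    use lightning::sign::WriteableEcdsaChannelSigner;

    struct SweepHandler;

    impl EventHandler for SweepHandler {
        fn handle_event(&self, event: Event) {
            match event {
                // Spendable outputs should be swept to the user's wallet.
                Event::SpendableOutputs { .. } => { /* build and broadcast a sweep tx */ },
                // Anchor channels may need fee bumps to confirm commitments.
                Event::BumpTransaction(_) => { /* hand off to a bump-transaction handler */ },
                _ => {},
            }
        }
    }

    // Invoke this periodically (the new docs suggest roughly every 30 seconds)
    // so claim and sweep events are acted on promptly.
    fn sweep_monitor_events<Signer: WriteableEcdsaChannelSigner>(
        monitor: &ChannelMonitor<Signer>, handler: &Arc<SweepHandler>,
    ) {
        monitor.process_pending_events(handler);
    }

The `ChainMonitor`'s `process_pending_events` implementation above does
essentially this, calling the per-monitor method for each monitor it tracks.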