Wake the background processor if an async monitor update completes
authorMatt Corallo <git@bluematt.me>
Thu, 9 Mar 2023 03:11:13 +0000 (03:11 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 3 Apr 2023 16:49:54 +0000 (16:49 +0000)
If the `ChainMonitor` gets an async monitor update completion, this
means the `ChannelManager` needs to be polled for event processing.
Here we wake it using the new multi-`Future`-await `Sleeper`, or
the existing `select` block in the async BP.

Fixes #2052.

lightning-background-processor/src/lib.rs
lightning/src/chain/chainmonitor.rs

index cbff019a48cb769bdeaa110c28d482065c07f1e2..11d4d7af299f8969c61746f5548152e0b97f3fcb 100644 (file)
@@ -38,6 +38,8 @@ use lightning::routing::router::Router;
 use lightning::routing::scoring::{Score, WriteableScore};
 use lightning::util::logger::Logger;
 use lightning::util::persist::Persister;
+#[cfg(feature = "std")]
+use lightning::util::wakers::Sleeper;
 use lightning_rapid_gossip_sync::RapidGossipSync;
 
 use core::ops::Deref;
@@ -388,15 +390,20 @@ pub(crate) mod futures_util {
        use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
        use core::pin::Pin;
        use core::marker::Unpin;
-       pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
+       pub(crate) struct Selector<
+               A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+       > {
                pub a: A,
                pub b: B,
+               pub c: C,
        }
        pub(crate) enum SelectorOutput {
-               A, B(bool),
+               A, B, C(bool),
        }
 
-       impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
+       impl<
+               A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+       > Future for Selector<A, B, C> {
                type Output = SelectorOutput;
                fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
                        match Pin::new(&mut self.a).poll(ctx) {
@@ -404,7 +411,11 @@ pub(crate) mod futures_util {
                                Poll::Pending => {},
                        }
                        match Pin::new(&mut self.b).poll(ctx) {
-                               Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+                               Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
+                               Poll::Pending => {},
+                       }
+                       match Pin::new(&mut self.c).poll(ctx) {
+                               Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
                                Poll::Pending => {},
                        }
                        Poll::Pending
@@ -514,11 +525,13 @@ where
                gossip_sync, peer_manager, logger, scorer, should_break, {
                        let fut = Selector {
                                a: channel_manager.get_persistable_update_future(),
-                               b: sleeper(Duration::from_millis(100)),
+                               b: chain_monitor.get_update_future(),
+                               c: sleeper(Duration::from_millis(100)),
                        };
                        match fut.await {
                                SelectorOutput::A => true,
-                               SelectorOutput::B(exit) => {
+                               SelectorOutput::B => false,
+                               SelectorOutput::C(exit) => {
                                        should_break = exit;
                                        false
                                }
@@ -643,7 +656,10 @@ impl BackgroundProcessor {
                        define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
                                gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
-                               channel_manager.get_persistable_update_future().wait_timeout(Duration::from_millis(100)),
+                               Sleeper::from_two_futures(
+                                       channel_manager.get_persistable_update_future(),
+                                       chain_monitor.get_update_future()
+                               ).wait_timeout(Duration::from_millis(100)),
                                |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
index 4bfb47de402fad766b3d0feb951ababd99415556..f4109ac173d2e1cca2ec068a8271cd266f7b0593 100644 (file)
@@ -37,6 +37,7 @@ use crate::events::{Event, EventHandler};
 use crate::util::atomic_counter::AtomicCounter;
 use crate::util::logger::Logger;
 use crate::util::errors::APIError;
+use crate::util::wakers::{Future, Notifier};
 use crate::ln::channelmanager::ChannelDetails;
 
 use crate::prelude::*;
@@ -240,6 +241,8 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T:
        pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
        /// The best block height seen, used as a proxy for the passage of time.
        highest_chain_height: AtomicUsize,
+
+       event_notifier: Notifier,
 }
 
 impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -300,6 +303,7 @@ where C::Target: chain::Filter,
                                        ChannelMonitorUpdateStatus::PermanentFailure => {
                                                monitor_state.channel_perm_failed.store(true, Ordering::Release);
                                                self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+                                               self.event_notifier.notify();
                                        },
                                        ChannelMonitorUpdateStatus::InProgress => {
                                                log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
@@ -345,6 +349,7 @@ where C::Target: chain::Filter,
                        persister,
                        pending_monitor_events: Mutex::new(Vec::new()),
                        highest_chain_height: AtomicUsize::new(0),
+                       event_notifier: Notifier::new(),
                }
        }
 
@@ -472,6 +477,7 @@ where C::Target: chain::Filter,
                                }
                        },
                }
+               self.event_notifier.notify();
                Ok(())
        }
 
@@ -486,6 +492,7 @@ where C::Target: chain::Filter,
                        funding_txo,
                        monitor_update_id,
                }], counterparty_node_id));
+               self.event_notifier.notify();
        }
 
        #[cfg(any(test, fuzzing, feature = "_test_utils"))]
@@ -514,6 +521,18 @@ where C::Target: chain::Filter,
                        handler(event).await;
                }
        }
+
+       /// Gets a [`Future`] that completes when an event is available either via
+       /// [`chain::Watch::release_pending_monitor_events`] or
+       /// [`EventsProvider::process_pending_events`].
+       ///
+       /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
+       /// [`ChainMonitor`] and should instead register actions to be taken later.
+       ///
+       /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
+       pub fn get_update_future(&self) -> Future {
+               self.event_notifier.get_future()
+       }
 }
 
 impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>