From 2dd8c2b3deb363aeb15e47ea1c5e076f108599b0 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 19 Jul 2024 13:41:37 +0200 Subject: [PATCH] Add `Notifier` to `OnionMessenger` --- lightning-background-processor/src/lib.rs | 76 ++++++++++++++++++----- lightning/src/onion_message/messenger.rs | 20 ++++++ lightning/src/util/wakers.rs | 13 +++- 3 files changed, 94 insertions(+), 15 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 314de7465..874a1936e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -492,23 +492,28 @@ pub(crate) mod futures_util { pub(crate) struct Selector< A: Future + Unpin, B: Future + Unpin, - C: Future + Unpin, + C: Future + Unpin, + D: Future + Unpin, > { pub a: A, pub b: B, pub c: C, + pub d: D, } + pub(crate) enum SelectorOutput { A, B, - C(bool), + C, + D(bool), } impl< A: Future + Unpin, B: Future + Unpin, - C: Future + Unpin, - > Future for Selector + C: Future + Unpin, + D: Future + Unpin, + > Future for Selector { type Output = SelectorOutput; fn poll( @@ -527,8 +532,14 @@ pub(crate) mod futures_util { Poll::Pending => {}, } match Pin::new(&mut self.c).poll(ctx) { + Poll::Ready(()) => { + return Poll::Ready(SelectorOutput::C); + }, + Poll::Pending => {}, + } + match Pin::new(&mut self.d).poll(ctx) { Poll::Ready(res) => { - return Poll::Ready(SelectorOutput::C(res)); + return Poll::Ready(SelectorOutput::D(res)); }, Poll::Pending => {}, } @@ -536,6 +547,28 @@ pub(crate) mod futures_util { } } + /// A selector that takes a future wrapped in an option that will be polled if it is `Some` and + /// will always be pending otherwise. + pub(crate) struct OptionalSelector + Unpin> { + pub optional_future: Option, + } + + impl + Unpin> Future for OptionalSelector { + type Output = (); + fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { + match self.optional_future.as_mut() { + Some(f) => match Pin::new(f).poll(ctx) { + Poll::Ready(()) => { + self.optional_future.take(); + Poll::Ready(()) + }, + Poll::Pending => Poll::Pending, + }, + None => Poll::Pending, + } + } + } + // If we want to poll a future without an async context to figure out if it has completed or // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values // but sadly there's a good bit of boilerplate here. @@ -557,7 +590,7 @@ pub(crate) mod futures_util { #[cfg(feature = "futures")] use core::task; #[cfg(feature = "futures")] -use futures_util::{dummy_waker, Selector, SelectorOutput}; +use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; /// Processes background events in a future. /// @@ -782,18 +815,25 @@ where scorer, should_break, { + let om_fut = if let Some(om) = onion_messenger.as_ref() { + let fut = om.get_om().get_update_future(); + OptionalSelector { optional_future: Some(fut) } + } else { + OptionalSelector { optional_future: None } + }; let fut = Selector { a: channel_manager.get_cm().get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), - c: sleeper(if mobile_interruptable_platform { + c: om_fut, + d: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; match fut.await { - SelectorOutput::A | SelectorOutput::B => {}, - SelectorOutput::C(exit) => { + SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {}, + SelectorOutput::D(exit) => { should_break = exit; }, } @@ -938,11 +978,19 @@ impl BackgroundProcessor { scorer, stop_thread.load(Ordering::Acquire), { - Sleeper::from_two_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - ) - .wait_timeout(Duration::from_millis(100)); + let sleeper = if let Some(om) = onion_messenger.as_ref() { + Sleeper::from_three_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &om.get_om().get_update_future(), + ) + } else { + Sleeper::from_two_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + ) + }; + sleeper.wait_timeout(Duration::from_millis(100)); }, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 7c7cd2610..590886721 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -34,6 +34,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture}; use crate::util::logger::{Logger, WithContext}; use crate::util::ser::Writeable; +use crate::util::wakers::{Future, Notifier}; use core::fmt; use core::ops::Deref; @@ -266,6 +267,9 @@ pub struct OnionMessenger< pending_intercepted_msgs_events: Mutex>, pending_peer_connected_events: Mutex>, pending_events_processor: AtomicBool, + /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for + /// it to give to users. + event_notifier: Notifier, } /// [`OnionMessage`]s buffered to be sent. @@ -1037,6 +1041,7 @@ macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: if $res.iter().any(|r| r.is_err()) { // We failed handling some events. Return to have them eventually replayed. $self.pending_events_processor.store(false, Ordering::Release); + $self.event_notifier.notify(); return; } } @@ -1119,6 +1124,7 @@ where pending_intercepted_msgs_events: Mutex::new(Vec::new()), pending_peer_connected_events: Mutex::new(Vec::new()), pending_events_processor: AtomicBool::new(false), + event_notifier: Notifier::new(), } } @@ -1230,6 +1236,7 @@ where Some(addresses) => { e.insert(OnionMessageRecipient::pending_connection(addresses)) .enqueue_message(onion_message); + self.event_notifier.notify(); Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) }, }, @@ -1345,6 +1352,18 @@ where return } pending_intercepted_msgs_events.push(event); + self.event_notifier.notify(); + } + + /// Gets a [`Future`] that completes when an event is available via + /// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`]. + /// + /// Note that callbacks registered on the [`Future`] MUST NOT call back into this + /// [`OnionMessenger`] and should instead register actions to be taken later. + /// + /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events + pub fn get_update_future(&self) -> Future { + self.event_notifier.get_future() } /// Processes any events asynchronously using the given handler. @@ -1616,6 +1635,7 @@ where pending_peer_connected_events.push( Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } ); + self.event_notifier.notify(); } } else { self.message_recipients.lock().unwrap().remove(their_node_id); diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index c95b3dbef..5107d0529 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -244,10 +244,21 @@ impl Sleeper { Self { notifiers: vec![Arc::clone(&future.state)] } } /// Constructs a new sleeper from two futures, allowing blocking on both at once. - // Note that this is the common case - a ChannelManager and ChainMonitor. pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self { Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] } } + /// Constructs a new sleeper from three futures, allowing blocking on all three at once. + /// + // Note that this is the common case - a ChannelManager, a ChainMonitor, and an + // OnionMessenger. + pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self { + let notifiers = vec![ + Arc::clone(&fut_a.state), + Arc::clone(&fut_b.state), + Arc::clone(&fut_c.state) + ]; + Self { notifiers } + } /// Constructs a new sleeper on many futures, allowing blocking on all at once. pub fn new(futures: Vec) -> Self { Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() } -- 2.39.5