]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Add `Notifier` to `OnionMessenger`
authorElias Rohrer <dev@tnull.de>
Fri, 19 Jul 2024 11:41:37 +0000 (13:41 +0200)
committerElias Rohrer <dev@tnull.de>
Thu, 8 Aug 2024 07:10:41 +0000 (09:10 +0200)
lightning-background-processor/src/lib.rs
lightning/src/onion_message/messenger.rs
lightning/src/util/wakers.rs

index 314de7465089af363dd08abe267888380d5eb090..874a1936ea1ed4a06fa189e8cb18a488462e5a20 100644 (file)
@@ -492,23 +492,28 @@ pub(crate) mod futures_util {
        pub(crate) struct Selector<
                A: Future<Output = ()> + Unpin,
                B: Future<Output = ()> + Unpin,
-               C: Future<Output = bool> + Unpin,
+               C: Future<Output = ()> + Unpin,
+               D: Future<Output = bool> + Unpin,
        > {
                pub a: A,
                pub b: B,
                pub c: C,
+               pub d: D,
        }
+
        pub(crate) enum SelectorOutput {
                A,
                B,
-               C(bool),
+               C,
+               D(bool),
        }
 
        impl<
                        A: Future<Output = ()> + Unpin,
                        B: Future<Output = ()> + Unpin,
-                       C: Future<Output = bool> + Unpin,
-               > Future for Selector<A, B, C>
+                       C: Future<Output = ()> + Unpin,
+                       D: Future<Output = bool> + Unpin,
+               > Future for Selector<A, B, C, D>
        {
                type Output = SelectorOutput;
                fn poll(
@@ -527,8 +532,14 @@ pub(crate) mod futures_util {
                                Poll::Pending => {},
                        }
                        match Pin::new(&mut self.c).poll(ctx) {
+                               Poll::Ready(()) => {
+                                       return Poll::Ready(SelectorOutput::C);
+                               },
+                               Poll::Pending => {},
+                       }
+                       match Pin::new(&mut self.d).poll(ctx) {
                                Poll::Ready(res) => {
-                                       return Poll::Ready(SelectorOutput::C(res));
+                                       return Poll::Ready(SelectorOutput::D(res));
                                },
                                Poll::Pending => {},
                        }
@@ -536,6 +547,28 @@ pub(crate) mod futures_util {
                }
        }
 
+       /// A selector that takes a future wrapped in an option that will be polled if it is `Some` and
+       /// will always be pending otherwise.
+       pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
+               pub optional_future: Option<F>,
+       }
+
+       impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
+               type Output = ();
+               fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
+                       match self.optional_future.as_mut() {
+                               Some(f) => match Pin::new(f).poll(ctx) {
+                                       Poll::Ready(()) => {
+                                               self.optional_future.take();
+                                               Poll::Ready(())
+                                       },
+                                       Poll::Pending => Poll::Pending,
+                               },
+                               None => Poll::Pending,
+                       }
+               }
+       }
+
        // If we want to poll a future without an async context to figure out if it has completed or
        // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
        // but sadly there's a good bit of boilerplate here.
@@ -557,7 +590,7 @@ pub(crate) mod futures_util {
 #[cfg(feature = "futures")]
 use core::task;
 #[cfg(feature = "futures")]
-use futures_util::{dummy_waker, Selector, SelectorOutput};
+use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
 
 /// Processes background events in a future.
 ///
@@ -782,18 +815,25 @@ where
                scorer,
                should_break,
                {
+                       let om_fut = if let Some(om) = onion_messenger.as_ref() {
+                               let fut = om.get_om().get_update_future();
+                               OptionalSelector { optional_future: Some(fut) }
+                       } else {
+                               OptionalSelector { optional_future: None }
+                       };
                        let fut = Selector {
                                a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
-                               c: sleeper(if mobile_interruptable_platform {
+                               c: om_fut,
+                               d: sleeper(if mobile_interruptable_platform {
                                        Duration::from_millis(100)
                                } else {
                                        Duration::from_secs(FASTEST_TIMER)
                                }),
                        };
                        match fut.await {
-                               SelectorOutput::A | SelectorOutput::B => {},
-                               SelectorOutput::C(exit) => {
+                               SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
+                               SelectorOutput::D(exit) => {
                                        should_break = exit;
                                },
                        }
@@ -938,11 +978,19 @@ impl BackgroundProcessor {
                                scorer,
                                stop_thread.load(Ordering::Acquire),
                                {
-                                       Sleeper::from_two_futures(
-                                               &channel_manager.get_cm().get_event_or_persistence_needed_future(),
-                                               &chain_monitor.get_update_future(),
-                                       )
-                                       .wait_timeout(Duration::from_millis(100));
+                                       let sleeper = if let Some(om) = onion_messenger.as_ref() {
+                                               Sleeper::from_three_futures(
+                                                       &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+                                                       &chain_monitor.get_update_future(),
+                                                       &om.get_om().get_update_future(),
+                                               )
+                                       } else {
+                                               Sleeper::from_two_futures(
+                                                       &channel_manager.get_cm().get_event_or_persistence_needed_future(),
+                                                       &chain_monitor.get_update_future(),
+                                               )
+                                       };
+                                       sleeper.wait_timeout(Duration::from_millis(100));
                                },
                                |_| Instant::now(),
                                |time: &Instant, dur| time.elapsed().as_secs() > dur,
index 7c7cd261089dc7b29128fd4775a295e7b6cf013b..590886721306e5c2038b98e1f8a67478aaa0a205 100644 (file)
@@ -34,6 +34,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
 use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
 use crate::util::logger::{Logger, WithContext};
 use crate::util::ser::Writeable;
+use crate::util::wakers::{Future, Notifier};
 
 use core::fmt;
 use core::ops::Deref;
@@ -266,6 +267,9 @@ pub struct OnionMessenger<
        pending_intercepted_msgs_events: Mutex<Vec<Event>>,
        pending_peer_connected_events: Mutex<Vec<Event>>,
        pending_events_processor: AtomicBool,
+       /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
+       /// it to give to users.
+       event_notifier: Notifier,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -1037,6 +1041,7 @@ macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset:
                if $res.iter().any(|r| r.is_err()) {
                        // We failed handling some events. Return to have them eventually replayed.
                        $self.pending_events_processor.store(false, Ordering::Release);
+                       $self.event_notifier.notify();
                        return;
                }
        }
@@ -1119,6 +1124,7 @@ where
                        pending_intercepted_msgs_events: Mutex::new(Vec::new()),
                        pending_peer_connected_events: Mutex::new(Vec::new()),
                        pending_events_processor: AtomicBool::new(false),
+                       event_notifier: Notifier::new(),
                }
        }
 
@@ -1230,6 +1236,7 @@ where
                                Some(addresses) => {
                                        e.insert(OnionMessageRecipient::pending_connection(addresses))
                                                .enqueue_message(onion_message);
+                                       self.event_notifier.notify();
                                        Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
                                },
                        },
@@ -1345,6 +1352,18 @@ where
                        return
                }
                pending_intercepted_msgs_events.push(event);
+               self.event_notifier.notify();
+       }
+
+       /// Gets a [`Future`] that completes when an event is available via
+       /// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
+       ///
+       /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
+       /// [`OnionMessenger`] and should instead register actions to be taken later.
+       ///
+       /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
+       pub fn get_update_future(&self) -> Future {
+               self.event_notifier.get_future()
        }
 
        /// Processes any events asynchronously using the given handler.
@@ -1616,6 +1635,7 @@ where
                                pending_peer_connected_events.push(
                                        Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
                                );
+                               self.event_notifier.notify();
                        }
                } else {
                        self.message_recipients.lock().unwrap().remove(their_node_id);
index c95b3dbef3d90e66b118f92790256073a2f67dce..5107d05297f22f7493b38ddc67465b3e8e8703af 100644 (file)
@@ -244,10 +244,21 @@ impl Sleeper {
                Self { notifiers: vec![Arc::clone(&future.state)] }
        }
        /// Constructs a new sleeper from two futures, allowing blocking on both at once.
-       // Note that this is the common case - a ChannelManager and ChainMonitor.
        pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
                Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
        }
+       /// Constructs a new sleeper from three futures, allowing blocking on all three at once.
+       ///
+       // Note that this is the common case - a ChannelManager, a ChainMonitor, and an
+       // OnionMessenger.
+       pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self {
+               let notifiers = vec![
+                       Arc::clone(&fut_a.state),
+                       Arc::clone(&fut_b.state),
+                       Arc::clone(&fut_c.state)
+               ];
+               Self { notifiers }
+       }
        /// Constructs a new sleeper on many futures, allowing blocking on all at once.
        pub fn new(futures: Vec<Future>) -> Self {
                Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }