From 3abec4f03194d818b72bf377b2086563c56f1812 Mon Sep 17 00:00:00 2001
From: Matt Corallo <git@bluematt.me>
Date: Fri, 10 May 2024 20:52:10 +0000
Subject: [PATCH] Add a parallel async event handler to `OnionMessenger`

This adds an `OnionMessenger::process_pending_events_async` mirroring
the method of the same name in `ChannelManager`. However, unlike the
one in `ChannelManager`, this processes events in parallel: it builds
a future for each event and drives them all at once using the new
`MultiFuturePoller`.

Because `OnionMessenger` just generates a stream of messages to
store/fetch, we first handle all the events which store new messages,
`await` those, and only then handle the events which fetch stored
messages. This ensures reordering cannot result in lost messages
(unless we race with a peer disconnection, which could happen anyway).
---
 lightning/src/onion_message/messenger.rs | 45 ++++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs
index 9043eccbe..7bea65ab5 100644
--- a/lightning/src/onion_message/messenger.rs
+++ b/lightning/src/onion_message/messenger.rs
@@ -1154,6 +1154,51 @@ where
 		}
 		pending_events.intercepted_msgs.push(event);
 	}
+
+	/// Processes any events asynchronously using the given handler.
+	///
+	/// Note that the event handler is called in the order each event was generated; however,
+	/// futures are polled in parallel for some events to allow for parallelism where events do
+	/// not have an ordering requirement.
+	///
+	/// See the trait-level documentation of [`EventsProvider`] for requirements.
+	pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+		&self, handler: H
+	) {
+		let mut intercepted_msgs = Vec::new();
+		let mut peer_connecteds = Vec::new();
+		{
+			let mut pending_events = self.pending_events.lock().unwrap();
+			core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+			core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+		}
+
+		let mut futures = Vec::with_capacity(intercepted_msgs.len());
+		for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+			if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+				if let Some(addresses) = addresses.take() {
+					futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+				}
+			}
+		}
+
+		for ev in intercepted_msgs {
+			if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+			futures.push(Some(handler(ev)));
+		}
+		// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+		crate::util::async_poll::MultiFuturePoller(futures).await;
+
+		if peer_connecteds.len() <= 1 {
+			for event in peer_connecteds { handler(event).await; }
+		} else {
+			let mut futures = Vec::new();
+			for event in peer_connecteds {
+				futures.push(Some(handler(event)));
+			}
+			crate::util::async_poll::MultiFuturePoller(futures).await;
+		}
+	}
 }
 
 fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageRecipient>) -> bool {
--
2.39.5
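
A note on calling the new method: the `H: Fn(Event) -> Future` bound requires
the handler's future to be `Unpin`, which a bare `async` block is not, so
callers will generally want to box it. Below is a minimal usage sketch;
`MessageStore`, `save`, `release_pending`, and `connect` are hypothetical
application-side helpers (not LDK APIs), and `messenger` is assumed to be an
initialized `OnionMessenger`.

	use std::sync::Arc;

	use lightning::events::Event;

	// Hypothetical application state; `MessageStore` and its methods are
	// illustrative stand-ins, not LDK APIs.
	let store = Arc::new(MessageStore::new());
	let handler = move |event: Event| {
		let store = Arc::clone(&store);
		// Box the async block: `Pin<Box<_>>` is `Unpin`, satisfying the
		// `core::future::Future<Output = ()> + core::marker::Unpin` bound.
		Box::pin(async move {
			match event {
				// Handled in the first, parallel batch.
				Event::OnionMessageIntercepted { peer_node_id, message } =>
					store.save(peer_node_id, message).await,
				// Handled only after all intercepted messages are stored.
				Event::OnionMessagePeerConnected { peer_node_id } =>
					store.release_pending(peer_node_id).await,
				// Also part of the first batch.
				Event::ConnectionNeeded { node_id, addresses } =>
					connect(node_id, addresses).await,
				_ => {},
			}
		})
	};
	messenger.process_pending_events_async(handler).await;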
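
For context while reviewing: `MultiFuturePoller` lives in
`crate::util::async_poll` and is not part of this diff. A future with the
semantics this patch relies on (poll every stored sub-future on each wake,
complete once all are done) can be sketched as follows; treat this as an
illustration of the expected behavior rather than the exact in-tree code.

	use core::future::Future;
	use core::pin::Pin;
	use core::task::{Context, Poll};

	// Drives a set of `Unpin` futures concurrently, completing once all of
	// them have completed. Finished futures have their slot set to `None` so
	// they are never polled again after returning `Ready`.
	pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);

	impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
		type Output = ();
		fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
			let mut have_pending_futures = false;
			for fut_option in self.get_mut().0.iter_mut() {
				let mut fut = match fut_option.take() {
					None => continue,
					Some(fut) => fut,
				};
				match Pin::new(&mut fut).poll(cx) {
					Poll::Ready(()) => {},
					Poll::Pending => {
						have_pending_futures = true;
						*fut_option = Some(fut);
					},
				}
			}
			if have_pending_futures { Poll::Pending } else { Poll::Ready(()) }
		}
	}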