From f5cea0e57a72f3218eda355f7f67326756e85820 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 2 Jul 2024 12:04:33 +0200 Subject: [PATCH] Handle fallible events in `OnionMessenger` Previously, we would just fire-and-forget in `OnionMessenger`'s event handling. Since we now introduced the possibility of event handling failures, we here adapt the event handling logic to retain any events which we failed to handle to have them replayed upon the next invocation of `process_pending_events`/`process_pending_events_async`. --- lightning/src/onion_message/messenger.rs | 124 ++++++++++++++++------- 1 file changed, 87 insertions(+), 37 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 3a94d6c26..7c7cd2610 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -37,6 +37,7 @@ use crate::util::ser::Writeable; use core::fmt; use core::ops::Deref; +use core::sync::atomic::{AtomicBool, Ordering}; use crate::io; use crate::sync::Mutex; use crate::prelude::*; @@ -264,6 +265,7 @@ pub struct OnionMessenger< intercept_messages_for_offline_peers: bool, pending_intercepted_msgs_events: Mutex>, pending_peer_connected_events: Mutex>, + pending_events_processor: AtomicBool, } /// [`OnionMessage`]s buffered to be sent. @@ -1018,6 +1020,28 @@ where } } +macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => { + // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all + // successfully handled events from the given queue, reset the events processing flag, and + // return, to have the events eventually replayed upon next invocation. + { + let mut queue_lock = $event_queue.lock().unwrap(); + + // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`. + let mut res_iter = $res.iter().skip($offset); + + // Keep all events which previously error'd *or* any that have been added since we dropped + // the Mutex before. + queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err())); + + if $res.iter().any(|r| r.is_err()) { + // We failed handling some events. Return to have them eventually replayed. + $self.pending_events_processor.store(false, Ordering::Release); + return; + } + } +}} + impl OnionMessenger where @@ -1094,6 +1118,7 @@ where intercept_messages_for_offline_peers, pending_intercepted_msgs_events: Mutex::new(Vec::new()), pending_peer_connected_events: Mutex::new(Vec::new()), + pending_events_processor: AtomicBool::new(false), } } @@ -1332,45 +1357,60 @@ where pub async fn process_pending_events_async> + core::marker::Unpin, H: Fn(Event) -> Future>( &self, handler: H ) { - let mut intercepted_msgs = Vec::new(); - let mut peer_connecteds = Vec::new(); - { - let mut pending_intercepted_msgs_events = - self.pending_intercepted_msgs_events.lock().unwrap(); - let mut pending_peer_connected_events = - self.pending_peer_connected_events.lock().unwrap(); - core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs); - core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds); + if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; } - let mut futures = Vec::with_capacity(intercepted_msgs.len()); - for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { - if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { - if let Some(addresses) = addresses.take() { - let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })); - futures.push(future); + { + let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone(); + let mut futures = Vec::with_capacity(intercepted_msgs.len()); + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })); + futures.push(future); + } } } - } - for ev in intercepted_msgs { - if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } - let future = ResultFuture::Pending(handler(ev)); - futures.push(future); - } - // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds - MultiResultFuturePoller::new(futures).await; + // The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother + // replaying `ConnectionNeeded` events. + let intercepted_msgs_offset = futures.len(); - if peer_connecteds.len() <= 1 { - for event in peer_connecteds { handler(event).await; } - } else { - let mut futures = Vec::new(); - for event in peer_connecteds { - let future = ResultFuture::Pending(handler(event)); + for ev in intercepted_msgs { + if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } + let future = ResultFuture::Pending(handler(ev)); futures.push(future); } - MultiResultFuturePoller::new(futures).await; + // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds + let res = MultiResultFuturePoller::new(futures).await; + drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events); } + + { + let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone(); + let num_peer_connecteds = peer_connecteds.len(); + if num_peer_connecteds <= 1 { + for event in peer_connecteds { + if handler(event).await.is_ok() { + self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds); + } else { + // We failed handling the event. Return to have it eventually replayed. + self.pending_events_processor.store(false, Ordering::Release); + return; + } + } + } else { + let mut futures = Vec::new(); + for event in peer_connecteds { + let future = ResultFuture::Pending(handler(event)); + futures.push(future); + } + let res = MultiResultFuturePoller::new(futures).await; + drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events); + } + } + self.pending_events_processor.store(false, Ordering::Release); } } @@ -1410,6 +1450,10 @@ where CMH::Target: CustomOnionMessageHandler, { fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; + } + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { if let Some(addresses) = addresses.take() { @@ -1417,10 +1461,13 @@ where } } } - let mut events = Vec::new(); + let intercepted_msgs; + let peer_connecteds; { - let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); + let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); + intercepted_msgs = pending_intercepted_msgs_events.clone(); let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); + peer_connecteds = pending_peer_connected_events.clone(); #[cfg(debug_assertions)] { for ev in pending_intercepted_msgs_events.iter() { if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } @@ -1429,13 +1476,16 @@ where if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } } } - core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events); - events.append(&mut pending_peer_connected_events); pending_peer_connected_events.shrink_to(10); // Limit total heap usage } - for ev in events { - handler.handle_event(ev); - } + + let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::>(); + drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events); + + let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::>(); + drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events); + + self.pending_events_processor.store(false, Ordering::Release); } } -- 2.39.5