]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Handle fallible events in `OnionMessenger`
authorElias Rohrer <dev@tnull.de>
Tue, 2 Jul 2024 10:04:33 +0000 (12:04 +0200)
committerElias Rohrer <dev@tnull.de>
Thu, 18 Jul 2024 13:54:21 +0000 (15:54 +0200)
Previously, we would just fire-and-forget in `OnionMessenger`'s event
handling. Since we now introduced the possibility of event handling
failures, we here adapt the event handling logic to retain any
events which we failed to handle to have them replayed upon the next
invocation of `process_pending_events`/`process_pending_events_async`.

lightning/src/onion_message/messenger.rs

index 3a94d6c26de05c466ccf12ad109b2767b3396f93..7c7cd261089dc7b29128fd4775a295e7b6cf013b 100644 (file)
@@ -37,6 +37,7 @@ use crate::util::ser::Writeable;
 
 use core::fmt;
 use core::ops::Deref;
+use core::sync::atomic::{AtomicBool, Ordering};
 use crate::io;
 use crate::sync::Mutex;
 use crate::prelude::*;
@@ -264,6 +265,7 @@ pub struct OnionMessenger<
        intercept_messages_for_offline_peers: bool,
        pending_intercepted_msgs_events: Mutex<Vec<Event>>,
        pending_peer_connected_events: Mutex<Vec<Event>>,
+       pending_events_processor: AtomicBool,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -1018,6 +1020,28 @@ where
        }
 }
 
+macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
+       // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
+       // successfully handled events from the given queue, reset the events processing flag, and
+       // return, to have the events eventually replayed upon next invocation.
+       {
+               let mut queue_lock = $event_queue.lock().unwrap();
+
+               // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
+               let mut res_iter = $res.iter().skip($offset);
+
+               // Keep all events which previously error'd *or* any that have been added since we dropped
+               // the Mutex before.
+               queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
+
+               if $res.iter().any(|r| r.is_err()) {
+                       // We failed handling some events. Return to have them eventually replayed.
+                       $self.pending_events_processor.store(false, Ordering::Release);
+                       return;
+               }
+       }
+}}
+
 impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
 OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
 where
@@ -1094,6 +1118,7 @@ where
                        intercept_messages_for_offline_peers,
                        pending_intercepted_msgs_events: Mutex::new(Vec::new()),
                        pending_peer_connected_events: Mutex::new(Vec::new()),
+                       pending_events_processor: AtomicBool::new(false),
                }
        }
 
@@ -1332,45 +1357,60 @@ where
        pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
                &self, handler: H
        ) {
-               let mut intercepted_msgs = Vec::new();
-               let mut peer_connecteds = Vec::new();
-               {
-                       let mut pending_intercepted_msgs_events =
-                               self.pending_intercepted_msgs_events.lock().unwrap();
-                       let mut pending_peer_connected_events =
-                               self.pending_peer_connected_events.lock().unwrap();
-                       core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
-                       core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
+               if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+                       return;
                }
 
-               let mut futures = Vec::with_capacity(intercepted_msgs.len());
-               for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
-                       if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
-                               if let Some(addresses) = addresses.take() {
-                                       let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
-                                       futures.push(future);
+               {
+                       let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
+                       let mut futures = Vec::with_capacity(intercepted_msgs.len());
+                       for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+                               if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+                                       if let Some(addresses) = addresses.take() {
+                                               let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
+                                               futures.push(future);
+                                       }
                                }
                        }
-               }
 
-               for ev in intercepted_msgs {
-                       if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
-                       let future = ResultFuture::Pending(handler(ev));
-                       futures.push(future);
-               }
-               // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
-               MultiResultFuturePoller::new(futures).await;
+                       // The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother
+                       // replaying `ConnectionNeeded` events.
+                       let intercepted_msgs_offset = futures.len();
 
-               if peer_connecteds.len() <= 1 {
-                       for event in peer_connecteds { handler(event).await; }
-               } else {
-                       let mut futures = Vec::new();
-                       for event in peer_connecteds {
-                               let future = ResultFuture::Pending(handler(event));
+                       for ev in intercepted_msgs {
+                               if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+                               let future = ResultFuture::Pending(handler(ev));
                                futures.push(future);
                        }
-                       MultiResultFuturePoller::new(futures).await;
+                       // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+                       let res = MultiResultFuturePoller::new(futures).await;
+                       drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
                }
+
+               {
+                       let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
+                       let num_peer_connecteds = peer_connecteds.len();
+                       if num_peer_connecteds <= 1 {
+                               for event in peer_connecteds {
+                                       if handler(event).await.is_ok() {
+                                               self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
+                                       } else {
+                                               // We failed handling the event. Return to have it eventually replayed.
+                                               self.pending_events_processor.store(false, Ordering::Release);
+                                               return;
+                                       }
+                               }
+                       } else {
+                               let mut futures = Vec::new();
+                               for event in peer_connecteds {
+                                       let future = ResultFuture::Pending(handler(event));
+                                       futures.push(future);
+                               }
+                               let res = MultiResultFuturePoller::new(futures).await;
+                               drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+                       }
+               }
+               self.pending_events_processor.store(false, Ordering::Release);
        }
 }
 
@@ -1410,6 +1450,10 @@ where
        CMH::Target: CustomOnionMessageHandler,
 {
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+               if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+                       return;
+               }
+
                for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
                        if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
                                if let Some(addresses) = addresses.take() {
@@ -1417,10 +1461,13 @@ where
                                }
                        }
                }
-               let mut events = Vec::new();
+               let intercepted_msgs;
+               let peer_connecteds;
                {
-                       let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+                       let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+                       intercepted_msgs = pending_intercepted_msgs_events.clone();
                        let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+                       peer_connecteds = pending_peer_connected_events.clone();
                        #[cfg(debug_assertions)] {
                                for ev in pending_intercepted_msgs_events.iter() {
                                        if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
@@ -1429,13 +1476,16 @@ where
                                        if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
                                }
                        }
-                       core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
-                       events.append(&mut pending_peer_connected_events);
                        pending_peer_connected_events.shrink_to(10); // Limit total heap usage
                }
-               for ev in events {
-                       handler.handle_event(ev);
-               }
+
+               let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+               drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
+
+               let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+               drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+
+               self.pending_events_processor.store(false, Ordering::Release);
        }
 }