]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Avoid allocating `Vec` for a single result
authorElias Rohrer <dev@tnull.de>
Fri, 19 Jul 2024 08:57:42 +0000 (10:57 +0200)
committerElias Rohrer <dev@tnull.de>
Wed, 21 Aug 2024 19:57:54 +0000 (21:57 +0200)
lightning/src/onion_message/messenger.rs

index 13e6696c586b571bd6b1d47f6ddb33a0be3360bd..9f8cab2b1ab41588f927e3737470f60adcd0110c 100644 (file)
@@ -1047,21 +1047,25 @@ where
        }
 }
 
-macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
+macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => {
        // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
        // successfully handled events from the given queue, reset the events processing flag, and
        // return, to have the events eventually replayed upon next invocation.
        {
                let mut queue_lock = $event_queue.lock().unwrap();
 
-               // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
-               let mut res_iter = $res.iter().skip($offset);
-
                // Keep all events which previously error'd *or* any that have been added since we dropped
                // the Mutex before.
-               queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
+               let mut any_error = false;
+               queue_lock.retain(|_| {
+                       $res_iter.next().map_or(true, |r| {
+                               let is_err = r.is_err();
+                               any_error |= is_err;
+                               is_err
+                       })
+               });
 
-               if $res.iter().any(|r| r.is_err()) {
+               if any_error {
                        // We failed handling some events. Return to have them eventually replayed.
                        $self.pending_events_processor.store(false, Ordering::Release);
                        $self.event_notifier.notify();
@@ -1426,7 +1430,8 @@ where
                        }
                        // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
                        let res = MultiResultFuturePoller::new(futures).await;
-                       drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
+                       let mut res_iter = res.iter().skip(intercepted_msgs_offset);
+                       drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
                }
 
                {
@@ -1449,7 +1454,8 @@ where
                                        futures.push(future);
                                }
                                let res = MultiResultFuturePoller::new(futures).await;
-                               drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+                               let mut res_iter = res.iter();
+                               drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
                        }
                }
                self.pending_events_processor.store(false, Ordering::Release);
@@ -1508,7 +1514,7 @@ where
                {
                        let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
                        intercepted_msgs = pending_intercepted_msgs_events.clone();
-                       let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+                       let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
                        peer_connecteds = pending_peer_connected_events.clone();
                        #[cfg(debug_assertions)] {
                                for ev in pending_intercepted_msgs_events.iter() {
@@ -1518,14 +1524,47 @@ where
                                        if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
                                }
                        }
-                       pending_peer_connected_events.shrink_to(10); // Limit total heap usage
                }
 
-               let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
-               drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
+               let mut handling_intercepted_msgs_failed = false;
+               let mut num_handled_intercepted_events = 0;
+               for ev in intercepted_msgs {
+                       match handler.handle_event(ev) {
+                               Ok(()) => num_handled_intercepted_events += 1,
+                               Err(ReplayEvent ()) => {
+                                       handling_intercepted_msgs_failed = true;
+                                       break;
+                               }
+                       }
+               }
+
+               {
+                       let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+                       pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
+               }
 
-               let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
-               drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+               if handling_intercepted_msgs_failed {
+                       self.pending_events_processor.store(false, Ordering::Release);
+                       self.event_notifier.notify();
+                       return;
+               }
+
+               let mut num_handled_peer_connecteds = 0;
+               for ev in peer_connecteds {
+                       match handler.handle_event(ev) {
+                               Ok(()) => num_handled_peer_connecteds += 1,
+                               Err(ReplayEvent ()) => {
+                                       self.event_notifier.notify();
+                                       break;
+                               }
+                       }
+               }
+
+               {
+                       let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+                       pending_peer_connected_events.drain(..num_handled_peer_connecteds);
+                       pending_peer_connected_events.shrink_to(10); // Limit total heap usage
+               }
 
                self.pending_events_processor.store(false, Ordering::Release);
        }