From a383beda2f93dae7dd759b79ddd866ea3f22aef7 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 19 Jul 2024 10:57:42 +0200 Subject: [PATCH] Avoid allocating `Vec` for a single result --- lightning/src/onion_message/messenger.rs | 67 +++++++++++++++++++----- 1 file changed, 53 insertions(+), 14 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 13e6696c5..9f8cab2b1 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -1047,21 +1047,25 @@ where } } -macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => { +macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => { // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all // successfully handled events from the given queue, reset the events processing flag, and // return, to have the events eventually replayed upon next invocation. { let mut queue_lock = $event_queue.lock().unwrap(); - // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`. - let mut res_iter = $res.iter().skip($offset); - // Keep all events which previously error'd *or* any that have been added since we dropped // the Mutex before. - queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err())); + let mut any_error = false; + queue_lock.retain(|_| { + $res_iter.next().map_or(true, |r| { + let is_err = r.is_err(); + any_error |= is_err; + is_err + }) + }); - if $res.iter().any(|r| r.is_err()) { + if any_error { // We failed handling some events. Return to have them eventually replayed. $self.pending_events_processor.store(false, Ordering::Release); $self.event_notifier.notify(); @@ -1426,7 +1430,8 @@ where } // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds let res = MultiResultFuturePoller::new(futures).await; - drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events); + let mut res_iter = res.iter().skip(intercepted_msgs_offset); + drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events); } { @@ -1449,7 +1454,8 @@ where futures.push(future); } let res = MultiResultFuturePoller::new(futures).await; - drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events); + let mut res_iter = res.iter(); + drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events); } } self.pending_events_processor.store(false, Ordering::Release); @@ -1508,7 +1514,7 @@ where { let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); intercepted_msgs = pending_intercepted_msgs_events.clone(); - let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); + let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); peer_connecteds = pending_peer_connected_events.clone(); #[cfg(debug_assertions)] { for ev in pending_intercepted_msgs_events.iter() { @@ -1518,14 +1524,47 @@ where if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } } } - pending_peer_connected_events.shrink_to(10); // Limit total heap usage } - let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::>(); - drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events); + let mut handling_intercepted_msgs_failed = false; + let mut num_handled_intercepted_events = 0; + for ev in intercepted_msgs { + match handler.handle_event(ev) { + Ok(()) => num_handled_intercepted_events += 1, + Err(ReplayEvent ()) => { + handling_intercepted_msgs_failed = true; + break; + } + } + } + + { + let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); + pending_intercepted_msgs_events.drain(..num_handled_intercepted_events); + } - let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::>(); - drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events); + if handling_intercepted_msgs_failed { + self.pending_events_processor.store(false, Ordering::Release); + self.event_notifier.notify(); + return; + } + + let mut num_handled_peer_connecteds = 0; + for ev in peer_connecteds { + match handler.handle_event(ev) { + Ok(()) => num_handled_peer_connecteds += 1, + Err(ReplayEvent ()) => { + self.event_notifier.notify(); + break; + } + } + } + + { + let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); + pending_peer_connected_events.drain(..num_handled_peer_connecteds); + pending_peer_connected_events.shrink_to(10); // Limit total heap usage + } self.pending_events_processor.store(false, Ordering::Release); } -- 2.39.5