}
}
-macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
+macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => {
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
// successfully handled events from the given queue, reset the events processing flag, and
// return, to have the events eventually replayed upon next invocation.
{
let mut queue_lock = $event_queue.lock().unwrap();
- // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
- let mut res_iter = $res.iter().skip($offset);
-
// Keep all events which previously error'd *or* any that have been added since we dropped
// the Mutex before.
- queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
+ let mut any_error = false;
+ queue_lock.retain(|_| {
+ $res_iter.next().map_or(true, |r| {
+ let is_err = r.is_err();
+ any_error |= is_err;
+ is_err
+ })
+ });
- if $res.iter().any(|r| r.is_err()) {
+ if any_error {
// We failed handling some events. Return to have them eventually replayed.
$self.pending_events_processor.store(false, Ordering::Release);
$self.event_notifier.notify();
}
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
let res = MultiResultFuturePoller::new(futures).await;
- drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
+ let mut res_iter = res.iter().skip(intercepted_msgs_offset);
+ drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
}
{
futures.push(future);
}
let res = MultiResultFuturePoller::new(futures).await;
- drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+ let mut res_iter = res.iter();
+ drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
}
}
self.pending_events_processor.store(false, Ordering::Release);
{
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
intercepted_msgs = pending_intercepted_msgs_events.clone();
- let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+ let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
peer_connecteds = pending_peer_connected_events.clone();
#[cfg(debug_assertions)] {
for ev in pending_intercepted_msgs_events.iter() {
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
}
}
- pending_peer_connected_events.shrink_to(10); // Limit total heap usage
}
- let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
- drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
+ let mut handling_intercepted_msgs_failed = false;
+ let mut num_handled_intercepted_events = 0;
+ for ev in intercepted_msgs {
+ match handler.handle_event(ev) {
+ Ok(()) => num_handled_intercepted_events += 1,
+ Err(ReplayEvent ()) => {
+ handling_intercepted_msgs_failed = true;
+ break;
+ }
+ }
+ }
+
+ {
+ let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+ pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
+ }
- let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
- drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+ if handling_intercepted_msgs_failed {
+ self.pending_events_processor.store(false, Ordering::Release);
+ self.event_notifier.notify();
+ return;
+ }
+
+ let mut num_handled_peer_connecteds = 0;
+ for ev in peer_connecteds {
+ match handler.handle_event(ev) {
+ Ok(()) => num_handled_peer_connecteds += 1,
+ Err(ReplayEvent ()) => {
+ self.event_notifier.notify();
+ break;
+ }
+ }
+ }
+
+ {
+ let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+ pending_peer_connected_events.drain(..num_handled_peer_connecteds);
+ pending_peer_connected_events.shrink_to(10); // Limit total heap usage
+ }
self.pending_events_processor.store(false, Ordering::Release);
}