use core::fmt;
use core::ops::Deref;
+use core::sync::atomic::{AtomicBool, Ordering};
use crate::io;
use crate::sync::Mutex;
use crate::prelude::*;
intercept_messages_for_offline_peers: bool,
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
pending_peer_connected_events: Mutex<Vec<Event>>,
+ pending_events_processor: AtomicBool,
}
/// [`OnionMessage`]s buffered to be sent.
}
}
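+ // Checks the given handler results against the corresponding event queue: every event whose
+ // handler succeeded is dropped from the queue, and if any handler failed we reset the events
+ // processing flag and return early, leaving the failed events queued for replay.
+ //
+ // A worked example (illustrative only): with `$offset = 1` and `$res = [Ok, Err, Ok]`, the
+ // first result is skipped and the queue's two entries are matched against `[Err, Ok]`; the
+ // first queued event is retained for replay, the second is dropped, and since an `Err` is
+ // present we reset the flag and return.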
+macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
+ // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
+ // successfully handled events from the given queue, reset the events processing flag, and
+ // return, to have the events eventually replayed upon next invocation.
+ {
+ let mut queue_lock = $event_queue.lock().unwrap();
+
+ // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
+ let mut res_iter = $res.iter().skip($offset);
+
+ // Keep all events whose handlers errored *or* any that were added to the queue after we
+ // released the Mutex to run the handlers (`res_iter` yields `None` for those).
+ queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
+
+ if $res.iter().any(|r| r.is_err()) {
+ // We failed handling some events. Return to have them eventually replayed.
+ $self.pending_events_processor.store(false, Ordering::Release);
+ return;
+ }
+ }
+}}
+
impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
where
intercept_messages_for_offline_peers,
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
pending_peer_connected_events: Mutex::new(Vec::new()),
+ pending_events_processor: AtomicBool::new(false),
}
}
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
&self, handler: H
) {
- let mut intercepted_msgs = Vec::new();
- let mut peer_connecteds = Vec::new();
- {
- let mut pending_intercepted_msgs_events =
- self.pending_intercepted_msgs_events.lock().unwrap();
- let mut pending_peer_connected_events =
- self.pending_peer_connected_events.lock().unwrap();
- core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
- core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
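+ // Only one task may process events at a time: `compare_exchange(false, true, ..)` lets the
+ // first caller claim the flag and proceed, while concurrent callers observe `true` and
+ // return early. Every exit path past this check resets the flag with an
+ // `Ordering::Release` store.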
+ if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+ return;
}
- let mut futures = Vec::with_capacity(intercepted_msgs.len());
- for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
- if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
- if let Some(addresses) = addresses.take() {
- let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
- futures.push(future);
+ {
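+ // Snapshot the queue by cloning rather than draining it, so events stay queued until their
+ // handlers have actually succeeded; failures (and events added meanwhile) are replayed later.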
+ let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
+ let mut futures = Vec::with_capacity(intercepted_msgs.len());
+ for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+ if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+ if let Some(addresses) = addresses.take() {
+ let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
+ futures.push(future);
+ }
}
}
- }
- for ev in intercepted_msgs {
- if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
- let future = ResultFuture::Pending(handler(ev));
- futures.push(future);
- }
- // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
- MultiResultFuturePoller::new(futures).await;
+ // The offset in the `futures` vec at which the `intercepted_msgs` futures start. We don't
+ // bother replaying `ConnectionNeeded` events, so the macro skips their results via this offset.
+ let intercepted_msgs_offset = futures.len();
- if peer_connecteds.len() <= 1 {
- for event in peer_connecteds { handler(event).await; }
- } else {
- let mut futures = Vec::new();
- for event in peer_connecteds {
- let future = ResultFuture::Pending(handler(event));
+ for ev in intercepted_msgs {
+ if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+ let future = ResultFuture::Pending(handler(ev));
futures.push(future);
}
- MultiResultFuturePoller::new(futures).await;
+ // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+ let res = MultiResultFuturePoller::new(futures).await;
+ drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
}
+
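+ // Reaching this point means every handler above returned `Ok` (the macro returned early
+ // otherwise), so we can move on to the `OnionMessagePeerConnected` events.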
+ {
+ let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
+ let num_peer_connecteds = peer_connecteds.len();
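+ // With at most one event we await the handler inline rather than building a
+ // `MultiResultFuturePoller` for a single future.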
+ if num_peer_connecteds <= 1 {
+ for event in peer_connecteds {
+ if handler(event).await.is_ok() {
+ self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
+ } else {
+ // We failed handling the event. Return to have it eventually replayed.
+ self.pending_events_processor.store(false, Ordering::Release);
+ return;
+ }
+ }
+ } else {
+ let mut futures = Vec::new();
+ for event in peer_connecteds {
+ let future = ResultFuture::Pending(handler(event));
+ futures.push(future);
+ }
+ let res = MultiResultFuturePoller::new(futures).await;
+ drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+ }
+ }
+ self.pending_events_processor.store(false, Ordering::Release);
}
}
CMH::Target: CustomOnionMessageHandler,
{
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+ if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+ return;
+ }
+
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
- handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
+ let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
}
}
}
- let mut events = Vec::new();
+ let intercepted_msgs;
+ let peer_connecteds;
{
- let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+ let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+ intercepted_msgs = pending_intercepted_msgs_events.clone();
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+ peer_connecteds = pending_peer_connected_events.clone();
#[cfg(debug_assertions)] {
for ev in pending_intercepted_msgs_events.iter() {
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
}
for ev in pending_peer_connected_events.iter() {
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
}
}
- core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
- events.append(&mut pending_peer_connected_events);
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
}
- for ev in events {
- handler.handle_event(ev);
- }
+
+ let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+ drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
+
+ let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+ drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+
+ self.pending_events_processor.store(false, Ordering::Release);
}
}
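// A minimal caller-side sketch (hypothetical, not part of this diff), showing how a sync
// `EventHandler` can signal a transient failure and rely on the replay behavior above.
// `FlakyHandler` and its `attempts` counter are assumptions made for illustration; `Event`,
// `ReplayEvent`, and `EventHandler` are the types used in this file.
//
//   struct FlakyHandler { attempts: core::cell::RefCell<u32> }
//
//   impl EventHandler for FlakyHandler {
//       fn handle_event(&self, _event: Event) -> Result<(), ReplayEvent> {
//           let mut attempts = self.attempts.borrow_mut();
//           *attempts += 1;
//           // Failing the first attempt leaves the event queued; it is handled again on the
//           // next `process_pending_events` call.
//           if *attempts == 1 { Err(ReplayEvent()) } else { Ok(()) }
//       }
//   }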