offers_handler: OMH,
custom_handler: CMH,
intercept_messages_for_offline_peers: bool,
- pending_events: Mutex<Vec<Event>>,
+ pending_events: Mutex<PendingEvents>,
+}
+
+struct PendingEvents {
+ intercepted_msgs: Vec<Event>,
+ peer_connecteds: Vec<Event>,
}
/// [`OnionMessage`]s buffered to be sent.
offers_handler,
custom_handler,
intercept_messages_for_offline_peers,
- pending_events: Mutex::new(Vec::new()),
+ pending_events: Mutex::new(PendingEvents {
+ intercepted_msgs: Vec::new(),
+ peer_connecteds: Vec::new(),
+ }),
}
}
msgs
}
- fn enqueue_event(&self, event: Event) {
+ fn enqueue_intercepted_event(&self, event: Event) {
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
let mut pending_events = self.pending_events.lock().unwrap();
- let total_buffered_bytes: usize = pending_events
- .iter()
- .map(|ev| ev.serialized_length())
- .sum();
+ let total_buffered_bytes: usize =
+ pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
return
}
- pending_events.push(event);
+ pending_events.intercepted_msgs.push(event);
+ }
+
+ /// Processes any events asynchronously using the given handler.
+ ///
+ /// Note that the event handler is called in the order each event was generated, however
+ /// futures are polled in parallel for some events to allow for parallelism where events do not
+ /// have an ordering requirement.
+ ///
+ /// See the trait-level documentation of [`EventsProvider`] for requirements.
+ pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+ &self, handler: H
+ ) {
+ let mut intercepted_msgs = Vec::new();
+ let mut peer_connecteds = Vec::new();
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+ core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+ }
+
+ let mut futures = Vec::with_capacity(intercepted_msgs.len());
+ for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+ if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+ if let Some(addresses) = addresses.take() {
+ futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+ }
+ }
+ }
+
+ for ev in intercepted_msgs {
+ if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+ futures.push(Some(handler(ev)));
+ }
+ // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+ crate::util::async_poll::MultiFuturePoller(futures).await;
+
+ if peer_connecteds.len() <= 1 {
+ for event in peer_connecteds { handler(event).await; }
+ } else {
+ let mut futures = Vec::new();
+ for event in peer_connecteds {
+ futures.push(Some(handler(event)));
+ }
+ crate::util::async_poll::MultiFuturePoller(futures).await;
+ }
}
}
}
}
let mut events = Vec::new();
- core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ #[cfg(debug_assertions)] {
+ for ev in pending_events.intercepted_msgs.iter() {
+ if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
+ }
+ for ev in pending_events.peer_connecteds.iter() {
+ if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
+ }
+ }
+ core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
+ events.append(&mut pending_events.peer_connecteds);
+ pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+ }
for ev in events {
handler.handle_event(ev);
}
log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
},
_ if self.intercept_messages_for_offline_peers => {
- self.enqueue_event(
+ self.enqueue_intercepted_event(
Event::OnionMessageIntercepted {
peer_node_id: next_node_id, message: onion_message
}
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
.mark_connected();
if self.intercept_messages_for_offline_peers {
- self.enqueue_event(
+ self.pending_events.lock().unwrap().peer_connecteds.push(
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
);
}