async_payments_handler: APH,
custom_handler: CMH,
intercept_messages_for_offline_peers: bool,
- pending_events: Mutex<PendingEvents>,
-}
-
-struct PendingEvents {
- intercepted_msgs: Vec<Event>,
- peer_connecteds: Vec<Event>,
+ pending_intercepted_msgs_events: Mutex<Vec<Event>>,
+ pending_peer_connected_events: Mutex<Vec<Event>>,
}
/// [`OnionMessage`]s buffered to be sent.
async_payments_handler,
custom_handler,
intercept_messages_for_offline_peers,
- pending_events: Mutex::new(PendingEvents {
- intercepted_msgs: Vec::new(),
- peer_connecteds: Vec::new(),
- }),
+ pending_intercepted_msgs_events: Mutex::new(Vec::new()),
+ pending_peer_connected_events: Mutex::new(Vec::new()),
}
}
fn enqueue_intercepted_event(&self, event: Event) {
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
- let mut pending_events = self.pending_events.lock().unwrap();
- let total_buffered_bytes: usize =
- pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
+ let mut pending_intercepted_msgs_events =
+ self.pending_intercepted_msgs_events.lock().unwrap();
+ let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter()
+ .map(|ev| ev.serialized_length()).sum();
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
return
}
- pending_events.intercepted_msgs.push(event);
+ pending_intercepted_msgs_events.push(event);
}
/// Processes any events asynchronously using the given handler.
let mut intercepted_msgs = Vec::new();
let mut peer_connecteds = Vec::new();
{
- let mut pending_events = self.pending_events.lock().unwrap();
- core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
- core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+ let mut pending_intercepted_msgs_events =
+ self.pending_intercepted_msgs_events.lock().unwrap();
+ let mut pending_peer_connected_events =
+ self.pending_peer_connected_events.lock().unwrap();
+ core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
+ core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
}
let mut futures = Vec::with_capacity(intercepted_msgs.len());
}
let mut events = Vec::new();
{
- let mut pending_events = self.pending_events.lock().unwrap();
+ let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+ let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
#[cfg(debug_assertions)] {
- for ev in pending_events.intercepted_msgs.iter() {
+ for ev in pending_intercepted_msgs_events.iter() {
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
}
- for ev in pending_events.peer_connecteds.iter() {
+ for ev in pending_peer_connected_events.iter() {
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
}
}
- core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
- events.append(&mut pending_events.peer_connecteds);
- pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+ core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
+ events.append(&mut pending_peer_connected_events);
+ pending_peer_connected_events.shrink_to(10); // Limit total heap usage
}
for ev in events {
handler.handle_event(ev);
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
.mark_connected();
if self.intercept_messages_for_offline_peers {
- self.pending_events.lock().unwrap().peer_connecteds.push(
+ let mut pending_peer_connected_events =
+ self.pending_peer_connected_events.lock().unwrap();
+ pending_peer_connected_events.push(
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
);
}