From b5b57f188f2a65b2dbdc49616463437abbb8e073 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 2 Jul 2024 11:11:40 +0200 Subject: [PATCH] Hold sep. Mutexes for pending `intercepted_msgs`/`peer_connected` events This is a minor refactor that will allow us to access the individual event queue Mutexes separately, allowing us to drop the locks earlier when processing them individually. --- lightning/src/onion_message/messenger.rs | 49 ++++++++++++------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 28f1bc792..75d92610a 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -261,12 +261,8 @@ pub struct OnionMessenger< async_payments_handler: APH, custom_handler: CMH, intercept_messages_for_offline_peers: bool, - pending_events: Mutex, -} - -struct PendingEvents { - intercepted_msgs: Vec, - peer_connecteds: Vec, + pending_intercepted_msgs_events: Mutex>, + pending_peer_connected_events: Mutex>, } /// [`OnionMessage`]s buffered to be sent. @@ -1095,10 +1091,8 @@ where async_payments_handler, custom_handler, intercept_messages_for_offline_peers, - pending_events: Mutex::new(PendingEvents { - intercepted_msgs: Vec::new(), - peer_connecteds: Vec::new(), - }), + pending_intercepted_msgs_events: Mutex::new(Vec::new()), + pending_peer_connected_events: Mutex::new(Vec::new()), } } @@ -1316,14 +1310,15 @@ where fn enqueue_intercepted_event(&self, event: Event) { const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; - let mut pending_events = self.pending_events.lock().unwrap(); - let total_buffered_bytes: usize = - pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum(); + let mut pending_intercepted_msgs_events = + self.pending_intercepted_msgs_events.lock().unwrap(); + let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter() + .map(|ev| ev.serialized_length()).sum(); if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { log_trace!(self.logger, "Dropping event {:?}: buffer full", event); return } - pending_events.intercepted_msgs.push(event); + pending_intercepted_msgs_events.push(event); } /// Processes any events asynchronously using the given handler. @@ -1339,9 +1334,12 @@ where let mut intercepted_msgs = Vec::new(); let mut peer_connecteds = Vec::new(); { - let mut pending_events = self.pending_events.lock().unwrap(); - core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs); - core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds); + let mut pending_intercepted_msgs_events = + self.pending_intercepted_msgs_events.lock().unwrap(); + let mut pending_peer_connected_events = + self.pending_peer_connected_events.lock().unwrap(); + core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs); + core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds); } let mut futures = Vec::with_capacity(intercepted_msgs.len()); @@ -1417,18 +1415,19 @@ where } let mut events = Vec::new(); { - let mut pending_events = self.pending_events.lock().unwrap(); + let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap(); + let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap(); #[cfg(debug_assertions)] { - for ev in pending_events.intercepted_msgs.iter() { + for ev in pending_intercepted_msgs_events.iter() { if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } } - for ev in pending_events.peer_connecteds.iter() { + for ev in pending_peer_connected_events.iter() { if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } } } - core::mem::swap(&mut pending_events.intercepted_msgs, &mut events); - events.append(&mut pending_events.peer_connecteds); - pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage + core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events); + events.append(&mut pending_peer_connected_events); + pending_peer_connected_events.shrink_to(10); // Limit total heap usage } for ev in events { handler.handle_event(ev); @@ -1558,7 +1557,9 @@ where .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); if self.intercept_messages_for_offline_peers { - self.pending_events.lock().unwrap().peer_connecteds.push( + let mut pending_peer_connected_events = + self.pending_peer_connected_events.lock().unwrap(); + pending_peer_connected_events.push( Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } ); } -- 2.39.5