]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Hold sep. Mutexes for pending `intercepted_msgs`/`peer_connected` events
authorElias Rohrer <dev@tnull.de>
Tue, 2 Jul 2024 09:11:40 +0000 (11:11 +0200)
committerElias Rohrer <dev@tnull.de>
Thu, 18 Jul 2024 07:05:43 +0000 (09:05 +0200)
This is a minor refactor that will allow us to access the individual
event queue Mutexes separately, allowing us to drop the locks earlier
when processing them individually.

lightning/src/onion_message/messenger.rs

index 28f1bc79253eec20bda086f2a14850c4ed261b85..75d92610ae597740fbde450c0c6c0b8be6531ae6 100644 (file)
@@ -261,12 +261,8 @@ pub struct OnionMessenger<
        async_payments_handler: APH,
        custom_handler: CMH,
        intercept_messages_for_offline_peers: bool,
-       pending_events: Mutex<PendingEvents>,
-}
-
-struct PendingEvents {
-       intercepted_msgs: Vec<Event>,
-       peer_connecteds: Vec<Event>,
+       pending_intercepted_msgs_events: Mutex<Vec<Event>>,
+       pending_peer_connected_events: Mutex<Vec<Event>>,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -1095,10 +1091,8 @@ where
                        async_payments_handler,
                        custom_handler,
                        intercept_messages_for_offline_peers,
-                       pending_events: Mutex::new(PendingEvents {
-                               intercepted_msgs: Vec::new(),
-                               peer_connecteds: Vec::new(),
-                       }),
+                       pending_intercepted_msgs_events: Mutex::new(Vec::new()),
+                       pending_peer_connected_events: Mutex::new(Vec::new()),
                }
        }
 
@@ -1316,14 +1310,15 @@ where
 
        fn enqueue_intercepted_event(&self, event: Event) {
                const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
-               let mut pending_events = self.pending_events.lock().unwrap();
-               let total_buffered_bytes: usize =
-                       pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
+               let mut pending_intercepted_msgs_events =
+                       self.pending_intercepted_msgs_events.lock().unwrap();
+               let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter()
+                       .map(|ev| ev.serialized_length()).sum();
                if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
                        log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
                        return
                }
-               pending_events.intercepted_msgs.push(event);
+               pending_intercepted_msgs_events.push(event);
        }
 
        /// Processes any events asynchronously using the given handler.
@@ -1339,9 +1334,12 @@ where
                let mut intercepted_msgs = Vec::new();
                let mut peer_connecteds = Vec::new();
                {
-                       let mut pending_events = self.pending_events.lock().unwrap();
-                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
-                       core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+                       let mut pending_intercepted_msgs_events =
+                               self.pending_intercepted_msgs_events.lock().unwrap();
+                       let mut pending_peer_connected_events =
+                               self.pending_peer_connected_events.lock().unwrap();
+                       core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
+                       core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
                }
 
                let mut futures = Vec::with_capacity(intercepted_msgs.len());
@@ -1417,18 +1415,19 @@ where
                }
                let mut events = Vec::new();
                {
-                       let mut pending_events = self.pending_events.lock().unwrap();
+                       let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+                       let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
                        #[cfg(debug_assertions)] {
-                               for ev in pending_events.intercepted_msgs.iter() {
+                               for ev in pending_intercepted_msgs_events.iter() {
                                        if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
                                }
-                               for ev in pending_events.peer_connecteds.iter() {
+                               for ev in pending_peer_connected_events.iter() {
                                        if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
                                }
                        }
-                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
-                       events.append(&mut pending_events.peer_connecteds);
-                       pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+                       core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
+                       events.append(&mut pending_peer_connected_events);
+                       pending_peer_connected_events.shrink_to(10); // Limit total heap usage
                }
                for ev in events {
                        handler.handle_event(ev);
@@ -1558,7 +1557,9 @@ where
                                .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
                                .mark_connected();
                        if self.intercept_messages_for_offline_peers {
-                               self.pending_events.lock().unwrap().peer_connecteds.push(
+                               let mut pending_peer_connected_events =
+                                       self.pending_peer_connected_events.lock().unwrap();
+                               pending_peer_connected_events.push(
                                        Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
                                );
                        }