git.bitcoin.ninja Git - rust-lightning/commitdiff
Store `OnionMessenger` events in different `Vec`s
authorMatt Corallo <git@bluematt.me>
Mon, 3 Jun 2024 18:28:45 +0000 (18:28 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 3 Jun 2024 18:28:45 +0000 (18:28 +0000)
In the next commit, `OnionMessenger` events are handled in parallel
using rust async. When we do that, we'll want to handle
`OnionMessageIntercepted` events prior to
`OnionMessagePeerConnected` ones.

While we'd generally prefer to handle all events in the order they
were generated, if we want to handle them in parallel, we don't
want an `OnionMessageIntercepted` event to start being processed,
then handle an `OnionMessagePeerConnected` prior to the first
completing. This could cause us to store a freshly-intercepted
message for a peer in a DB that was just wiped because the peer
is now connected.

This does run the risk of processing an `OnionMessagePeerConnected`
event prior to an `OnionMessageIntercepted` event (because a peer
connected, then disconnected, then we received a message for that
peer all before any events were handled), but that is somewhat less
likely, and discarding a message in a rare race is better than
leaving a message lying around undelivered.

Thus, here, we store `OnionMessenger` events in separate `Vec`s
which we can pull from in message-type-order.

lightning/src/onion_message/messenger.rs

index 80ce86498e672309d0e8ebe93560adf0fec84be1..9043eccbe74013dfc91b5d58183e46d1f7bf65ca 100644 (file)
@@ -241,7 +241,12 @@ where
        offers_handler: OMH,
        custom_handler: CMH,
        intercept_messages_for_offline_peers: bool,
-       pending_events: Mutex<Vec<Event>>,
+       pending_events: Mutex<PendingEvents>,
+}
+
+struct PendingEvents {
+       intercepted_msgs: Vec<Event>,
+       peer_connecteds: Vec<Event>,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -963,7 +968,10 @@ where
                        offers_handler,
                        custom_handler,
                        intercept_messages_for_offline_peers,
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(PendingEvents {
+                               intercepted_msgs: Vec::new(),
+                               peer_connecteds: Vec::new(),
+                       }),
                }
        }
 
@@ -1135,18 +1143,16 @@ where
                msgs
        }
 
-       fn enqueue_event(&self, event: Event) {
+       fn enqueue_intercepted_event(&self, event: Event) {
                const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
                let mut pending_events = self.pending_events.lock().unwrap();
-               let total_buffered_bytes: usize = pending_events
-                       .iter()
-                       .map(|ev| ev.serialized_length())
-                       .sum();
+               let total_buffered_bytes: usize =
+                       pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
                if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
                        log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
                        return
                }
-               pending_events.push(event);
+               pending_events.intercepted_msgs.push(event);
        }
 }
 
@@ -1193,7 +1199,20 @@ where
                        }
                }
                let mut events = Vec::new();
-               core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+               {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       #[cfg(debug_assertions)] {
+                               for ev in pending_events.intercepted_msgs.iter() {
+                                       if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
+                               }
+                               for ev in pending_events.peer_connecteds.iter() {
+                                       if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
+                               }
+                       }
+                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
+                       events.append(&mut pending_events.peer_connecteds);
+                       pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+               }
                for ev in events {
                        handler.handle_event(ev);
                }
@@ -1271,7 +1290,7 @@ where
                                                log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
                                        },
                                        _ if self.intercept_messages_for_offline_peers => {
-                                               self.enqueue_event(
+                                               self.enqueue_intercepted_event(
                                                        Event::OnionMessageIntercepted {
                                                                peer_node_id: next_node_id, message: onion_message
                                                        }
@@ -1299,7 +1318,7 @@ where
                                .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
                                .mark_connected();
                        if self.intercept_messages_for_offline_peers {
-                               self.enqueue_event(
+                               self.pending_events.lock().unwrap().peer_connecteds.push(
                                        Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
                                );
                        }