From: Matt Corallo Date: Mon, 3 Jun 2024 18:28:45 +0000 (+0000) Subject: Store `OnionMessenger` events in different `Vec`s X-Git-Tag: v0.0.124-beta~100^2~4 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=9ef61748e419e3e993784ee1c7f785ed434f5c67;p=rust-lightning Store `OnionMessenger` events in different `Vec`s In the next commit, `OnionMessenger` events are handled in parallel using rust async. When we do that, we'll want to handle `OnionMessageIntercepted` events prior to `OnionMessagePeerConnected` ones. While we'd generally prefer to handle all events in the order they were generated, if we want to handle them in parallel, we don't want a `OnionMessageIntercepted` event to start being processed, then handle an `OnionMessagePeerConnected` prior to the first completing. This could cause us to store a freshly-intercepted message for a peer in a DB that was just wiped because the peer is now connected. This does run the risk of processing a `OnionMessagePeerConnected` event prior to an `OnionMessageIntercepted` event (because a peer connected, then disconnected, then we received a message for that peer all before any events were handled), that is somewhat less likely and discarding a message in a rare race is better than leaving a message lying around undelivered. Thus, here, we store `OnionMessenger` events in separate `Vec`s which we can pull from in message-type-order. --- diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 80ce86498..9043eccbe 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -241,7 +241,12 @@ where offers_handler: OMH, custom_handler: CMH, intercept_messages_for_offline_peers: bool, - pending_events: Mutex>, + pending_events: Mutex, +} + +struct PendingEvents { + intercepted_msgs: Vec, + peer_connecteds: Vec, } /// [`OnionMessage`]s buffered to be sent. @@ -963,7 +968,10 @@ where offers_handler, custom_handler, intercept_messages_for_offline_peers, - pending_events: Mutex::new(Vec::new()), + pending_events: Mutex::new(PendingEvents { + intercepted_msgs: Vec::new(), + peer_connecteds: Vec::new(), + }), } } @@ -1135,18 +1143,16 @@ where msgs } - fn enqueue_event(&self, event: Event) { + fn enqueue_intercepted_event(&self, event: Event) { const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; let mut pending_events = self.pending_events.lock().unwrap(); - let total_buffered_bytes: usize = pending_events - .iter() - .map(|ev| ev.serialized_length()) - .sum(); + let total_buffered_bytes: usize = + pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum(); if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { log_trace!(self.logger, "Dropping event {:?}: buffer full", event); return } - pending_events.push(event); + pending_events.intercepted_msgs.push(event); } } @@ -1193,7 +1199,20 @@ where } } let mut events = Vec::new(); - core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events); + { + let mut pending_events = self.pending_events.lock().unwrap(); + #[cfg(debug_assertions)] { + for ev in pending_events.intercepted_msgs.iter() { + if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } + } + for ev in pending_events.peer_connecteds.iter() { + if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } + } + } + core::mem::swap(&mut pending_events.intercepted_msgs, &mut events); + events.append(&mut pending_events.peer_connecteds); + pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage + } for ev in events { handler.handle_event(ev); } @@ -1271,7 +1290,7 @@ where log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, _ if self.intercept_messages_for_offline_peers => { - self.enqueue_event( + self.enqueue_intercepted_event( Event::OnionMessageIntercepted { peer_node_id: next_node_id, message: onion_message } @@ -1299,7 +1318,7 @@ where .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); if self.intercept_messages_for_offline_peers { - self.enqueue_event( + self.pending_events.lock().unwrap().peer_connecteds.push( Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } ); }