BlindedPath with unannounced introduction node
[rust-lightning] / lightning / src / onion_message / messenger.rs
index 69f51b514a83ef0a95870a55119f624b85dd56ca..d333eb2103c5ca516217518ab144dab2f84b9807 100644 (file)
@@ -16,7 +16,7 @@ use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
 
 use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
-use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs};
+use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, ReceiveTlvs};
 use crate::blinded_path::utils;
 use crate::events::{Event, EventHandler, EventsProvider};
 use crate::sign::{EntropySource, NodeSigner, Recipient};
@@ -47,6 +47,70 @@ use {
 
 pub(super) const MAX_TIMER_TICKS: usize = 2;
 
+/// A trivial trait which describes any [`OnionMessenger`].
+///
+/// This is not exported to bindings users as general cover traits aren't useful in other
+/// languages.
+pub trait AOnionMessenger {
+       /// A type implementing [`EntropySource`]
+       type EntropySource: EntropySource + ?Sized;
+       /// A type that may be dereferenced to [`Self::EntropySource`]
+       type ES: Deref<Target = Self::EntropySource>;
+       /// A type implementing [`NodeSigner`]
+       type NodeSigner: NodeSigner + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeSigner`]
+       type NS: Deref<Target = Self::NodeSigner>;
+       /// A type implementing [`Logger`]
+       type Logger: Logger + ?Sized;
+       /// A type that may be dereferenced to [`Self::Logger`]
+       type L: Deref<Target = Self::Logger>;
+       /// A type implementing [`NodeIdLookUp`]
+       type NodeIdLookUp: NodeIdLookUp + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeIdLookUp`]
+       type NL: Deref<Target = Self::NodeIdLookUp>;
+       /// A type implementing [`MessageRouter`]
+       type MessageRouter: MessageRouter + ?Sized;
+       /// A type that may be dereferenced to [`Self::MessageRouter`]
+       type MR: Deref<Target = Self::MessageRouter>;
+       /// A type implementing [`OffersMessageHandler`]
+       type OffersMessageHandler: OffersMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::OffersMessageHandler`]
+       type OMH: Deref<Target = Self::OffersMessageHandler>;
+       /// A type implementing [`CustomOnionMessageHandler`]
+       type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
+       type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
+       /// Returns a reference to the actual [`OnionMessenger`] object.
+       fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::CMH>;
+}
+
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> AOnionMessenger
+for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> where
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       L::Target: Logger,
+       NL::Target: NodeIdLookUp,
+       MR::Target: MessageRouter,
+       OMH::Target: OffersMessageHandler,
+       CMH::Target: CustomOnionMessageHandler,
+{
+       type EntropySource = ES::Target;
+       type ES = ES;
+       type NodeSigner = NS::Target;
+       type NS = NS;
+       type Logger = L::Target;
+       type L = L;
+       type NodeIdLookUp = NL::Target;
+       type NL = NL;
+       type MessageRouter = MR::Target;
+       type MR = MR;
+       type OffersMessageHandler = OMH::Target;
+       type OMH = OMH;
+       type CustomOnionMessageHandler = CMH::Target;
+       type CMH = CMH;
+       fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> { self }
+}
+
 /// A sender, receiver and forwarder of [`OnionMessage`]s.
 ///
 /// # Handling Messages
@@ -71,6 +135,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 /// # use bitcoin::hashes::hex::FromHex;
 /// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self};
 /// # use lightning::blinded_path::{BlindedPath, EmptyNodeIdLookUp};
+/// # use lightning::blinded_path::message::ForwardNode;
 /// # use lightning::sign::{EntropySource, KeysManager};
 /// # use lightning::ln::peer_handler::IgnoringMessageHandler;
 /// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger};
@@ -145,8 +210,11 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 ///
 /// // Create a blinded path to yourself, for someone to send an onion message to.
 /// # let your_node_id = hop_node_id1;
-/// let hops = [hop_node_id3, hop_node_id4, your_node_id];
-/// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap();
+/// let hops = [
+///    ForwardNode { node_id: hop_node_id3, short_channel_id: None },
+///    ForwardNode { node_id: hop_node_id4, short_channel_id: None },
+/// ];
+/// let blinded_path = BlindedPath::new_for_message(&hops, your_node_id, &keys_manager, &secp_ctx).unwrap();
 ///
 /// // Send a custom onion message to a blinded path.
 /// let destination = Destination::BlindedPath(blinded_path);
@@ -177,7 +245,12 @@ where
        offers_handler: OMH,
        custom_handler: CMH,
        intercept_messages_for_offline_peers: bool,
-       pending_events: Mutex<Vec<Event>>,
+       pending_events: Mutex<PendingEvents>,
+}
+
+struct PendingEvents {
+       intercepted_msgs: Vec<Event>,
+       peer_connecteds: Vec<Event>,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -252,22 +325,30 @@ impl OnionMessageRecipient {
 
 /// The `Responder` struct creates an appropriate [`ResponseInstruction`]
 /// for responding to a message.
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub struct Responder {
        /// The path along which a response can be sent.
        reply_path: BlindedPath,
        path_id: Option<[u8; 32]>
 }
 
+impl_writeable_tlv_based!(Responder, {
+       (0, reply_path, required),
+       (2, path_id, option),
+});
+
 impl Responder {
        /// Creates a new [`Responder`] instance with the provided reply path.
-       fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self {
+       pub(super) fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self {
                Responder {
                        reply_path,
                        path_id,
                }
        }
 
-       /// Creates the appropriate [`ResponseInstruction`] for a given response.
+       /// Creates a [`ResponseInstruction::WithoutReplyPath`] for a given response.
+       ///
+       /// Use when the recipient doesn't need to send back a reply to us.
        pub fn respond<T: OnionMessageContents>(self, response: T) -> ResponseInstruction<T> {
                ResponseInstruction::WithoutReplyPath(OnionMessageResponse {
                        message: response,
@@ -275,6 +356,17 @@ impl Responder {
                        path_id: self.path_id,
                })
        }
+
+       /// Creates a [`ResponseInstruction::WithReplyPath`] for a given response.
+       ///
+       /// Use when the recipient needs to send back a reply to us.
+       pub fn respond_with_reply_path<T: OnionMessageContents>(self, response: T) -> ResponseInstruction<T> {
+               ResponseInstruction::WithReplyPath(OnionMessageResponse {
+                       message: response,
+                       reply_path: self.reply_path,
+                       path_id: self.path_id,
+               })
+       }
 }
 
 /// This struct contains the information needed to reply to a received message.
@@ -286,6 +378,9 @@ pub struct OnionMessageResponse<T: OnionMessageContents> {
 
 /// `ResponseInstruction` represents instructions for responding to received messages.
 pub enum ResponseInstruction<T: OnionMessageContents> {
+       /// Indicates that a response should be sent including a reply path for
+       /// the recipient to respond back.
+       WithReplyPath(OnionMessageResponse<T>),
        /// Indicates that a response should be sent without including a reply path
        /// for the recipient to respond back.
        WithoutReplyPath(OnionMessageResponse<T>),
@@ -339,9 +434,41 @@ pub trait MessageRouter {
        >(
                &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
        ) -> Result<Vec<BlindedPath>, ()>;
+
+       /// Creates compact [`BlindedPath`]s to the `recipient` node. The nodes in `peers` are assumed
+       /// to be direct peers with the `recipient`.
+       ///
+       /// Compact blinded paths use short channel ids instead of pubkeys for a smaller serialization,
+       /// which is beneficial when a QR code is used to transport the data. The SCID is passed using a
+       /// [`ForwardNode`] but may be `None` for graceful degradation.
+       ///
+       /// Implementations using additional intermediate nodes are responsible for using a
+       /// [`ForwardNode`] with `Some` short channel id, if possible. Similarly, implementations should
+       /// call [`BlindedPath::use_compact_introduction_node`].
+       ///
+       /// The provided implementation simply delegates to [`MessageRouter::create_blinded_paths`],
+       /// ignoring the short channel ids.
+       fn create_compact_blinded_paths<
+               T: secp256k1::Signing + secp256k1::Verification
+       >(
+               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+       ) -> Result<Vec<BlindedPath>, ()> {
+               let peers = peers
+                       .into_iter()
+                       .map(|ForwardNode { node_id, short_channel_id: _ }| node_id)
+                       .collect();
+               self.create_blinded_paths(recipient, peers, secp_ctx)
+       }
 }
 
 /// A [`MessageRouter`] that can only route to a directly connected [`Destination`].
+///
+/// # Privacy
+///
+/// Creating [`BlindedPath`]s may affect privacy since, if a suitable path cannot be found, it will
+/// create a one-hop path using the recipient as the introduction node if it is a announced node.
+/// Otherwise, there is no way to find a path to the introduction node in order to send a message,
+/// and thus an `Err` is returned.
 pub struct DefaultMessageRouter<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref>
 where
        L::Target: Logger,
@@ -360,6 +487,73 @@ where
        pub fn new(network_graph: G, entropy_source: ES) -> Self {
                Self { network_graph, entropy_source }
        }
+
+       fn create_blinded_paths_from_iter<
+               I: ExactSizeIterator<Item = ForwardNode>,
+               T: secp256k1::Signing + secp256k1::Verification
+       >(
+               &self, recipient: PublicKey, peers: I, secp_ctx: &Secp256k1<T>, compact_paths: bool
+       ) -> Result<Vec<BlindedPath>, ()> {
+               // Limit the number of blinded paths that are computed.
+               const MAX_PATHS: usize = 3;
+
+               // Ensure peers have at least three channels so that it is more difficult to infer the
+               // recipient's node_id.
+               const MIN_PEER_CHANNELS: usize = 3;
+
+               let network_graph = self.network_graph.deref().read_only();
+               let is_recipient_announced =
+                       network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
+
+               let has_one_peer = peers.len() == 1;
+               let mut peer_info = peers
+                       // Limit to peers with announced channels unless the recipient is unannounced.
+                       .filter_map(|peer|
+                               network_graph
+                                       .node(&NodeId::from_pubkey(&peer.node_id))
+                                       .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS)
+                                       .map(|info| (peer, info.is_tor_only(), info.channels.len()))
+                                       // Allow messages directly with the only peer when unannounced.
+                                       .or_else(|| (!is_recipient_announced && has_one_peer)
+                                               .then(|| (peer, false, 0))
+                                       )
+                       )
+                       // Exclude Tor-only nodes when the recipient is announced.
+                       .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
+                       .collect::<Vec<_>>();
+
+               // Prefer using non-Tor nodes with the most channels as the introduction node.
+               peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| {
+                       a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse())
+               });
+
+               let paths = peer_info.into_iter()
+                       .map(|(peer, _, _)| {
+                               BlindedPath::new_for_message(&[peer], recipient, &*self.entropy_source, secp_ctx)
+                       })
+                       .take(MAX_PATHS)
+                       .collect::<Result<Vec<_>, _>>();
+
+               let mut paths = match paths {
+                       Ok(paths) if !paths.is_empty() => Ok(paths),
+                       _ => {
+                               if is_recipient_announced {
+                                       BlindedPath::one_hop_for_message(recipient, &*self.entropy_source, secp_ctx)
+                                               .map(|path| vec![path])
+                               } else {
+                                       Err(())
+                               }
+                       },
+               }?;
+
+               if compact_paths {
+                       for path in &mut paths {
+                               path.use_compact_introduction_node(&network_graph);
+                       }
+               }
+
+               Ok(paths)
+       }
 }
 
 impl<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter<G, L, ES>
@@ -405,51 +599,18 @@ where
        >(
                &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
        ) -> Result<Vec<BlindedPath>, ()> {
-               // Limit the number of blinded paths that are computed.
-               const MAX_PATHS: usize = 3;
-
-               // Ensure peers have at least three channels so that it is more difficult to infer the
-               // recipient's node_id.
-               const MIN_PEER_CHANNELS: usize = 3;
-
-               let network_graph = self.network_graph.deref().read_only();
-               let is_recipient_announced =
-                       network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
-
-               let mut peer_info = peers.iter()
-                       // Limit to peers with announced channels
-                       .filter_map(|pubkey|
-                               network_graph
-                                       .node(&NodeId::from_pubkey(pubkey))
-                                       .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS)
-                                       .map(|info| (*pubkey, info.is_tor_only(), info.channels.len()))
-                       )
-                       // Exclude Tor-only nodes when the recipient is announced.
-                       .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
-                       .collect::<Vec<_>>();
-
-               // Prefer using non-Tor nodes with the most channels as the introduction node.
-               peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| {
-                       a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse())
-               });
-
-               let paths = peer_info.into_iter()
-                       .map(|(pubkey, _, _)| vec![pubkey, recipient])
-                       .map(|node_pks| BlindedPath::new_for_message(&node_pks, &*self.entropy_source, secp_ctx))
-                       .take(MAX_PATHS)
-                       .collect::<Result<Vec<_>, _>>();
+               let peers = peers
+                       .into_iter()
+                       .map(|node_id| ForwardNode { node_id, short_channel_id: None });
+               self.create_blinded_paths_from_iter(recipient, peers, secp_ctx, false)
+       }
 
-               match paths {
-                       Ok(paths) if !paths.is_empty() => Ok(paths),
-                       _ => {
-                               if is_recipient_announced {
-                                       BlindedPath::one_hop_for_message(recipient, &*self.entropy_source, secp_ctx)
-                                               .map(|path| vec![path])
-                               } else {
-                                       Err(())
-                               }
-                       },
-               }
+       fn create_compact_blinded_paths<
+               T: secp256k1::Signing + secp256k1::Verification
+       >(
+               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+       ) -> Result<Vec<BlindedPath>, ()> {
+               self.create_blinded_paths_from_iter(recipient, peers.into_iter(), secp_ctx, true)
        }
 }
 
@@ -553,7 +714,11 @@ pub enum SendError {
        TooFewBlindedHops,
        /// The first hop is not a peer and doesn't have a known [`SocketAddress`].
        InvalidFirstHop(PublicKey),
-       /// A path from the sender to the destination could not be found by the [`MessageRouter`].
+       /// Indicates that a path could not be found by the [`MessageRouter`].
+       ///
+       /// This occurs when either:
+       /// - No path from the sender to the destination was found to send the onion message
+       /// - No reply path to the sender could be created when responding to an onion message
        PathNotFound,
        /// Onion message contents must have a TLV type >= 64.
        InvalidMessage,
@@ -898,7 +1063,10 @@ where
                        offers_handler,
                        custom_handler,
                        intercept_messages_for_offline_peers,
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(PendingEvents {
+                               intercepted_msgs: Vec::new(),
+                               peer_connecteds: Vec::new(),
+                       }),
                }
        }
 
@@ -970,6 +1138,24 @@ where
                        .map_err(|_| SendError::PathNotFound)
        }
 
+       fn create_blinded_path(&self) -> Result<BlindedPath, SendError> {
+               let recipient = self.node_signer
+                       .get_node_id(Recipient::Node)
+                       .map_err(|_| SendError::GetNodeIdFailed)?;
+               let secp_ctx = &self.secp_ctx;
+
+               let peers = self.message_recipients.lock().unwrap()
+                       .iter()
+                       .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_)))
+                       .map(|(node_id, _ )| *node_id)
+                       .collect::<Vec<_>>();
+
+               self.message_router
+                       .create_blinded_paths(recipient, peers, secp_ctx)
+                       .and_then(|paths| paths.into_iter().next().ok_or(()))
+                       .map_err(|_| SendError::PathNotFound)
+       }
+
        fn enqueue_onion_message<T: OnionMessageContents>(
                &self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>,
                log_suffix: fmt::Arguments
@@ -1042,20 +1228,48 @@ where
                )
        }
 
-       fn handle_onion_message_response<T: OnionMessageContents>(
+       /// Handles the response to an [`OnionMessage`] based on its [`ResponseInstruction`],
+       /// enqueueing any response for sending.
+       ///
+       /// This function is useful for asynchronous handling of [`OnionMessage`]s.
+       /// Handlers have the option to return [`ResponseInstruction::NoResponse`], indicating that
+       /// no immediate response should be sent. Then, they can transfer the associated [`Responder`]
+       /// to another task responsible for generating the response asynchronously. Subsequently, when
+       /// the response is prepared and ready for sending, that task can invoke this method to enqueue
+       /// the response for delivery.
+       pub fn handle_onion_message_response<T: OnionMessageContents>(
                &self, response: ResponseInstruction<T>
-       ) {
-               if let ResponseInstruction::WithoutReplyPath(response) = response {
-                       let message_type = response.message.msg_type();
-                       let _ = self.find_path_and_enqueue_onion_message(
-                               response.message, Destination::BlindedPath(response.reply_path), None,
-                               format_args!(
-                                       "when responding with {} to an onion message with path_id {:02x?}",
-                                       message_type,
-                                       response.path_id
-                               )
-                       );
-               }
+       ) -> Result<Option<SendSuccess>, SendError> {
+               let (response, create_reply_path) = match response {
+                       ResponseInstruction::WithReplyPath(response) => (response, true),
+                       ResponseInstruction::WithoutReplyPath(response) => (response, false),
+                       ResponseInstruction::NoResponse => return Ok(None),
+               };
+
+               let message_type = response.message.msg_type();
+               let reply_path = if create_reply_path {
+                       match self.create_blinded_path() {
+                               Ok(reply_path) => Some(reply_path),
+                               Err(err) => {
+                                       log_trace!(
+                                               self.logger,
+                                               "Failed to create reply path when responding with {} to an onion message \
+                                               with path_id {:02x?}: {:?}",
+                                               message_type, response.path_id, err
+                                       );
+                                       return Err(err);
+                               }
+                       }
+               } else { None };
+
+               self.find_path_and_enqueue_onion_message(
+                       response.message, Destination::BlindedPath(response.reply_path), reply_path,
+                       format_args!(
+                               "when responding with {} to an onion message with path_id {:02x?}",
+                               message_type,
+                               response.path_id
+                       )
+               ).map(|result| Some(result))
        }
 
        #[cfg(test)]
@@ -1070,18 +1284,61 @@ where
                msgs
        }
 
-       fn enqueue_event(&self, event: Event) {
+       fn enqueue_intercepted_event(&self, event: Event) {
                const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
                let mut pending_events = self.pending_events.lock().unwrap();
-               let total_buffered_bytes: usize = pending_events
-                       .iter()
-                       .map(|ev| ev.serialized_length())
-                       .sum();
+               let total_buffered_bytes: usize =
+                       pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
                if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
                        log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
                        return
                }
-               pending_events.push(event);
+               pending_events.intercepted_msgs.push(event);
+       }
+
+       /// Processes any events asynchronously using the given handler.
+       ///
+       /// Note that the event handler is called in the order each event was generated, however
+       /// futures are polled in parallel for some events to allow for parallelism where events do not
+       /// have an ordering requirement.
+       ///
+       /// See the trait-level documentation of [`EventsProvider`] for requirements.
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+               &self, handler: H
+       ) {
+               let mut intercepted_msgs = Vec::new();
+               let mut peer_connecteds = Vec::new();
+               {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+                       core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+               }
+
+               let mut futures = Vec::with_capacity(intercepted_msgs.len());
+               for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+                       if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+                               if let Some(addresses) = addresses.take() {
+                                       futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+                               }
+                       }
+               }
+
+               for ev in intercepted_msgs {
+                       if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+                       futures.push(Some(handler(ev)));
+               }
+               // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+               crate::util::async_poll::MultiFuturePoller(futures).await;
+
+               if peer_connecteds.len() <= 1 {
+                       for event in peer_connecteds { handler(event).await; }
+               } else {
+                       let mut futures = Vec::new();
+                       for event in peer_connecteds {
+                               futures.push(Some(handler(event)));
+                       }
+                       crate::util::async_poll::MultiFuturePoller(futures).await;
+               }
        }
 }
 
@@ -1128,7 +1385,20 @@ where
                        }
                }
                let mut events = Vec::new();
-               core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+               {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       #[cfg(debug_assertions)] {
+                               for ev in pending_events.intercepted_msgs.iter() {
+                                       if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
+                               }
+                               for ev in pending_events.peer_connecteds.iter() {
+                                       if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
+                               }
+                       }
+                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
+                       events.append(&mut pending_events.peer_connecteds);
+                       pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+               }
                for ev in events {
                        handler.handle_event(ev);
                }
@@ -1161,14 +1431,14 @@ where
                                                        |reply_path| Responder::new(reply_path, path_id)
                                                );
                                                let response_instructions = self.offers_handler.handle_message(msg, responder);
-                                               self.handle_onion_message_response(response_instructions);
+                                               let _ = self.handle_onion_message_response(response_instructions);
                                        },
                                        ParsedOnionMessageContents::Custom(msg) => {
                                                let responder = reply_path.map(
                                                        |reply_path| Responder::new(reply_path, path_id)
                                                );
                                                let response_instructions = self.custom_handler.handle_custom_message(msg, responder);
-                                               self.handle_onion_message_response(response_instructions);
+                                               let _ = self.handle_onion_message_response(response_instructions);
                                        },
                                }
                        },
@@ -1206,7 +1476,7 @@ where
                                                log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
                                        },
                                        _ if self.intercept_messages_for_offline_peers => {
-                                               self.enqueue_event(
+                                               self.enqueue_intercepted_event(
                                                        Event::OnionMessageIntercepted {
                                                                peer_node_id: next_node_id, message: onion_message
                                                        }
@@ -1234,7 +1504,7 @@ where
                                .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
                                .mark_connected();
                        if self.intercept_messages_for_offline_peers {
-                               self.enqueue_event(
+                               self.pending_events.lock().unwrap().peer_connecteds.push(
                                        Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
                                );
                        }