Implement AsyncPaymentsMessageHandler for ChanMan and use everywhere
[rust-lightning] / lightning / src / onion_message / messenger.rs
index aa5f658949254fd98dd8de656c11eefba93c56d3..21012bd7fed2f471c4601b6047feb17cc3107c8a 100644 (file)
@@ -24,6 +24,9 @@ use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
 use crate::ln::onion_utils;
 use crate::routing::gossip::{NetworkGraph, NodeId, ReadOnlyNetworkGraph};
+use super::async_payments::AsyncPaymentsMessageHandler;
+#[cfg(async_payments)]
+use super::async_payments::AsyncPaymentsMessage;
 use super::packet::OnionMessageContents;
 use super::packet::ParsedOnionMessageContents;
 use super::offers::OffersMessageHandler;
@@ -47,6 +50,77 @@ use {
 
 pub(super) const MAX_TIMER_TICKS: usize = 2;
 
+/// A trivial trait which describes any [`OnionMessenger`].
+///
+/// This is not exported to bindings users as general cover traits aren't useful in other
+/// languages.
+pub trait AOnionMessenger {
+       /// A type implementing [`EntropySource`]
+       type EntropySource: EntropySource + ?Sized;
+       /// A type that may be dereferenced to [`Self::EntropySource`]
+       type ES: Deref<Target = Self::EntropySource>;
+       /// A type implementing [`NodeSigner`]
+       type NodeSigner: NodeSigner + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeSigner`]
+       type NS: Deref<Target = Self::NodeSigner>;
+       /// A type implementing [`Logger`]
+       type Logger: Logger + ?Sized;
+       /// A type that may be dereferenced to [`Self::Logger`]
+       type L: Deref<Target = Self::Logger>;
+       /// A type implementing [`NodeIdLookUp`]
+       type NodeIdLookUp: NodeIdLookUp + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeIdLookUp`]
+       type NL: Deref<Target = Self::NodeIdLookUp>;
+       /// A type implementing [`MessageRouter`]
+       type MessageRouter: MessageRouter + ?Sized;
+       /// A type that may be dereferenced to [`Self::MessageRouter`]
+       type MR: Deref<Target = Self::MessageRouter>;
+       /// A type implementing [`OffersMessageHandler`]
+       type OffersMessageHandler: OffersMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::OffersMessageHandler`]
+       type OMH: Deref<Target = Self::OffersMessageHandler>;
+       /// A type implementing [`AsyncPaymentsMessageHandler`]
+       type AsyncPaymentsMessageHandler: AsyncPaymentsMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::AsyncPaymentsMessageHandler`]
+       type APH: Deref<Target = Self::AsyncPaymentsMessageHandler>;
+       /// A type implementing [`CustomOnionMessageHandler`]
+       type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
+       type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
+       /// Returns a reference to the actual [`OnionMessenger`] object.
+       fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::APH, Self::CMH>;
+}
+
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref> AOnionMessenger
+for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH> where
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       L::Target: Logger,
+       NL::Target: NodeIdLookUp,
+       MR::Target: MessageRouter,
+       OMH::Target: OffersMessageHandler,
+       APH:: Target: AsyncPaymentsMessageHandler,
+       CMH::Target: CustomOnionMessageHandler,
+{
+       type EntropySource = ES::Target;
+       type ES = ES;
+       type NodeSigner = NS::Target;
+       type NS = NS;
+       type Logger = L::Target;
+       type L = L;
+       type NodeIdLookUp = NL::Target;
+       type NL = NL;
+       type MessageRouter = MR::Target;
+       type MR = MR;
+       type OffersMessageHandler = OMH::Target;
+       type OMH = OMH;
+       type AsyncPaymentsMessageHandler = APH::Target;
+       type APH = APH;
+       type CustomOnionMessageHandler = CMH::Target;
+       type CMH = CMH;
+       fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH> { self }
+}
+
 /// A sender, receiver and forwarder of [`OnionMessage`]s.
 ///
 /// # Handling Messages
@@ -98,7 +172,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 /// #         })
 /// #     }
 /// #     fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
-/// #         &self, _recipient: PublicKey, _peers: Vec<ForwardNode>, _secp_ctx: &Secp256k1<T>
+/// #         &self, _recipient: PublicKey, _peers: Vec<PublicKey>, _secp_ctx: &Secp256k1<T>
 /// #     ) -> Result<Vec<BlindedPath>, ()> {
 /// #         unreachable!()
 /// #     }
@@ -116,11 +190,12 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 /// # let message_router = Arc::new(FakeMessageRouter {});
 /// # let custom_message_handler = IgnoringMessageHandler {};
 /// # let offers_message_handler = IgnoringMessageHandler {};
+/// # let async_payments_message_handler = IgnoringMessageHandler {};
 /// // Create the onion messenger. This must use the same `keys_manager` as is passed to your
 /// // ChannelManager.
 /// let onion_messenger = OnionMessenger::new(
 ///     &keys_manager, &keys_manager, logger, &node_id_lookup, message_router,
-///     &offers_message_handler, &custom_message_handler
+///     &offers_message_handler, &async_payments_message_handler, &custom_message_handler
 /// );
 
 /// # #[derive(Debug)]
@@ -161,14 +236,16 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 ///
 /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
 /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice
-pub struct OnionMessenger<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref>
-where
+pub struct OnionMessenger<
+       ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref
+> where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        L::Target: Logger,
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
 {
        entropy_source: ES,
@@ -179,9 +256,16 @@ where
        node_id_lookup: NL,
        message_router: MR,
        offers_handler: OMH,
+       #[allow(unused)]
+       async_payments_handler: APH,
        custom_handler: CMH,
        intercept_messages_for_offline_peers: bool,
-       pending_events: Mutex<Vec<Event>>,
+       pending_events: Mutex<PendingEvents>,
+}
+
+struct PendingEvents {
+       intercepted_msgs: Vec<Event>,
+       peer_connecteds: Vec<Event>,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -256,12 +340,18 @@ impl OnionMessageRecipient {
 
 /// The `Responder` struct creates an appropriate [`ResponseInstruction`]
 /// for responding to a message.
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub struct Responder {
        /// The path along which a response can be sent.
        reply_path: BlindedPath,
        path_id: Option<[u8; 32]>
 }
 
+impl_writeable_tlv_based!(Responder, {
+       (0, reply_path, required),
+       (2, path_id, option),
+});
+
 impl Responder {
        /// Creates a new [`Responder`] instance with the provided reply path.
        pub(super) fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self {
@@ -357,11 +447,43 @@ pub trait MessageRouter {
        fn create_blinded_paths<
                T: secp256k1::Signing + secp256k1::Verification
        >(
-               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+               &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
        ) -> Result<Vec<BlindedPath>, ()>;
+
+       /// Creates compact [`BlindedPath`]s to the `recipient` node. The nodes in `peers` are assumed
+       /// to be direct peers with the `recipient`.
+       ///
+       /// Compact blinded paths use short channel ids instead of pubkeys for a smaller serialization,
+       /// which is beneficial when a QR code is used to transport the data. The SCID is passed using a
+       /// [`ForwardNode`] but may be `None` for graceful degradation.
+       ///
+       /// Implementations using additional intermediate nodes are responsible for using a
+       /// [`ForwardNode`] with `Some` short channel id, if possible. Similarly, implementations should
+       /// call [`BlindedPath::use_compact_introduction_node`].
+       ///
+       /// The provided implementation simply delegates to [`MessageRouter::create_blinded_paths`],
+       /// ignoring the short channel ids.
+       fn create_compact_blinded_paths<
+               T: secp256k1::Signing + secp256k1::Verification
+       >(
+               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+       ) -> Result<Vec<BlindedPath>, ()> {
+               let peers = peers
+                       .into_iter()
+                       .map(|ForwardNode { node_id, short_channel_id: _ }| node_id)
+                       .collect();
+               self.create_blinded_paths(recipient, peers, secp_ctx)
+       }
 }
 
 /// A [`MessageRouter`] that can only route to a directly connected [`Destination`].
+///
+/// # Privacy
+///
+/// Creating [`BlindedPath`]s may affect privacy since, if a suitable path cannot be found, it will
+/// create a one-hop path using the recipient as the introduction node if it is a announced node.
+/// Otherwise, there is no way to find a path to the introduction node in order to send a message,
+/// and thus an `Err` is returned.
 pub struct DefaultMessageRouter<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref>
 where
        L::Target: Logger,
@@ -380,51 +502,12 @@ where
        pub fn new(network_graph: G, entropy_source: ES) -> Self {
                Self { network_graph, entropy_source }
        }
-}
 
-impl<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter<G, L, ES>
-where
-       L::Target: Logger,
-       ES::Target: EntropySource,
-{
-       fn find_path(
-               &self, sender: PublicKey, peers: Vec<PublicKey>, mut destination: Destination
-       ) -> Result<OnionMessagePath, ()> {
-               let network_graph = self.network_graph.deref().read_only();
-               destination.resolve(&network_graph);
-
-               let first_node = match destination.first_node() {
-                       Some(first_node) => first_node,
-                       None => return Err(()),
-               };
-
-               if peers.contains(&first_node) || sender == first_node {
-                       Ok(OnionMessagePath {
-                               intermediate_nodes: vec![], destination, first_node_addresses: None
-                       })
-               } else {
-                       let node_announcement = network_graph
-                               .node(&NodeId::from_pubkey(&first_node))
-                               .and_then(|node_info| node_info.announcement_info.as_ref())
-                               .and_then(|announcement_info| announcement_info.announcement_message.as_ref())
-                               .map(|node_announcement| &node_announcement.contents);
-
-                       match node_announcement {
-                               Some(node_announcement) if node_announcement.features.supports_onion_messages() => {
-                                       let first_node_addresses = Some(node_announcement.addresses.clone());
-                                       Ok(OnionMessagePath {
-                                               intermediate_nodes: vec![], destination, first_node_addresses
-                                       })
-                               },
-                               _ => Err(()),
-                       }
-               }
-       }
-
-       fn create_blinded_paths<
+       fn create_blinded_paths_from_iter<
+               I: ExactSizeIterator<Item = ForwardNode>,
                T: secp256k1::Signing + secp256k1::Verification
        >(
-               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+               &self, recipient: PublicKey, peers: I, secp_ctx: &Secp256k1<T>, compact_paths: bool
        ) -> Result<Vec<BlindedPath>, ()> {
                // Limit the number of blinded paths that are computed.
                const MAX_PATHS: usize = 3;
@@ -437,13 +520,20 @@ where
                let is_recipient_announced =
                        network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
 
-               let mut peer_info = peers.into_iter()
-                       // Limit to peers with announced channels
+               let has_one_peer = peers.len() == 1;
+               let mut peer_info = peers
+                       // Limit to peers with announced channels unless the recipient is unannounced.
                        .filter_map(|peer|
                                network_graph
                                        .node(&NodeId::from_pubkey(&peer.node_id))
-                                       .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS)
+                                       .filter(|info|
+                                               !is_recipient_announced || info.channels.len() >= MIN_PEER_CHANNELS
+                                       )
                                        .map(|info| (peer, info.is_tor_only(), info.channels.len()))
+                                       // Allow messages directly with the only peer when unannounced.
+                                       .or_else(|| (!is_recipient_announced && has_one_peer)
+                                               .then(|| (peer, false, 0))
+                                       )
                        )
                        // Exclude Tor-only nodes when the recipient is announced.
                        .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
@@ -472,14 +562,75 @@ where
                                }
                        },
                }?;
-               for path in &mut paths {
-                       path.use_compact_introduction_node(&network_graph);
+
+               if compact_paths {
+                       for path in &mut paths {
+                               path.use_compact_introduction_node(&network_graph);
+                       }
                }
 
                Ok(paths)
        }
 }
 
+impl<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter<G, L, ES>
+where
+       L::Target: Logger,
+       ES::Target: EntropySource,
+{
+       fn find_path(
+               &self, sender: PublicKey, peers: Vec<PublicKey>, mut destination: Destination
+       ) -> Result<OnionMessagePath, ()> {
+               let network_graph = self.network_graph.deref().read_only();
+               destination.resolve(&network_graph);
+
+               let first_node = match destination.first_node() {
+                       Some(first_node) => first_node,
+                       None => return Err(()),
+               };
+
+               if peers.contains(&first_node) || sender == first_node {
+                       Ok(OnionMessagePath {
+                               intermediate_nodes: vec![], destination, first_node_addresses: None
+                       })
+               } else {
+                       let node_details = network_graph
+                               .node(&NodeId::from_pubkey(&first_node))
+                               .and_then(|node_info| node_info.announcement_info.as_ref())
+                               .map(|announcement_info| (announcement_info.features(), announcement_info.addresses()));
+
+                       match node_details {
+                               Some((features, addresses)) if features.supports_onion_messages() && addresses.len() > 0 => {
+                                       let first_node_addresses = Some(addresses.clone());
+                                       Ok(OnionMessagePath {
+                                               intermediate_nodes: vec![], destination, first_node_addresses
+                                       })
+                               },
+                               _ => Err(()),
+                       }
+               }
+       }
+
+       fn create_blinded_paths<
+               T: secp256k1::Signing + secp256k1::Verification
+       >(
+               &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
+       ) -> Result<Vec<BlindedPath>, ()> {
+               let peers = peers
+                       .into_iter()
+                       .map(|node_id| ForwardNode { node_id, short_channel_id: None });
+               self.create_blinded_paths_from_iter(recipient, peers, secp_ctx, false)
+       }
+
+       fn create_compact_blinded_paths<
+               T: secp256k1::Signing + secp256k1::Verification
+       >(
+               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+       ) -> Result<Vec<BlindedPath>, ()> {
+               self.create_blinded_paths_from_iter(recipient, peers.into_iter(), secp_ctx, true)
+       }
+}
+
 /// A path for sending an [`OnionMessage`].
 #[derive(Clone)]
 pub struct OnionMessagePath {
@@ -857,8 +1008,8 @@ where
        }
 }
 
-impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref>
-OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
+OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
@@ -866,17 +1017,18 @@ where
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
 {
        /// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to
        /// their respective handlers.
        pub fn new(
                entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
-               offers_handler: OMH, custom_handler: CMH
+               offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH
        ) -> Self {
                Self::new_inner(
                        entropy_source, node_signer, logger, node_id_lookup, message_router,
-                       offers_handler, custom_handler, false
+                       offers_handler, async_payments_handler, custom_handler, false
                )
        }
 
@@ -903,17 +1055,17 @@ where
        /// peers.
        pub fn new_with_offline_peer_interception(
                entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
-               message_router: MR, offers_handler: OMH, custom_handler: CMH
+               message_router: MR, offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH
        ) -> Self {
                Self::new_inner(
                        entropy_source, node_signer, logger, node_id_lookup, message_router,
-                       offers_handler, custom_handler, true
+                       offers_handler, async_payments_handler, custom_handler, true
                )
        }
 
        fn new_inner(
                entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
-               message_router: MR, offers_handler: OMH, custom_handler: CMH,
+               message_router: MR, offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH,
                intercept_messages_for_offline_peers: bool
        ) -> Self {
                let mut secp_ctx = Secp256k1::new();
@@ -927,9 +1079,13 @@ where
                        node_id_lookup,
                        message_router,
                        offers_handler,
+                       async_payments_handler,
                        custom_handler,
                        intercept_messages_for_offline_peers,
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(PendingEvents {
+                               intercepted_msgs: Vec::new(),
+                               peer_connecteds: Vec::new(),
+                       }),
                }
        }
 
@@ -1010,10 +1166,7 @@ where
                let peers = self.message_recipients.lock().unwrap()
                        .iter()
                        .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_)))
-                       .map(|(node_id, _ )| ForwardNode {
-                               node_id: *node_id,
-                               short_channel_id: None,
-                       })
+                       .map(|(node_id, _ )| *node_id)
                        .collect::<Vec<_>>();
 
                self.message_router
@@ -1150,18 +1303,61 @@ where
                msgs
        }
 
-       fn enqueue_event(&self, event: Event) {
+       fn enqueue_intercepted_event(&self, event: Event) {
                const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
                let mut pending_events = self.pending_events.lock().unwrap();
-               let total_buffered_bytes: usize = pending_events
-                       .iter()
-                       .map(|ev| ev.serialized_length())
-                       .sum();
+               let total_buffered_bytes: usize =
+                       pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
                if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
                        log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
                        return
                }
-               pending_events.push(event);
+               pending_events.intercepted_msgs.push(event);
+       }
+
+       /// Processes any events asynchronously using the given handler.
+       ///
+       /// Note that the event handler is called in the order each event was generated, however
+       /// futures are polled in parallel for some events to allow for parallelism where events do not
+       /// have an ordering requirement.
+       ///
+       /// See the trait-level documentation of [`EventsProvider`] for requirements.
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+               &self, handler: H
+       ) {
+               let mut intercepted_msgs = Vec::new();
+               let mut peer_connecteds = Vec::new();
+               {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+                       core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+               }
+
+               let mut futures = Vec::with_capacity(intercepted_msgs.len());
+               for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+                       if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+                               if let Some(addresses) = addresses.take() {
+                                       futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+                               }
+                       }
+               }
+
+               for ev in intercepted_msgs {
+                       if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+                       futures.push(Some(handler(ev)));
+               }
+               // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+               crate::util::async_poll::MultiFuturePoller(futures).await;
+
+               if peer_connecteds.len() <= 1 {
+                       for event in peer_connecteds { handler(event).await; }
+               } else {
+                       let mut futures = Vec::new();
+                       for event in peer_connecteds {
+                               futures.push(Some(handler(event)));
+                       }
+                       crate::util::async_poll::MultiFuturePoller(futures).await;
+               }
        }
 }
 
@@ -1188,8 +1384,8 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, On
        false
 }
 
-impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> EventsProvider
-for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref> EventsProvider
+for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
@@ -1197,6 +1393,7 @@ where
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
 {
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
@@ -1208,15 +1405,28 @@ where
                        }
                }
                let mut events = Vec::new();
-               core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+               {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       #[cfg(debug_assertions)] {
+                               for ev in pending_events.intercepted_msgs.iter() {
+                                       if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
+                               }
+                               for ev in pending_events.peer_connecteds.iter() {
+                                       if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
+                               }
+                       }
+                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
+                       events.append(&mut pending_events.peer_connecteds);
+                       pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+               }
                for ev in events {
                        handler.handle_event(ev);
                }
        }
 }
 
-impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
-for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref> OnionMessageHandler
+for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
@@ -1224,6 +1434,7 @@ where
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
 {
        fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) {
@@ -1235,18 +1446,26 @@ where
                                        "Received an onion message with path_id {:02x?} and {} reply_path: {:?}",
                                        path_id, if reply_path.is_some() { "a" } else { "no" }, message);
 
+                               let responder = reply_path.map(
+                                       |reply_path| Responder::new(reply_path, path_id)
+                               );
                                match message {
                                        ParsedOnionMessageContents::Offers(msg) => {
-                                               let responder = reply_path.map(
-                                                       |reply_path| Responder::new(reply_path, path_id)
-                                               );
                                                let response_instructions = self.offers_handler.handle_message(msg, responder);
                                                let _ = self.handle_onion_message_response(response_instructions);
                                        },
-                                       ParsedOnionMessageContents::Custom(msg) => {
-                                               let responder = reply_path.map(
-                                                       |reply_path| Responder::new(reply_path, path_id)
+                                       #[cfg(async_payments)]
+                                       ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::HeldHtlcAvailable(msg)) => {
+                                               let response_instructions = self.async_payments_handler.held_htlc_available(
+                                                       msg, responder
                                                );
+                                               let _ = self.handle_onion_message_response(response_instructions);
+                                       },
+                                       #[cfg(async_payments)]
+                                       ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::ReleaseHeldHtlc(msg)) => {
+                                               self.async_payments_handler.release_held_htlc(msg);
+                                       },
+                                       ParsedOnionMessageContents::Custom(msg) => {
                                                let response_instructions = self.custom_handler.handle_custom_message(msg, responder);
                                                let _ = self.handle_onion_message_response(response_instructions);
                                        },
@@ -1286,7 +1505,7 @@ where
                                                log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
                                        },
                                        _ if self.intercept_messages_for_offline_peers => {
-                                               self.enqueue_event(
+                                               self.enqueue_intercepted_event(
                                                        Event::OnionMessageIntercepted {
                                                                peer_node_id: next_node_id, message: onion_message
                                                        }
@@ -1314,7 +1533,7 @@ where
                                .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
                                .mark_connected();
                        if self.intercept_messages_for_offline_peers {
-                               self.enqueue_event(
+                               self.pending_events.lock().unwrap().peer_connecteds.push(
                                        Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
                                );
                        }
@@ -1414,6 +1633,7 @@ pub type SimpleArcOnionMessenger<M, T, F, L> = OnionMessenger<
        Arc<SimpleArcChannelManager<M, T, F, L>>,
        Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<L>>>, Arc<L>, Arc<KeysManager>>>,
        Arc<SimpleArcChannelManager<M, T, F, L>>,
+       Arc<SimpleArcChannelManager<M, T, F, L>>,
        IgnoringMessageHandler
 >;
 
@@ -1434,6 +1654,7 @@ pub type SimpleRefOnionMessenger<
        &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
        &'j DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
        &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
+       &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
        IgnoringMessageHandler
 >;