Introduce ResponseInstruction::WithReplyPath variant.
[rust-lightning] / lightning / src / onion_message / messenger.rs
index 5be5ba83e3abcb432a09ab58b794c5ecfec48a7f..aa5f658949254fd98dd8de656c11eefba93c56d3 100644 (file)
@@ -15,8 +15,8 @@ use bitcoin::hashes::hmac::{Hmac, HmacEngine};
 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
 
-use crate::blinded_path::{BlindedPath, IntroductionNode};
-use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, NextHop, ReceiveTlvs};
+use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
+use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, ReceiveTlvs};
 use crate::blinded_path::utils;
 use crate::events::{Event, EventHandler, EventsProvider};
 use crate::sign::{EntropySource, NodeSigner, Recipient};
@@ -70,7 +70,8 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 /// # use bitcoin::hashes::_export::_core::time::Duration;
 /// # use bitcoin::hashes::hex::FromHex;
 /// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self};
-/// # use lightning::blinded_path::BlindedPath;
+/// # use lightning::blinded_path::{BlindedPath, EmptyNodeIdLookUp};
+/// # use lightning::blinded_path::message::ForwardNode;
 /// # use lightning::sign::{EntropySource, KeysManager};
 /// # use lightning::ln::peer_handler::IgnoringMessageHandler;
 /// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger};
@@ -97,7 +98,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 /// #         })
 /// #     }
 /// #     fn create_blinded_paths<T: secp256k1::Signing + secp256k1::Verification>(
-/// #         &self, _recipient: PublicKey, _peers: Vec<PublicKey>, _secp_ctx: &Secp256k1<T>
+/// #         &self, _recipient: PublicKey, _peers: Vec<ForwardNode>, _secp_ctx: &Secp256k1<T>
 /// #     ) -> Result<Vec<BlindedPath>, ()> {
 /// #         unreachable!()
 /// #     }
@@ -111,14 +112,15 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret);
 /// # let (hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1);
 /// # let destination_node_id = hop_node_id1;
+/// # let node_id_lookup = EmptyNodeIdLookUp {};
 /// # let message_router = Arc::new(FakeMessageRouter {});
 /// # let custom_message_handler = IgnoringMessageHandler {};
 /// # let offers_message_handler = IgnoringMessageHandler {};
 /// // Create the onion messenger. This must use the same `keys_manager` as is passed to your
 /// // ChannelManager.
 /// let onion_messenger = OnionMessenger::new(
-///     &keys_manager, &keys_manager, logger, message_router, &offers_message_handler,
-///     &custom_message_handler
+///     &keys_manager, &keys_manager, logger, &node_id_lookup, message_router,
+///     &offers_message_handler, &custom_message_handler
 /// );
 
 /// # #[derive(Debug)]
@@ -134,6 +136,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 ///            # let your_custom_message_type = 42;
 ///            your_custom_message_type
 ///    }
+///    fn msg_type(&self) -> &'static str { "YourCustomMessageType" }
 /// }
 /// // Send a custom onion message to a node id.
 /// let destination = Destination::Node(destination_node_id);
@@ -143,8 +146,11 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 ///
 /// // Create a blinded path to yourself, for someone to send an onion message to.
 /// # let your_node_id = hop_node_id1;
-/// let hops = [hop_node_id3, hop_node_id4, your_node_id];
-/// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap();
+/// let hops = [
+///    ForwardNode { node_id: hop_node_id3, short_channel_id: None },
+///    ForwardNode { node_id: hop_node_id4, short_channel_id: None },
+/// ];
+/// let blinded_path = BlindedPath::new_for_message(&hops, your_node_id, &keys_manager, &secp_ctx).unwrap();
 ///
 /// // Send a custom onion message to a blinded path.
 /// let destination = Destination::BlindedPath(blinded_path);
@@ -155,11 +161,12 @@ pub(super) const MAX_TIMER_TICKS: usize = 2;
 ///
 /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
 /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice
-pub struct OnionMessenger<ES: Deref, NS: Deref, L: Deref, MR: Deref, OMH: Deref, CMH: Deref>
+pub struct OnionMessenger<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        L::Target: Logger,
+       NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
@@ -169,9 +176,12 @@ where
        logger: L,
        message_recipients: Mutex<HashMap<PublicKey, OnionMessageRecipient>>,
        secp_ctx: Secp256k1<secp256k1::All>,
+       node_id_lookup: NL,
        message_router: MR,
        offers_handler: OMH,
        custom_handler: CMH,
+       intercept_messages_for_offline_peers: bool,
+       pending_events: Mutex<Vec<Event>>,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -243,6 +253,66 @@ impl OnionMessageRecipient {
        }
 }
 
+
+/// The `Responder` struct creates an appropriate [`ResponseInstruction`]
+/// for responding to a message.
+pub struct Responder {
+       /// The path along which a response can be sent.
+       reply_path: BlindedPath,
+       path_id: Option<[u8; 32]>
+}
+
+impl Responder {
+       /// Creates a new [`Responder`] instance with the provided reply path.
+       pub(super) fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self {
+               Responder {
+                       reply_path,
+                       path_id,
+               }
+       }
+
+       /// Creates a [`ResponseInstruction::WithoutReplyPath`] for a given response.
+       ///
+       /// Use when the recipient doesn't need to send back a reply to us.
+       pub fn respond<T: OnionMessageContents>(self, response: T) -> ResponseInstruction<T> {
+               ResponseInstruction::WithoutReplyPath(OnionMessageResponse {
+                       message: response,
+                       reply_path: self.reply_path,
+                       path_id: self.path_id,
+               })
+       }
+
+       /// Creates a [`ResponseInstruction::WithReplyPath`] for a given response.
+       ///
+       /// Use when the recipient needs to send back a reply to us.
+       pub fn respond_with_reply_path<T: OnionMessageContents>(self, response: T) -> ResponseInstruction<T> {
+               ResponseInstruction::WithReplyPath(OnionMessageResponse {
+                       message: response,
+                       reply_path: self.reply_path,
+                       path_id: self.path_id,
+               })
+       }
+}
+
+/// This struct contains the information needed to reply to a received message.
+pub struct OnionMessageResponse<T: OnionMessageContents> {
+       message: T,
+       reply_path: BlindedPath,
+       path_id: Option<[u8; 32]>,
+}
+
+/// `ResponseInstruction` represents instructions for responding to received messages.
+pub enum ResponseInstruction<T: OnionMessageContents> {
+       /// Indicates that a response should be sent including a reply path for
+       /// the recipient to respond back.
+       WithReplyPath(OnionMessageResponse<T>),
+       /// Indicates that a response should be sent without including a reply path
+       /// for the recipient to respond back.
+       WithoutReplyPath(OnionMessageResponse<T>),
+       /// Indicates that there's no response to send back.
+       NoResponse,
+}
+
 /// An [`OnionMessage`] for [`OnionMessenger`] to send.
 ///
 /// These are obtained when released from [`OnionMessenger`]'s handlers after which they are
@@ -287,7 +357,7 @@ pub trait MessageRouter {
        fn create_blinded_paths<
                T: secp256k1::Signing + secp256k1::Verification
        >(
-               &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
+               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
        ) -> Result<Vec<BlindedPath>, ()>;
 }
 
@@ -354,7 +424,7 @@ where
        fn create_blinded_paths<
                T: secp256k1::Signing + secp256k1::Verification
        >(
-               &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
+               &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
        ) -> Result<Vec<BlindedPath>, ()> {
                // Limit the number of blinded paths that are computed.
                const MAX_PATHS: usize = 3;
@@ -367,13 +437,13 @@ where
                let is_recipient_announced =
                        network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
 
-               let mut peer_info = peers.iter()
+               let mut peer_info = peers.into_iter()
                        // Limit to peers with announced channels
-                       .filter_map(|pubkey|
+                       .filter_map(|peer|
                                network_graph
-                                       .node(&NodeId::from_pubkey(pubkey))
+                                       .node(&NodeId::from_pubkey(&peer.node_id))
                                        .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS)
-                                       .map(|info| (*pubkey, info.is_tor_only(), info.channels.len()))
+                                       .map(|info| (peer, info.is_tor_only(), info.channels.len()))
                        )
                        // Exclude Tor-only nodes when the recipient is announced.
                        .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
@@ -385,12 +455,13 @@ where
                });
 
                let paths = peer_info.into_iter()
-                       .map(|(pubkey, _, _)| vec![pubkey, recipient])
-                       .map(|node_pks| BlindedPath::new_for_message(&node_pks, &*self.entropy_source, secp_ctx))
+                       .map(|(peer, _, _)| {
+                               BlindedPath::new_for_message(&[peer], recipient, &*self.entropy_source, secp_ctx)
+                       })
                        .take(MAX_PATHS)
                        .collect::<Result<Vec<_>, _>>();
 
-               match paths {
+               let mut paths = match paths {
                        Ok(paths) if !paths.is_empty() => Ok(paths),
                        _ => {
                                if is_recipient_announced {
@@ -400,7 +471,12 @@ where
                                        Err(())
                                }
                        },
+               }?;
+               for path in &mut paths {
+                       path.use_compact_introduction_node(&network_graph);
                }
+
+               Ok(paths)
        }
 }
 
@@ -504,7 +580,11 @@ pub enum SendError {
        TooFewBlindedHops,
        /// The first hop is not a peer and doesn't have a known [`SocketAddress`].
        InvalidFirstHop(PublicKey),
-       /// A path from the sender to the destination could not be found by the [`MessageRouter`].
+       /// Indicates that a path could not be found by the [`MessageRouter`].
+       ///
+       /// This occurs when either:
+       /// - No path from the sender to the destination was found to send the onion message
+       /// - No reply path to the sender could be created when responding to an onion message
        PathNotFound,
        /// Onion message contents must have a TLV type >= 64.
        InvalidMessage,
@@ -543,7 +623,7 @@ pub trait CustomOnionMessageHandler {
        /// Called with the custom message that was received, returning a response to send, if any.
        ///
        /// The returned [`Self::CustomMessage`], if any, is enqueued to be sent by [`OnionMessenger`].
-       fn handle_custom_message(&self, msg: Self::CustomMessage) -> Option<Self::CustomMessage>;
+       fn handle_custom_message(&self, message: Self::CustomMessage, responder: Option<Responder>) -> ResponseInstruction<Self::CustomMessage>;
 
        /// Read a custom message of type `message_type` from `buffer`, returning `Ok(None)` if the
        /// message type is unknown.
@@ -566,26 +646,59 @@ pub trait CustomOnionMessageHandler {
 
 /// A processed incoming onion message, containing either a Forward (another onion message)
 /// or a Receive payload with decrypted contents.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub enum PeeledOnion<T: OnionMessageContents> {
        /// Forwarded onion, with the next node id and a new onion
-       Forward(NextHop, OnionMessage),
+       Forward(NextMessageHop, OnionMessage),
        /// Received onion message, with decrypted contents, path_id, and reply path
        Receive(ParsedOnionMessageContents<T>, Option<[u8; 32]>, Option<BlindedPath>)
 }
 
+
+/// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of
+/// `path`, first calling [`Destination::resolve`] on `path.destination` with the given
+/// [`ReadOnlyNetworkGraph`].
+///
+/// Returns the node id of the peer to send the message to, the message itself, and any addresses
+/// needed to connect to the first node.
+pub fn create_onion_message_resolving_destination<
+       ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents
+>(
+       entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
+       network_graph: &ReadOnlyNetworkGraph, secp_ctx: &Secp256k1<secp256k1::All>,
+       mut path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>,
+) -> Result<(PublicKey, OnionMessage, Option<Vec<SocketAddress>>), SendError>
+where
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       NL::Target: NodeIdLookUp,
+{
+       path.destination.resolve(network_graph);
+       create_onion_message(
+               entropy_source, node_signer, node_id_lookup, secp_ctx, path, contents, reply_path,
+       )
+}
+
 /// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of
 /// `path`.
 ///
 /// Returns the node id of the peer to send the message to, the message itself, and any addresses
-/// need to connect to the first node.
-pub fn create_onion_message<ES: Deref, NS: Deref, T: OnionMessageContents>(
-       entropy_source: &ES, node_signer: &NS, secp_ctx: &Secp256k1<secp256k1::All>,
-       path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>,
+/// needed to connect to the first node.
+///
+/// Returns [`SendError::UnresolvedIntroductionNode`] if:
+/// - `destination` contains a blinded path with an [`IntroductionNode::DirectedShortChannelId`],
+/// - unless it can be resolved by [`NodeIdLookUp::next_node_id`].
+/// Use [`create_onion_message_resolving_destination`] instead to resolve the introduction node
+/// first with a [`ReadOnlyNetworkGraph`].
+pub fn create_onion_message<ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents>(
+       entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
+       secp_ctx: &Secp256k1<secp256k1::All>, path: OnionMessagePath, contents: T,
+       reply_path: Option<BlindedPath>,
 ) -> Result<(PublicKey, OnionMessage, Option<Vec<SocketAddress>>), SendError>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
+       NL::Target: NodeIdLookUp,
 {
        let OnionMessagePath { intermediate_nodes, mut destination, first_node_addresses } = path;
        if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination {
@@ -600,16 +713,19 @@ where
        // advance the blinded path by 1 hop so the second hop is the new introduction node.
        if intermediate_nodes.len() == 0 {
                if let Destination::BlindedPath(ref mut blinded_path) = destination {
+                       let our_node_id = node_signer.get_node_id(Recipient::Node)
+                               .map_err(|()| SendError::GetNodeIdFailed)?;
                        let introduction_node_id = match blinded_path.introduction_node {
                                IntroductionNode::NodeId(pubkey) => pubkey,
-                               IntroductionNode::DirectedShortChannelId(..) => {
-                                       return Err(SendError::UnresolvedIntroductionNode);
+                               IntroductionNode::DirectedShortChannelId(direction, scid) => {
+                                       match node_id_lookup.next_node_id(scid) {
+                                               Some(next_node_id) => *direction.select_pubkey(&our_node_id, &next_node_id),
+                                               None => return Err(SendError::UnresolvedIntroductionNode),
+                                       }
                                },
                        };
-                       let our_node_id = node_signer.get_node_id(Recipient::Node)
-                               .map_err(|()| SendError::GetNodeIdFailed)?;
                        if introduction_node_id == our_node_id {
-                               advance_path_by_one(blinded_path, node_signer, &secp_ctx)
+                               advance_path_by_one(blinded_path, node_signer, node_id_lookup, &secp_ctx)
                                        .map_err(|()| SendError::BlindedPathAdvanceFailed)?;
                        }
                }
@@ -741,12 +857,13 @@ where
        }
 }
 
-impl<ES: Deref, NS: Deref, L: Deref, MR: Deref, OMH: Deref, CMH: Deref>
-OnionMessenger<ES, NS, L, MR, OMH, CMH>
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref>
+OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        L::Target: Logger,
+       NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
@@ -754,8 +871,50 @@ where
        /// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to
        /// their respective handlers.
        pub fn new(
-               entropy_source: ES, node_signer: NS, logger: L, message_router: MR, offers_handler: OMH,
-               custom_handler: CMH
+               entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
+               offers_handler: OMH, custom_handler: CMH
+       ) -> Self {
+               Self::new_inner(
+                       entropy_source, node_signer, logger, node_id_lookup, message_router,
+                       offers_handler, custom_handler, false
+               )
+       }
+
+       /// Similar to [`Self::new`], but rather than dropping onion messages that are
+       /// intended to be forwarded to offline peers, we will intercept them for
+       /// later forwarding.
+       ///
+       /// Interception flow:
+       /// 1. If an onion message for an offline peer is received, `OnionMessenger` will
+       ///    generate an [`Event::OnionMessageIntercepted`]. Event handlers can
+       ///    then choose to persist this onion message for later forwarding, or drop
+       ///    it.
+       /// 2. When the offline peer later comes back online, `OnionMessenger` will
+       ///    generate an [`Event::OnionMessagePeerConnected`]. Event handlers will
+       ///    then fetch all previously intercepted onion messages for this peer.
+       /// 3. Once the stored onion messages are fetched, they can finally be
+       ///    forwarded to the now-online peer via [`Self::forward_onion_message`].
+       ///
+       /// # Note
+       ///
+       /// LDK will not rate limit how many [`Event::OnionMessageIntercepted`]s
+       /// are generated, so it is the caller's responsibility to limit how many
+       /// onion messages are persisted and only persist onion messages for relevant
+       /// peers.
+       pub fn new_with_offline_peer_interception(
+               entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
+               message_router: MR, offers_handler: OMH, custom_handler: CMH
+       ) -> Self {
+               Self::new_inner(
+                       entropy_source, node_signer, logger, node_id_lookup, message_router,
+                       offers_handler, custom_handler, true
+               )
+       }
+
+       fn new_inner(
+               entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
+               message_router: MR, offers_handler: OMH, custom_handler: CMH,
+               intercept_messages_for_offline_peers: bool
        ) -> Self {
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
@@ -765,9 +924,12 @@ where
                        message_recipients: Mutex::new(new_hash_map()),
                        secp_ctx,
                        logger,
+                       node_id_lookup,
                        message_router,
                        offers_handler,
                        custom_handler,
+                       intercept_messages_for_offline_peers,
+                       pending_events: Mutex::new(Vec::new()),
                }
        }
 
@@ -791,13 +953,12 @@ where
                &self, contents: T, destination: Destination, reply_path: Option<BlindedPath>,
                log_suffix: fmt::Arguments
        ) -> Result<SendSuccess, SendError> {
-               let mut logger = WithContext::from(&self.logger, None, None);
-               let result = self.find_path(destination)
-                       .and_then(|path| {
-                               let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
-                               logger = WithContext::from(&self.logger, first_hop, None);
-                               self.enqueue_onion_message(path, contents, reply_path, log_suffix)
-                       });
+               let mut logger = WithContext::from(&self.logger, None, None, None);
+               let result = self.find_path(destination).and_then(|path| {
+                       let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
+                       logger = WithContext::from(&self.logger, first_hop, None, None);
+                       self.enqueue_onion_message(path, contents, reply_path, log_suffix)
+               });
 
                match result.as_ref() {
                        Err(SendError::GetNodeIdFailed) => {
@@ -840,6 +1001,27 @@ where
                        .map_err(|_| SendError::PathNotFound)
        }
 
+       fn create_blinded_path(&self) -> Result<BlindedPath, SendError> {
+               let recipient = self.node_signer
+                       .get_node_id(Recipient::Node)
+                       .map_err(|_| SendError::GetNodeIdFailed)?;
+               let secp_ctx = &self.secp_ctx;
+
+               let peers = self.message_recipients.lock().unwrap()
+                       .iter()
+                       .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_)))
+                       .map(|(node_id, _ )| ForwardNode {
+                               node_id: *node_id,
+                               short_channel_id: None,
+                       })
+                       .collect::<Vec<_>>();
+
+               self.message_router
+                       .create_blinded_paths(recipient, peers, secp_ctx)
+                       .and_then(|paths| paths.into_iter().next().ok_or(()))
+                       .map_err(|_| SendError::PathNotFound)
+       }
+
        fn enqueue_onion_message<T: OnionMessageContents>(
                &self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>,
                log_suffix: fmt::Arguments
@@ -847,7 +1029,8 @@ where
                log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents);
 
                let (first_node_id, onion_message, addresses) = create_onion_message(
-                       &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path
+                       &self.entropy_source, &self.node_signer, &self.node_id_lookup, &self.secp_ctx, path,
+                       contents, reply_path,
                )?;
 
                let mut message_recipients = self.message_recipients.lock().unwrap();
@@ -875,6 +1058,27 @@ where
                }
        }
 
+       /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized
+       /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`]
+       /// and want to forward a previously intercepted onion message to a peer that
+       /// has just come online.
+       pub fn forward_onion_message(
+               &self, message: OnionMessage, peer_node_id: &PublicKey
+       ) -> Result<(), SendError> {
+               let mut message_recipients = self.message_recipients.lock().unwrap();
+               if outbound_buffer_full(&peer_node_id, &message_recipients) {
+                       return Err(SendError::BufferFull);
+               }
+
+               match message_recipients.entry(*peer_node_id) {
+                       hash_map::Entry::Occupied(mut e) if e.get().is_connected() => {
+                               e.get_mut().enqueue_message(message);
+                               Ok(())
+                       },
+                       _ => Err(SendError::InvalidFirstHop(*peer_node_id))
+               }
+       }
+
        #[cfg(any(test, feature = "_test_utils"))]
        pub fn send_onion_message_using_path<T: OnionMessageContents>(
                &self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>
@@ -890,21 +1094,48 @@ where
                )
        }
 
-       fn handle_onion_message_response<T: OnionMessageContents>(
-               &self, response: Option<T>, reply_path: Option<BlindedPath>, log_suffix: fmt::Arguments
-       ) {
-               if let Some(response) = response {
-                       match reply_path {
-                               Some(reply_path) => {
-                                       let _ = self.find_path_and_enqueue_onion_message(
-                                               response, Destination::BlindedPath(reply_path), None, log_suffix
+       /// Handles the response to an [`OnionMessage`] based on its [`ResponseInstruction`],
+       /// enqueueing any response for sending.
+       ///
+       /// This function is useful for asynchronous handling of [`OnionMessage`]s.
+       /// Handlers have the option to return [`ResponseInstruction::NoResponse`], indicating that
+       /// no immediate response should be sent. Then, they can transfer the associated [`Responder`]
+       /// to another task responsible for generating the response asynchronously. Subsequently, when
+       /// the response is prepared and ready for sending, that task can invoke this method to enqueue
+       /// the response for delivery.
+       pub fn handle_onion_message_response<T: OnionMessageContents>(
+               &self, response: ResponseInstruction<T>
+       ) -> Result<Option<SendSuccess>, SendError> {
+               let (response, create_reply_path) = match response {
+                       ResponseInstruction::WithReplyPath(response) => (response, true),
+                       ResponseInstruction::WithoutReplyPath(response) => (response, false),
+                       ResponseInstruction::NoResponse => return Ok(None),
+               };
+
+               let message_type = response.message.msg_type();
+               let reply_path = if create_reply_path {
+                       match self.create_blinded_path() {
+                               Ok(reply_path) => Some(reply_path),
+                               Err(err) => {
+                                       log_trace!(
+                                               self.logger,
+                                               "Failed to create reply path when responding with {} to an onion message \
+                                               with path_id {:02x?}: {:?}",
+                                               message_type, response.path_id, err
                                        );
-                               },
-                               None => {
-                                       log_trace!(self.logger, "Missing reply path {}", log_suffix);
-                               },
+                                       return Err(err);
+                               }
                        }
-               }
+               } else { None };
+
+               self.find_path_and_enqueue_onion_message(
+                       response.message, Destination::BlindedPath(response.reply_path), reply_path,
+                       format_args!(
+                               "when responding with {} to an onion message with path_id {:02x?}",
+                               message_type,
+                               response.path_id
+                       )
+               ).map(|result| Some(result))
        }
 
        #[cfg(test)]
@@ -918,6 +1149,20 @@ where
                }
                msgs
        }
+
+       fn enqueue_event(&self, event: Event) {
+               const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
+               let mut pending_events = self.pending_events.lock().unwrap();
+               let total_buffered_bytes: usize = pending_events
+                       .iter()
+                       .map(|ev| ev.serialized_length())
+                       .sum();
+               if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
+                       log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
+                       return
+               }
+               pending_events.push(event);
+       }
 }
 
 fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageRecipient>) -> bool {
@@ -943,12 +1188,13 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, On
        false
 }
 
-impl<ES: Deref, NS: Deref, L: Deref, MR: Deref, OMH: Deref, CMH: Deref> EventsProvider
-for OnionMessenger<ES, NS, L, MR, OMH, CMH>
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> EventsProvider
+for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        L::Target: Logger,
+       NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
@@ -961,21 +1207,27 @@ where
                                }
                        }
                }
+               let mut events = Vec::new();
+               core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+               for ev in events {
+                       handler.handle_event(ev);
+               }
        }
 }
 
-impl<ES: Deref, NS: Deref, L: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
-for OnionMessenger<ES, NS, L, MR, OMH, CMH>
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
+for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
 where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        L::Target: Logger,
+       NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
 {
        fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) {
-               let logger = WithContext::from(&self.logger, Some(*peer_node_id), None);
+               let logger = WithContext::from(&self.logger, Some(*peer_node_id), None, None);
                match self.peel_onion_message(msg) {
                        Ok(PeeledOnion::Receive(message, path_id, reply_path)) => {
                                log_trace!(
@@ -985,29 +1237,31 @@ where
 
                                match message {
                                        ParsedOnionMessageContents::Offers(msg) => {
-                                               let response = self.offers_handler.handle_message(msg);
-                                               self.handle_onion_message_response(
-                                                       response, reply_path, format_args!(
-                                                               "when responding to Offers onion message with path_id {:02x?}",
-                                                               path_id
-                                                       )
+                                               let responder = reply_path.map(
+                                                       |reply_path| Responder::new(reply_path, path_id)
                                                );
+                                               let response_instructions = self.offers_handler.handle_message(msg, responder);
+                                               let _ = self.handle_onion_message_response(response_instructions);
                                        },
                                        ParsedOnionMessageContents::Custom(msg) => {
-                                               let response = self.custom_handler.handle_custom_message(msg);
-                                               self.handle_onion_message_response(
-                                                       response, reply_path, format_args!(
-                                                               "when responding to Custom onion message with path_id {:02x?}",
-                                                               path_id
-                                                       )
+                                               let responder = reply_path.map(
+                                                       |reply_path| Responder::new(reply_path, path_id)
                                                );
+                                               let response_instructions = self.custom_handler.handle_custom_message(msg, responder);
+                                               let _ = self.handle_onion_message_response(response_instructions);
                                        },
                                }
                        },
                        Ok(PeeledOnion::Forward(next_hop, onion_message)) => {
                                let next_node_id = match next_hop {
-                                       NextHop::NodeId(pubkey) => pubkey,
-                                       NextHop::ShortChannelId(_) => todo!(),
+                                       NextMessageHop::NodeId(pubkey) => pubkey,
+                                       NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) {
+                                               Some(pubkey) => pubkey,
+                                               None => {
+                                                       log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {}", scid);
+                                                       return
+                                               },
+                                       },
                                };
 
                                let mut message_recipients = self.message_recipients.lock().unwrap();
@@ -1031,6 +1285,13 @@ where
                                                e.get_mut().enqueue_message(onion_message);
                                                log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
                                        },
+                                       _ if self.intercept_messages_for_offline_peers => {
+                                               self.enqueue_event(
+                                                       Event::OnionMessageIntercepted {
+                                                               peer_node_id: next_node_id, message: onion_message
+                                                       }
+                                               );
+                                       },
                                        _ => {
                                                log_trace!(
                                                        logger,
@@ -1052,6 +1313,11 @@ where
                                .entry(*their_node_id)
                                .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
                                .mark_connected();
+                       if self.intercept_messages_for_offline_peers {
+                               self.enqueue_event(
+                                       Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
+                               );
+                       }
                } else {
                        self.message_recipients.lock().unwrap().remove(their_node_id);
                }
@@ -1145,6 +1411,7 @@ pub type SimpleArcOnionMessenger<M, T, F, L> = OnionMessenger<
        Arc<KeysManager>,
        Arc<KeysManager>,
        Arc<L>,
+       Arc<SimpleArcChannelManager<M, T, F, L>>,
        Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<L>>>, Arc<L>, Arc<KeysManager>>>,
        Arc<SimpleArcChannelManager<M, T, F, L>>,
        IgnoringMessageHandler
@@ -1164,8 +1431,9 @@ pub type SimpleRefOnionMessenger<
        &'a KeysManager,
        &'a KeysManager,
        &'b L,
-       &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
-       &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
+       &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
+       &'j DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
+       &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
        IgnoringMessageHandler
 >;
 
@@ -1203,7 +1471,7 @@ fn packet_payloads_and_keys<T: OnionMessageContents, S: secp256k1::Signing + sec
                                if let Some(ss) = prev_control_tlvs_ss.take() {
                                        payloads.push((Payload::Forward(ForwardControlTlvs::Unblinded(
                                                ForwardTlvs {
-                                                       next_hop: NextHop::NodeId(unblinded_pk_opt.unwrap()),
+                                                       next_hop: NextMessageHop::NodeId(unblinded_pk_opt.unwrap()),
                                                        next_blinding_override: None,
                                                }
                                        )), ss));
@@ -1213,7 +1481,7 @@ fn packet_payloads_and_keys<T: OnionMessageContents, S: secp256k1::Signing + sec
                        } else if let Some((intro_node_id, blinding_pt)) = intro_node_id_blinding_pt.take() {
                                if let Some(control_tlvs_ss) = prev_control_tlvs_ss.take() {
                                        payloads.push((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
-                                               next_hop: NextHop::NodeId(intro_node_id),
+                                               next_hop: NextMessageHop::NodeId(intro_node_id),
                                                next_blinding_override: Some(blinding_pt),
                                        })), control_tlvs_ss));
                                }