X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=7bea65ab5af53eb3d9f7e7014454393d892dae86;hb=3abec4f03194d818b72bf377b2086563c56f1812;hp=5be5ba83e3abcb432a09ab58b794c5ecfec48a7f;hpb=32a5139eb599f6ff565f4422e4da6e9421d74094;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 5be5ba83..7bea65ab 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -15,8 +15,8 @@ use bitcoin::hashes::hmac::{Hmac, HmacEngine}; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; -use crate::blinded_path::{BlindedPath, IntroductionNode}; -use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, NextHop, ReceiveTlvs}; +use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp}; +use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; use crate::events::{Event, EventHandler, EventsProvider}; use crate::sign::{EntropySource, NodeSigner, Recipient}; @@ -47,6 +47,70 @@ use { pub(super) const MAX_TIMER_TICKS: usize = 2; +/// A trivial trait which describes any [`OnionMessenger`]. +/// +/// This is not exported to bindings users as general cover traits aren't useful in other +/// languages. +pub trait AOnionMessenger { + /// A type implementing [`EntropySource`] + type EntropySource: EntropySource + ?Sized; + /// A type that may be dereferenced to [`Self::EntropySource`] + type ES: Deref; + /// A type implementing [`NodeSigner`] + type NodeSigner: NodeSigner + ?Sized; + /// A type that may be dereferenced to [`Self::NodeSigner`] + type NS: Deref; + /// A type implementing [`Logger`] + type Logger: Logger + ?Sized; + /// A type that may be dereferenced to [`Self::Logger`] + type L: Deref; + /// A type implementing [`NodeIdLookUp`] + type NodeIdLookUp: NodeIdLookUp + ?Sized; + /// A type that may be dereferenced to [`Self::NodeIdLookUp`] + type NL: Deref; + /// A type implementing [`MessageRouter`] + type MessageRouter: MessageRouter + ?Sized; + /// A type that may be dereferenced to [`Self::MessageRouter`] + type MR: Deref; + /// A type implementing [`OffersMessageHandler`] + type OffersMessageHandler: OffersMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::OffersMessageHandler`] + type OMH: Deref; + /// A type implementing [`CustomOnionMessageHandler`] + type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`] + type CMH: Deref; + /// Returns a reference to the actual [`OnionMessenger`] object. + fn get_om(&self) -> &OnionMessenger; +} + +impl AOnionMessenger +for OnionMessenger where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + NL::Target: NodeIdLookUp, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + type EntropySource = ES::Target; + type ES = ES; + type NodeSigner = NS::Target; + type NS = NS; + type Logger = L::Target; + type L = L; + type NodeIdLookUp = NL::Target; + type NL = NL; + type MessageRouter = MR::Target; + type MR = MR; + type OffersMessageHandler = OMH::Target; + type OMH = OMH; + type CustomOnionMessageHandler = CMH::Target; + type CMH = CMH; + fn get_om(&self) -> &OnionMessenger { self } +} + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -70,7 +134,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// # use bitcoin::hashes::_export::_core::time::Duration; /// # use bitcoin::hashes::hex::FromHex; /// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self}; -/// # use lightning::blinded_path::BlindedPath; +/// # use lightning::blinded_path::{BlindedPath, EmptyNodeIdLookUp}; /// # use lightning::sign::{EntropySource, KeysManager}; /// # use lightning::ln::peer_handler::IgnoringMessageHandler; /// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger}; @@ -111,14 +175,15 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); /// # let (hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1); /// # let destination_node_id = hop_node_id1; +/// # let node_id_lookup = EmptyNodeIdLookUp {}; /// # let message_router = Arc::new(FakeMessageRouter {}); /// # let custom_message_handler = IgnoringMessageHandler {}; /// # let offers_message_handler = IgnoringMessageHandler {}; /// // Create the onion messenger. This must use the same `keys_manager` as is passed to your /// // ChannelManager. /// let onion_messenger = OnionMessenger::new( -/// &keys_manager, &keys_manager, logger, message_router, &offers_message_handler, -/// &custom_message_handler +/// &keys_manager, &keys_manager, logger, &node_id_lookup, message_router, +/// &offers_message_handler, &custom_message_handler /// ); /// # #[derive(Debug)] @@ -134,6 +199,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// # let your_custom_message_type = 42; /// your_custom_message_type /// } +/// fn msg_type(&self) -> &'static str { "YourCustomMessageType" } /// } /// // Send a custom onion message to a node id. /// let destination = Destination::Node(destination_node_id); @@ -155,11 +221,12 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice -pub struct OnionMessenger +pub struct OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, + NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, CMH::Target: CustomOnionMessageHandler, @@ -169,9 +236,17 @@ where logger: L, message_recipients: Mutex>, secp_ctx: Secp256k1, + node_id_lookup: NL, message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool, + pending_events: Mutex, +} + +struct PendingEvents { + intercepted_msgs: Vec, + peer_connecteds: Vec, } /// [`OnionMessage`]s buffered to be sent. @@ -243,6 +318,50 @@ impl OnionMessageRecipient { } } + +/// The `Responder` struct creates an appropriate [`ResponseInstruction`] +/// for responding to a message. +pub struct Responder { + /// The path along which a response can be sent. + reply_path: BlindedPath, + path_id: Option<[u8; 32]> +} + +impl Responder { + /// Creates a new [`Responder`] instance with the provided reply path. + fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self { + Responder { + reply_path, + path_id, + } + } + + /// Creates the appropriate [`ResponseInstruction`] for a given response. + pub fn respond(self, response: T) -> ResponseInstruction { + ResponseInstruction::WithoutReplyPath(OnionMessageResponse { + message: response, + reply_path: self.reply_path, + path_id: self.path_id, + }) + } +} + +/// This struct contains the information needed to reply to a received message. +pub struct OnionMessageResponse { + message: T, + reply_path: BlindedPath, + path_id: Option<[u8; 32]>, +} + +/// `ResponseInstruction` represents instructions for responding to received messages. +pub enum ResponseInstruction { + /// Indicates that a response should be sent without including a reply path + /// for the recipient to respond back. + WithoutReplyPath(OnionMessageResponse), + /// Indicates that there's no response to send back. + NoResponse, +} + /// An [`OnionMessage`] for [`OnionMessenger`] to send. /// /// These are obtained when released from [`OnionMessenger`]'s handlers after which they are @@ -543,7 +662,7 @@ pub trait CustomOnionMessageHandler { /// Called with the custom message that was received, returning a response to send, if any. /// /// The returned [`Self::CustomMessage`], if any, is enqueued to be sent by [`OnionMessenger`]. - fn handle_custom_message(&self, msg: Self::CustomMessage) -> Option; + fn handle_custom_message(&self, message: Self::CustomMessage, responder: Option) -> ResponseInstruction; /// Read a custom message of type `message_type` from `buffer`, returning `Ok(None)` if the /// message type is unknown. @@ -566,26 +685,59 @@ pub trait CustomOnionMessageHandler { /// A processed incoming onion message, containing either a Forward (another onion message) /// or a Receive payload with decrypted contents. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum PeeledOnion { /// Forwarded onion, with the next node id and a new onion - Forward(NextHop, OnionMessage), + Forward(NextMessageHop, OnionMessage), /// Received onion message, with decrypted contents, path_id, and reply path Receive(ParsedOnionMessageContents, Option<[u8; 32]>, Option) } + +/// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of +/// `path`, first calling [`Destination::resolve`] on `path.destination` with the given +/// [`ReadOnlyNetworkGraph`]. +/// +/// Returns the node id of the peer to send the message to, the message itself, and any addresses +/// needed to connect to the first node. +pub fn create_onion_message_resolving_destination< + ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents +>( + entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL, + network_graph: &ReadOnlyNetworkGraph, secp_ctx: &Secp256k1, + mut path: OnionMessagePath, contents: T, reply_path: Option, +) -> Result<(PublicKey, OnionMessage, Option>), SendError> +where + ES::Target: EntropySource, + NS::Target: NodeSigner, + NL::Target: NodeIdLookUp, +{ + path.destination.resolve(network_graph); + create_onion_message( + entropy_source, node_signer, node_id_lookup, secp_ctx, path, contents, reply_path, + ) +} + /// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of /// `path`. /// /// Returns the node id of the peer to send the message to, the message itself, and any addresses -/// need to connect to the first node. -pub fn create_onion_message( - entropy_source: &ES, node_signer: &NS, secp_ctx: &Secp256k1, - path: OnionMessagePath, contents: T, reply_path: Option, +/// needed to connect to the first node. +/// +/// Returns [`SendError::UnresolvedIntroductionNode`] if: +/// - `destination` contains a blinded path with an [`IntroductionNode::DirectedShortChannelId`], +/// - unless it can be resolved by [`NodeIdLookUp::next_node_id`]. +/// Use [`create_onion_message_resolving_destination`] instead to resolve the introduction node +/// first with a [`ReadOnlyNetworkGraph`]. +pub fn create_onion_message( + entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL, + secp_ctx: &Secp256k1, path: OnionMessagePath, contents: T, + reply_path: Option, ) -> Result<(PublicKey, OnionMessage, Option>), SendError> where ES::Target: EntropySource, NS::Target: NodeSigner, + NL::Target: NodeIdLookUp, { let OnionMessagePath { intermediate_nodes, mut destination, first_node_addresses } = path; if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination { @@ -600,16 +752,19 @@ where // advance the blinded path by 1 hop so the second hop is the new introduction node. if intermediate_nodes.len() == 0 { if let Destination::BlindedPath(ref mut blinded_path) = destination { + let our_node_id = node_signer.get_node_id(Recipient::Node) + .map_err(|()| SendError::GetNodeIdFailed)?; let introduction_node_id = match blinded_path.introduction_node { IntroductionNode::NodeId(pubkey) => pubkey, - IntroductionNode::DirectedShortChannelId(..) => { - return Err(SendError::UnresolvedIntroductionNode); + IntroductionNode::DirectedShortChannelId(direction, scid) => { + match node_id_lookup.next_node_id(scid) { + Some(next_node_id) => *direction.select_pubkey(&our_node_id, &next_node_id), + None => return Err(SendError::UnresolvedIntroductionNode), + } }, }; - let our_node_id = node_signer.get_node_id(Recipient::Node) - .map_err(|()| SendError::GetNodeIdFailed)?; if introduction_node_id == our_node_id { - advance_path_by_one(blinded_path, node_signer, &secp_ctx) + advance_path_by_one(blinded_path, node_signer, node_id_lookup, &secp_ctx) .map_err(|()| SendError::BlindedPathAdvanceFailed)?; } } @@ -741,12 +896,13 @@ where } } -impl -OnionMessenger +impl +OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, + NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, CMH::Target: CustomOnionMessageHandler, @@ -754,8 +910,50 @@ where /// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to /// their respective handlers. pub fn new( - entropy_source: ES, node_signer: NS, logger: L, message_router: MR, offers_handler: OMH, - custom_handler: CMH + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, + offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, false + ) + } + + /// Similar to [`Self::new`], but rather than dropping onion messages that are + /// intended to be forwarded to offline peers, we will intercept them for + /// later forwarding. + /// + /// Interception flow: + /// 1. If an onion message for an offline peer is received, `OnionMessenger` will + /// generate an [`Event::OnionMessageIntercepted`]. Event handlers can + /// then choose to persist this onion message for later forwarding, or drop + /// it. + /// 2. When the offline peer later comes back online, `OnionMessenger` will + /// generate an [`Event::OnionMessagePeerConnected`]. Event handlers will + /// then fetch all previously intercepted onion messages for this peer. + /// 3. Once the stored onion messages are fetched, they can finally be + /// forwarded to the now-online peer via [`Self::forward_onion_message`]. + /// + /// # Note + /// + /// LDK will not rate limit how many [`Event::OnionMessageIntercepted`]s + /// are generated, so it is the caller's responsibility to limit how many + /// onion messages are persisted and only persist onion messages for relevant + /// peers. + pub fn new_with_offline_peer_interception( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, true + ) + } + + fn new_inner( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); @@ -765,9 +963,15 @@ where message_recipients: Mutex::new(new_hash_map()), secp_ctx, logger, + node_id_lookup, message_router, offers_handler, custom_handler, + intercept_messages_for_offline_peers, + pending_events: Mutex::new(PendingEvents { + intercepted_msgs: Vec::new(), + peer_connecteds: Vec::new(), + }), } } @@ -791,13 +995,12 @@ where &self, contents: T, destination: Destination, reply_path: Option, log_suffix: fmt::Arguments ) -> Result { - let mut logger = WithContext::from(&self.logger, None, None); - let result = self.find_path(destination) - .and_then(|path| { - let first_hop = path.intermediate_nodes.get(0).map(|p| *p); - logger = WithContext::from(&self.logger, first_hop, None); - self.enqueue_onion_message(path, contents, reply_path, log_suffix) - }); + let mut logger = WithContext::from(&self.logger, None, None, None); + let result = self.find_path(destination).and_then(|path| { + let first_hop = path.intermediate_nodes.get(0).map(|p| *p); + logger = WithContext::from(&self.logger, first_hop, None, None); + self.enqueue_onion_message(path, contents, reply_path, log_suffix) + }); match result.as_ref() { Err(SendError::GetNodeIdFailed) => { @@ -847,7 +1050,8 @@ where log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents); let (first_node_id, onion_message, addresses) = create_onion_message( - &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path + &self.entropy_source, &self.node_signer, &self.node_id_lookup, &self.secp_ctx, path, + contents, reply_path, )?; let mut message_recipients = self.message_recipients.lock().unwrap(); @@ -875,6 +1079,27 @@ where } } + /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized + /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`] + /// and want to forward a previously intercepted onion message to a peer that + /// has just come online. + pub fn forward_onion_message( + &self, message: OnionMessage, peer_node_id: &PublicKey + ) -> Result<(), SendError> { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&peer_node_id, &message_recipients) { + return Err(SendError::BufferFull); + } + + match message_recipients.entry(*peer_node_id) { + hash_map::Entry::Occupied(mut e) if e.get().is_connected() => { + e.get_mut().enqueue_message(message); + Ok(()) + }, + _ => Err(SendError::InvalidFirstHop(*peer_node_id)) + } + } + #[cfg(any(test, feature = "_test_utils"))] pub fn send_onion_message_using_path( &self, path: OnionMessagePath, contents: T, reply_path: Option @@ -891,19 +1116,18 @@ where } fn handle_onion_message_response( - &self, response: Option, reply_path: Option, log_suffix: fmt::Arguments + &self, response: ResponseInstruction ) { - if let Some(response) = response { - match reply_path { - Some(reply_path) => { - let _ = self.find_path_and_enqueue_onion_message( - response, Destination::BlindedPath(reply_path), None, log_suffix - ); - }, - None => { - log_trace!(self.logger, "Missing reply path {}", log_suffix); - }, - } + if let ResponseInstruction::WithoutReplyPath(response) = response { + let message_type = response.message.msg_type(); + let _ = self.find_path_and_enqueue_onion_message( + response.message, Destination::BlindedPath(response.reply_path), None, + format_args!( + "when responding with {} to an onion message with path_id {:02x?}", + message_type, + response.path_id + ) + ); } } @@ -918,6 +1142,63 @@ where } msgs } + + fn enqueue_intercepted_event(&self, event: Event) { + const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; + let mut pending_events = self.pending_events.lock().unwrap(); + let total_buffered_bytes: usize = + pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum(); + if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { + log_trace!(self.logger, "Dropping event {:?}: buffer full", event); + return + } + pending_events.intercepted_msgs.push(event); + } + + /// Processes any events asynchronously using the given handler. + /// + /// Note that the event handler is called in the order each event was generated, however + /// futures are polled in parallel for some events to allow for parallelism where events do not + /// have an ordering requirement. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + pub async fn process_pending_events_async + core::marker::Unpin, H: Fn(Event) -> Future>( + &self, handler: H + ) { + let mut intercepted_msgs = Vec::new(); + let mut peer_connecteds = Vec::new(); + { + let mut pending_events = self.pending_events.lock().unwrap(); + core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs); + core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds); + } + + let mut futures = Vec::with_capacity(intercepted_msgs.len()); + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }))); + } + } + } + + for ev in intercepted_msgs { + if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } + futures.push(Some(handler(ev))); + } + // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds + crate::util::async_poll::MultiFuturePoller(futures).await; + + if peer_connecteds.len() <= 1 { + for event in peer_connecteds { handler(event).await; } + } else { + let mut futures = Vec::new(); + for event in peer_connecteds { + futures.push(Some(handler(event))); + } + crate::util::async_poll::MultiFuturePoller(futures).await; + } + } } fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { @@ -943,12 +1224,13 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap EventsProvider -for OnionMessenger +impl EventsProvider +for OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, + NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, CMH::Target: CustomOnionMessageHandler, @@ -961,21 +1243,40 @@ where } } } + let mut events = Vec::new(); + { + let mut pending_events = self.pending_events.lock().unwrap(); + #[cfg(debug_assertions)] { + for ev in pending_events.intercepted_msgs.iter() { + if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } + } + for ev in pending_events.peer_connecteds.iter() { + if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } + } + } + core::mem::swap(&mut pending_events.intercepted_msgs, &mut events); + events.append(&mut pending_events.peer_connecteds); + pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage + } + for ev in events { + handler.handle_event(ev); + } } } -impl OnionMessageHandler -for OnionMessenger +impl OnionMessageHandler +for OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, + NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, CMH::Target: CustomOnionMessageHandler, { fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) { - let logger = WithContext::from(&self.logger, Some(*peer_node_id), None); + let logger = WithContext::from(&self.logger, Some(*peer_node_id), None, None); match self.peel_onion_message(msg) { Ok(PeeledOnion::Receive(message, path_id, reply_path)) => { log_trace!( @@ -985,29 +1286,31 @@ where match message { ParsedOnionMessageContents::Offers(msg) => { - let response = self.offers_handler.handle_message(msg); - self.handle_onion_message_response( - response, reply_path, format_args!( - "when responding to Offers onion message with path_id {:02x?}", - path_id - ) + let responder = reply_path.map( + |reply_path| Responder::new(reply_path, path_id) ); + let response_instructions = self.offers_handler.handle_message(msg, responder); + self.handle_onion_message_response(response_instructions); }, ParsedOnionMessageContents::Custom(msg) => { - let response = self.custom_handler.handle_custom_message(msg); - self.handle_onion_message_response( - response, reply_path, format_args!( - "when responding to Custom onion message with path_id {:02x?}", - path_id - ) + let responder = reply_path.map( + |reply_path| Responder::new(reply_path, path_id) ); + let response_instructions = self.custom_handler.handle_custom_message(msg, responder); + self.handle_onion_message_response(response_instructions); }, } }, Ok(PeeledOnion::Forward(next_hop, onion_message)) => { let next_node_id = match next_hop { - NextHop::NodeId(pubkey) => pubkey, - NextHop::ShortChannelId(_) => todo!(), + NextMessageHop::NodeId(pubkey) => pubkey, + NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) { + Some(pubkey) => pubkey, + None => { + log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {}", scid); + return + }, + }, }; let mut message_recipients = self.message_recipients.lock().unwrap(); @@ -1031,6 +1334,13 @@ where e.get_mut().enqueue_message(onion_message); log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, + _ if self.intercept_messages_for_offline_peers => { + self.enqueue_intercepted_event( + Event::OnionMessageIntercepted { + peer_node_id: next_node_id, message: onion_message + } + ); + }, _ => { log_trace!( logger, @@ -1052,6 +1362,11 @@ where .entry(*their_node_id) .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); + if self.intercept_messages_for_offline_peers { + self.pending_events.lock().unwrap().peer_connecteds.push( + Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } + ); + } } else { self.message_recipients.lock().unwrap().remove(their_node_id); } @@ -1145,6 +1460,7 @@ pub type SimpleArcOnionMessenger = OnionMessenger< Arc, Arc, Arc, + Arc>, Arc>>, Arc, Arc>>, Arc>, IgnoringMessageHandler @@ -1164,8 +1480,9 @@ pub type SimpleRefOnionMessenger< &'a KeysManager, &'a KeysManager, &'b L, - &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>, - &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, + &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, + &'j DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>, + &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, IgnoringMessageHandler >; @@ -1203,7 +1520,7 @@ fn packet_payloads_and_keys