X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=a8ffcc02466f5afa918bf299e7a41572d24b72fc;hb=d792afb08cdc20a30786728c3bfe816dd3b59e47;hp=cf0f9e086b5c41a12c2e9b1b5e27f3a03949fae7;hpb=b6f3d0a5fa6cf6036d317d3ff47e5252be47bc40;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index cf0f9e08..9fd6fd95 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -7,37 +7,46 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! LDK sends, receives, and forwards onion messages via the [`OnionMessenger`]. See its docs for -//! more information. +//! LDK sends, receives, and forwards onion messages via this [`OnionMessenger`], which lives here, +//! as well as various types, traits, and utilities that it uses. use bitcoin::hashes::{Hash, HashEngine}; use bitcoin::hashes::hmac::{Hmac, HmacEngine}; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; -use crate::blinded_path::BlindedPath; -use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs}; +use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp}; +use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; -use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient}; -#[cfg(not(c_bindings))] -use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; +use crate::events::{Event, EventHandler, EventsProvider}; +use crate::sign::{EntropySource, NodeSigner, Recipient}; use crate::ln::features::{InitFeatures, NodeFeatures}; -use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler}; +use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; use crate::ln::onion_utils; -use crate::ln::peer_handler::IgnoringMessageHandler; -pub use super::packet::OnionMessageContents; +use crate::routing::gossip::{NetworkGraph, NodeId, ReadOnlyNetworkGraph}; +use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN}; -use crate::util::logger::Logger; +use crate::util::logger::{Logger, WithContext}; use crate::util::ser::Writeable; use core::fmt; use core::ops::Deref; use crate::io; -use crate::sync::{Arc, Mutex}; +use crate::sync::Mutex; use crate::prelude::*; +#[cfg(not(c_bindings))] +use { + crate::sign::KeysManager, + crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}, + crate::ln::peer_handler::IgnoringMessageHandler, + crate::sync::Arc, +}; + +pub(super) const MAX_TIMER_TICKS: usize = 2; + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -59,44 +68,62 @@ use crate::prelude::*; /// ``` /// # extern crate bitcoin; /// # use bitcoin::hashes::_export::_core::time::Duration; -/// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; -/// # use lightning::blinded_path::BlindedPath; -/// # use lightning::sign::KeysManager; +/// # use bitcoin::hashes::hex::FromHex; +/// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self}; +/// # use lightning::blinded_path::{BlindedPath, EmptyNodeIdLookUp}; +/// # use lightning::blinded_path::message::ForwardNode; +/// # use lightning::sign::{EntropySource, KeysManager}; /// # use lightning::ln::peer_handler::IgnoringMessageHandler; -/// # use lightning::onion_message::{OnionMessageContents, Destination, MessageRouter, OnionMessagePath, OnionMessenger}; +/// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger}; +/// # use lightning::onion_message::packet::OnionMessageContents; /// # use lightning::util::logger::{Logger, Record}; /// # use lightning::util::ser::{Writeable, Writer}; /// # use lightning::io; /// # use std::sync::Arc; /// # struct FakeLogger; /// # impl Logger for FakeLogger { -/// # fn log(&self, record: &Record) { unimplemented!() } +/// # fn log(&self, record: Record) { println!("{:?}" , record); } /// # } /// # struct FakeMessageRouter {} /// # impl MessageRouter for FakeMessageRouter { /// # fn find_path(&self, sender: PublicKey, peers: Vec, destination: Destination) -> Result { -/// # unimplemented!() +/// # let secp_ctx = Secp256k1::new(); +/// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); +/// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); +/// # let hop_node_id2 = hop_node_id1; +/// # Ok(OnionMessagePath { +/// # intermediate_nodes: vec![hop_node_id1, hop_node_id2], +/// # destination, +/// # first_node_addresses: None, +/// # }) +/// # } +/// # fn create_blinded_paths( +/// # &self, _recipient: PublicKey, _peers: Vec, _secp_ctx: &Secp256k1 +/// # ) -> Result, ()> { +/// # unreachable!() /// # } /// # } /// # let seed = [42u8; 32]; /// # let time = Duration::from_secs(123456); /// # let keys_manager = KeysManager::new(&seed, time.as_secs(), time.subsec_nanos()); /// # let logger = Arc::new(FakeLogger {}); -/// # let node_secret = SecretKey::from_slice(&hex::decode("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); +/// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); /// # let secp_ctx = Secp256k1::new(); /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); -/// # let (hop_node_id2, hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1, hop_node_id1); +/// # let (hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1); /// # let destination_node_id = hop_node_id1; +/// # let node_id_lookup = EmptyNodeIdLookUp {}; /// # let message_router = Arc::new(FakeMessageRouter {}); /// # let custom_message_handler = IgnoringMessageHandler {}; /// # let offers_message_handler = IgnoringMessageHandler {}; /// // Create the onion messenger. This must use the same `keys_manager` as is passed to your /// // ChannelManager. /// let onion_messenger = OnionMessenger::new( -/// &keys_manager, &keys_manager, logger, message_router, &offers_message_handler, -/// &custom_message_handler +/// &keys_manager, &keys_manager, logger, &node_id_lookup, message_router, +/// &offers_message_handler, &custom_message_handler /// ); -/// + +/// # #[derive(Debug)] /// # struct YourCustomMessage {} /// impl Writeable for YourCustomMessage { /// fn write(&self, w: &mut W) -> Result<(), io::Error> { @@ -109,50 +136,165 @@ use crate::prelude::*; /// # let your_custom_message_type = 42; /// your_custom_message_type /// } +/// fn msg_type(&self) -> &'static str { "YourCustomMessageType" } /// } /// // Send a custom onion message to a node id. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::Node(destination_node_id), -/// }; +/// let destination = Destination::Node(destination_node_id); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// /// // Create a blinded path to yourself, for someone to send an onion message to. /// # let your_node_id = hop_node_id1; -/// let hops = [hop_node_id3, hop_node_id4, your_node_id]; -/// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap(); +/// let hops = [ +/// ForwardNode { node_id: hop_node_id3, short_channel_id: None }, +/// ForwardNode { node_id: hop_node_id4, short_channel_id: None }, +/// ]; +/// let blinded_path = BlindedPath::new_for_message(&hops, your_node_id, &keys_manager, &secp_ctx).unwrap(); /// /// // Send a custom onion message to a blinded path. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::BlindedPath(blinded_path), -/// }; +/// let destination = Destination::BlindedPath(blinded_path); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// ``` /// /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice -pub struct OnionMessenger +pub struct OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, + NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, - CMH:: Target: CustomOnionMessageHandler, + CMH::Target: CustomOnionMessageHandler, { entropy_source: ES, node_signer: NS, logger: L, - pending_messages: Mutex>>, + message_recipients: Mutex>, secp_ctx: Secp256k1, + node_id_lookup: NL, message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool, + pending_events: Mutex>, +} + +/// [`OnionMessage`]s buffered to be sent. +enum OnionMessageRecipient { + /// Messages for a node connected as a peer. + ConnectedPeer(VecDeque), + + /// Messages for a node that is not yet connected, which are dropped after [`MAX_TIMER_TICKS`] + /// and tracked here. + PendingConnection(VecDeque, Option>, usize), +} + +impl OnionMessageRecipient { + fn pending_connection(addresses: Vec) -> Self { + Self::PendingConnection(VecDeque::new(), Some(addresses), 0) + } + + fn pending_messages(&self) -> &VecDeque { + match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, + } + } + + fn enqueue_message(&mut self, message: OnionMessage) { + let pending_messages = match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, + }; + + pending_messages.push_back(message); + } + + fn dequeue_message(&mut self) -> Option { + let pending_messages = match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => { + debug_assert!(false); + pending_messages + }, + }; + + pending_messages.pop_front() + } + + #[cfg(test)] + fn release_pending_messages(&mut self) -> VecDeque { + let pending_messages = match self { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, + }; + + core::mem::take(pending_messages) + } + + fn mark_connected(&mut self) { + if let OnionMessageRecipient::PendingConnection(pending_messages, _, _) = self { + let mut new_pending_messages = VecDeque::new(); + core::mem::swap(pending_messages, &mut new_pending_messages); + *self = OnionMessageRecipient::ConnectedPeer(new_pending_messages); + } + } + + fn is_connected(&self) -> bool { + match self { + OnionMessageRecipient::ConnectedPeer(..) => true, + OnionMessageRecipient::PendingConnection(..) => false, + } + } +} + + +/// The `Responder` struct creates an appropriate [`ResponseInstruction`] +/// for responding to a message. +pub struct Responder { + /// The path along which a response can be sent. + reply_path: BlindedPath, + path_id: Option<[u8; 32]> +} + +impl Responder { + /// Creates a new [`Responder`] instance with the provided reply path. + fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self { + Responder { + reply_path, + path_id, + } + } + + /// Creates the appropriate [`ResponseInstruction`] for a given response. + pub fn respond(self, response: T) -> ResponseInstruction { + ResponseInstruction::WithoutReplyPath(OnionMessageResponse { + message: response, + reply_path: self.reply_path, + path_id: self.path_id, + }) + } +} + +/// This struct contains the information needed to reply to a received message. +pub struct OnionMessageResponse { + message: T, + reply_path: BlindedPath, + path_id: Option<[u8; 32]>, +} + +/// `ResponseInstruction` represents instructions for responding to received messages. +pub enum ResponseInstruction { + /// Indicates that a response should be sent without including a reply path + /// for the recipient to respond back. + WithoutReplyPath(OnionMessageResponse), + /// Indicates that there's no response to send back. + NoResponse, } /// An [`OnionMessage`] for [`OnionMessenger`] to send. @@ -176,7 +318,7 @@ pub struct PendingOnionMessage { /// /// These are obtained when released from [`OnionMessenger`]'s handlers after which they are /// enqueued for sending. -pub type PendingOnionMessage = (T, Destination, Option); +pub type PendingOnionMessage = (T, Destination, Option); pub(crate) fn new_pending_onion_message( contents: T, destination: Destination, reply_path: Option @@ -193,19 +335,129 @@ pub trait MessageRouter { fn find_path( &self, sender: PublicKey, peers: Vec, destination: Destination ) -> Result; + + /// Creates [`BlindedPath`]s to the `recipient` node. The nodes in `peers` are assumed to be + /// direct peers with the `recipient`. + fn create_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()>; } /// A [`MessageRouter`] that can only route to a directly connected [`Destination`]. -pub struct DefaultMessageRouter; +pub struct DefaultMessageRouter>, L: Deref, ES: Deref> +where + L::Target: Logger, + ES::Target: EntropySource, +{ + network_graph: G, + entropy_source: ES, +} + +impl>, L: Deref, ES: Deref> DefaultMessageRouter +where + L::Target: Logger, + ES::Target: EntropySource, +{ + /// Creates a [`DefaultMessageRouter`] using the given [`NetworkGraph`]. + pub fn new(network_graph: G, entropy_source: ES) -> Self { + Self { network_graph, entropy_source } + } +} -impl MessageRouter for DefaultMessageRouter { +impl>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter +where + L::Target: Logger, + ES::Target: EntropySource, +{ fn find_path( - &self, _sender: PublicKey, peers: Vec, destination: Destination + &self, sender: PublicKey, peers: Vec, mut destination: Destination ) -> Result { - if peers.contains(&destination.first_node()) { - Ok(OnionMessagePath { intermediate_nodes: vec![], destination }) + let network_graph = self.network_graph.deref().read_only(); + destination.resolve(&network_graph); + + let first_node = match destination.first_node() { + Some(first_node) => first_node, + None => return Err(()), + }; + + if peers.contains(&first_node) || sender == first_node { + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses: None + }) } else { - Err(()) + let node_announcement = network_graph + .node(&NodeId::from_pubkey(&first_node)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .and_then(|announcement_info| announcement_info.announcement_message.as_ref()) + .map(|node_announcement| &node_announcement.contents); + + match node_announcement { + Some(node_announcement) if node_announcement.features.supports_onion_messages() => { + let first_node_addresses = Some(node_announcement.addresses.clone()); + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses + }) + }, + _ => Err(()), + } + } + } + + fn create_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + // Limit the number of blinded paths that are computed. + const MAX_PATHS: usize = 3; + + // Ensure peers have at least three channels so that it is more difficult to infer the + // recipient's node_id. + const MIN_PEER_CHANNELS: usize = 3; + + let network_graph = self.network_graph.deref().read_only(); + let is_recipient_announced = + network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient)); + + let mut peer_info = peers.iter() + // Limit to peers with announced channels + .filter_map(|pubkey| + network_graph + .node(&NodeId::from_pubkey(pubkey)) + .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS) + .map(|info| (*pubkey, info.is_tor_only(), info.channels.len())) + ) + // Exclude Tor-only nodes when the recipient is announced. + .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced)) + .collect::>(); + + // Prefer using non-Tor nodes with the most channels as the introduction node. + peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| { + a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse()) + }); + + let paths = peer_info.into_iter() + .map(|(node_id, _, _)| vec![ForwardNode { node_id, short_channel_id: None }]) + .map(|intermediate_nodes| { + BlindedPath::new_for_message( + &intermediate_nodes, recipient, &*self.entropy_source, secp_ctx + ) + }) + .take(MAX_PATHS) + .collect::, _>>(); + + match paths { + Ok(paths) if !paths.is_empty() => Ok(paths), + _ => { + if is_recipient_announced { + BlindedPath::one_hop_for_message(recipient, &*self.entropy_source, secp_ctx) + .map(|path| vec![path]) + } else { + Err(()) + } + }, } } } @@ -218,10 +470,26 @@ pub struct OnionMessagePath { /// The recipient of the message. pub destination: Destination, + + /// Addresses that may be used to connect to [`OnionMessagePath::first_node`]. + /// + /// Only needs to be set if a connection to the node is required. [`OnionMessenger`] may use + /// this to initiate such a connection. + pub first_node_addresses: Option>, +} + +impl OnionMessagePath { + /// Returns the first node in the path. + pub fn first_node(&self) -> Option { + self.intermediate_nodes + .first() + .copied() + .or_else(|| self.destination.first_node()) + } } /// The destination of an onion message. -#[derive(Clone)] +#[derive(Clone, Hash, Debug, PartialEq, Eq)] pub enum Destination { /// We're sending this onion message to a node. Node(PublicKey), @@ -230,6 +498,22 @@ pub enum Destination { } impl Destination { + /// Attempts to resolve the [`IntroductionNode::DirectedShortChannelId`] of a + /// [`Destination::BlindedPath`] to a [`IntroductionNode::NodeId`], if applicable, using the + /// provided [`ReadOnlyNetworkGraph`]. + pub fn resolve(&mut self, network_graph: &ReadOnlyNetworkGraph) { + if let Destination::BlindedPath(path) = self { + if let IntroductionNode::DirectedShortChannelId(..) = path.introduction_node { + if let Some(pubkey) = path + .public_introduction_node_id(network_graph) + .and_then(|node_id| node_id.as_pubkey().ok()) + { + path.introduction_node = IntroductionNode::NodeId(pubkey); + } + } + } + } + pub(super) fn num_hops(&self) -> usize { match self { Destination::Node(_) => 1, @@ -237,18 +521,36 @@ impl Destination { } } - fn first_node(&self) -> PublicKey { + fn first_node(&self) -> Option { match self { - Destination::Node(node_id) => *node_id, - Destination::BlindedPath(BlindedPath { introduction_node_id: node_id, .. }) => *node_id, + Destination::Node(node_id) => Some(*node_id), + Destination::BlindedPath(BlindedPath { introduction_node, .. }) => { + match introduction_node { + IntroductionNode::NodeId(pubkey) => Some(*pubkey), + IntroductionNode::DirectedShortChannelId(..) => None, + } + }, } } } +/// Result of successfully [sending an onion message]. +/// +/// [sending an onion message]: OnionMessenger::send_onion_message +#[derive(Clone, Hash, Debug, PartialEq, Eq)] +pub enum SendSuccess { + /// The message was buffered and will be sent once it is processed by + /// [`OnionMessageHandler::next_onion_message_for_peer`]. + Buffered, + /// The message was buffered and will be sent once the node is connected as a peer and it is + /// processed by [`OnionMessageHandler::next_onion_message_for_peer`]. + BufferedAwaitingConnection(PublicKey), +} + /// Errors that may occur when [sending an onion message]. /// /// [sending an onion message]: OnionMessenger::send_onion_message -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Hash, Debug, PartialEq, Eq)] pub enum SendError { /// Errored computing onion message packet keys. Secp256k1(secp256k1::Error), @@ -258,8 +560,10 @@ pub enum SendError { /// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded /// hops. TooFewBlindedHops, - /// Our next-hop peer was offline or does not support onion message forwarding. - InvalidFirstHop, + /// The first hop is not a peer and doesn't have a known [`SocketAddress`]. + InvalidFirstHop(PublicKey), + /// A path from the sender to the destination could not be found by the [`MessageRouter`]. + PathNotFound, /// Onion message contents must have a TLV type >= 64. InvalidMessage, /// Our next-hop peer's buffer was full or our total outbound buffer was full. @@ -268,6 +572,10 @@ pub enum SendError { /// /// [`NodeSigner`]: crate::sign::NodeSigner GetNodeIdFailed, + /// The provided [`Destination`] has a blinded path with an unresolved introduction node. An + /// attempt to resolve it in the [`MessageRouter`] when finding an [`OnionMessagePath`] likely + /// failed. + UnresolvedIntroductionNode, /// We attempted to send to a blinded path where we are the introduction node, and failed to /// advance the blinded path to make the second hop the new introduction node. Either /// [`NodeSigner::ecdh`] failed, we failed to tweak the current blinding point to get the @@ -293,7 +601,7 @@ pub trait CustomOnionMessageHandler { /// Called with the custom message that was received, returning a response to send, if any. /// /// The returned [`Self::CustomMessage`], if any, is enqueued to be sent by [`OnionMessenger`]. - fn handle_custom_message(&self, msg: Self::CustomMessage) -> Option; + fn handle_custom_message(&self, message: Self::CustomMessage, responder: Option) -> ResponseInstruction; /// Read a custom message of type `message_type` from `buffer`, returning `Ok(None)` if the /// message type is unknown. @@ -316,26 +624,61 @@ pub trait CustomOnionMessageHandler { /// A processed incoming onion message, containing either a Forward (another onion message) /// or a Receive payload with decrypted contents. +#[derive(Clone, Debug)] pub enum PeeledOnion { /// Forwarded onion, with the next node id and a new onion - Forward(PublicKey, OnionMessage), + Forward(NextMessageHop, OnionMessage), /// Received onion message, with decrypted contents, path_id, and reply path Receive(ParsedOnionMessageContents, Option<[u8; 32]>, Option) } + +/// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of +/// `path`, first calling [`Destination::resolve`] on `path.destination` with the given +/// [`ReadOnlyNetworkGraph`]. +/// +/// Returns the node id of the peer to send the message to, the message itself, and any addresses +/// needed to connect to the first node. +pub fn create_onion_message_resolving_destination< + ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents +>( + entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL, + network_graph: &ReadOnlyNetworkGraph, secp_ctx: &Secp256k1, + mut path: OnionMessagePath, contents: T, reply_path: Option, +) -> Result<(PublicKey, OnionMessage, Option>), SendError> +where + ES::Target: EntropySource, + NS::Target: NodeSigner, + NL::Target: NodeIdLookUp, +{ + path.destination.resolve(network_graph); + create_onion_message( + entropy_source, node_signer, node_id_lookup, secp_ctx, path, contents, reply_path, + ) +} + /// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of /// `path`. /// -/// Returns both the node id of the peer to send the message to and the message itself. -pub fn create_onion_message( - entropy_source: &ES, node_signer: &NS, secp_ctx: &Secp256k1, - path: OnionMessagePath, contents: T, reply_path: Option, -) -> Result<(PublicKey, OnionMessage), SendError> +/// Returns the node id of the peer to send the message to, the message itself, and any addresses +/// needed to connect to the first node. +/// +/// Returns [`SendError::UnresolvedIntroductionNode`] if: +/// - `destination` contains a blinded path with an [`IntroductionNode::DirectedShortChannelId`], +/// - unless it can be resolved by [`NodeIdLookUp::next_node_id`]. +/// Use [`create_onion_message_resolving_destination`] instead to resolve the introduction node +/// first with a [`ReadOnlyNetworkGraph`]. +pub fn create_onion_message( + entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL, + secp_ctx: &Secp256k1, path: OnionMessagePath, contents: T, + reply_path: Option, +) -> Result<(PublicKey, OnionMessage, Option>), SendError> where ES::Target: EntropySource, NS::Target: NodeSigner, + NL::Target: NodeIdLookUp, { - let OnionMessagePath { intermediate_nodes, mut destination } = path; + let OnionMessagePath { intermediate_nodes, mut destination, first_node_addresses } = path; if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination { if blinded_hops.is_empty() { return Err(SendError::TooFewBlindedHops); @@ -350,8 +693,17 @@ where if let Destination::BlindedPath(ref mut blinded_path) = destination { let our_node_id = node_signer.get_node_id(Recipient::Node) .map_err(|()| SendError::GetNodeIdFailed)?; - if blinded_path.introduction_node_id == our_node_id { - advance_path_by_one(blinded_path, node_signer, &secp_ctx) + let introduction_node_id = match blinded_path.introduction_node { + IntroductionNode::NodeId(pubkey) => pubkey, + IntroductionNode::DirectedShortChannelId(direction, scid) => { + match node_id_lookup.next_node_id(scid) { + Some(next_node_id) => *direction.select_pubkey(&our_node_id, &next_node_id), + None => return Err(SendError::UnresolvedIntroductionNode), + } + }, + }; + if introduction_node_id == our_node_id { + advance_path_by_one(blinded_path, node_signer, node_id_lookup, &secp_ctx) .map_err(|()| SendError::BlindedPathAdvanceFailed)?; } } @@ -362,24 +714,28 @@ where let (first_node_id, blinding_point) = if let Some(first_node_id) = intermediate_nodes.first() { (*first_node_id, PublicKey::from_secret_key(&secp_ctx, &blinding_secret)) } else { - match destination { - Destination::Node(pk) => (pk, PublicKey::from_secret_key(&secp_ctx, &blinding_secret)), - Destination::BlindedPath(BlindedPath { introduction_node_id, blinding_point, .. }) => - (introduction_node_id, blinding_point), + match &destination { + Destination::Node(pk) => (*pk, PublicKey::from_secret_key(&secp_ctx, &blinding_secret)), + Destination::BlindedPath(BlindedPath { introduction_node, blinding_point, .. }) => { + match introduction_node { + IntroductionNode::NodeId(pubkey) => (*pubkey, *blinding_point), + IntroductionNode::DirectedShortChannelId(..) => { + return Err(SendError::UnresolvedIntroductionNode); + }, + } + } } }; let (packet_payloads, packet_keys) = packet_payloads_and_keys( - &secp_ctx, &intermediate_nodes, destination, contents, reply_path, &blinding_secret) - .map_err(|e| SendError::Secp256k1(e))?; + &secp_ctx, &intermediate_nodes, destination, contents, reply_path, &blinding_secret + )?; let prng_seed = entropy_source.get_secure_random_bytes(); let onion_routing_packet = construct_onion_message_packet( packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?; - Ok((first_node_id, OnionMessage { - blinding_point, - onion_routing_packet - })) + let message = OnionMessage { blinding_point, onion_routing_packet }; + Ok((first_node_id, message, first_node_addresses)) } /// Decode one layer of an incoming [`OnionMessage`]. @@ -406,7 +762,7 @@ where let blinding_factor = { let mut hmac = HmacEngine::::new(b"blinded_node_id"); hmac.input(control_tlvs_ss.as_ref()); - Hmac::from_engine(hmac).into_inner() + Hmac::from_engine(hmac).to_byte_array() }; match node_signer.ecdh(Recipient::Node, &msg.onion_routing_packet.public_key, Some(&Scalar::from_be_bytes(blinding_factor).unwrap())) @@ -428,9 +784,9 @@ where Ok(PeeledOnion::Receive(message, path_id, reply_path)) }, Ok((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs { - next_node_id, next_blinding_override + next_hop, next_blinding_override })), Some((next_hop_hmac, new_packet_bytes)))) => { - // TODO: we need to check whether `next_node_id` is our node, in which case this is a dummy + // TODO: we need to check whether `next_hop` is our node, in which case this is a dummy // blinded hop and this onion message is destined for us. In this situation, we should keep // unwrapping the onion layers to get to the final payload. Since we don't have the option // of creating blinded paths with dummy hops currently, we should be ok to not handle this @@ -466,7 +822,7 @@ where onion_routing_packet: outgoing_packet, }; - Ok(PeeledOnion::Forward(next_node_id, onion_message)) + Ok(PeeledOnion::Forward(next_hop, onion_message)) }, Err(e) => { log_trace!(logger, "Errored decoding onion message packet: {:?}", e); @@ -479,12 +835,13 @@ where } } -impl -OnionMessenger +impl +OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, + NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, CMH::Target: CustomOnionMessageHandler, @@ -492,111 +849,258 @@ where /// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to /// their respective handlers. pub fn new( - entropy_source: ES, node_signer: NS, logger: L, message_router: MR, offers_handler: OMH, - custom_handler: CMH + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, + offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, false + ) + } + + /// Similar to [`Self::new`], but rather than dropping onion messages that are + /// intended to be forwarded to offline peers, we will intercept them for + /// later forwarding. + /// + /// Interception flow: + /// 1. If an onion message for an offline peer is received, `OnionMessenger` will + /// generate an [`Event::OnionMessageIntercepted`]. Event handlers can + /// then choose to persist this onion message for later forwarding, or drop + /// it. + /// 2. When the offline peer later comes back online, `OnionMessenger` will + /// generate an [`Event::OnionMessagePeerConnected`]. Event handlers will + /// then fetch all previously intercepted onion messages for this peer. + /// 3. Once the stored onion messages are fetched, they can finally be + /// forwarded to the now-online peer via [`Self::forward_onion_message`]. + /// + /// # Note + /// + /// LDK will not rate limit how many [`Event::OnionMessageIntercepted`]s + /// are generated, so it is the caller's responsibility to limit how many + /// onion messages are persisted and only persist onion messages for relevant + /// peers. + pub fn new_with_offline_peer_interception( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, true + ) + } + + fn new_inner( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); OnionMessenger { entropy_source, node_signer, - pending_messages: Mutex::new(HashMap::new()), + message_recipients: Mutex::new(new_hash_map()), secp_ctx, logger, + node_id_lookup, message_router, offers_handler, custom_handler, + intercept_messages_for_offline_peers, + pending_events: Mutex::new(Vec::new()), } } - /// Sends an [`OnionMessage`] with the given `contents` for sending to the destination of - /// `path`. + #[cfg(test)] + pub(crate) fn set_offers_handler(&mut self, offers_handler: OMH) { + self.offers_handler = offers_handler; + } + + /// Sends an [`OnionMessage`] with the given `contents` to `destination`. /// /// See [`OnionMessenger`] for example usage. pub fn send_onion_message( - &self, path: OnionMessagePath, contents: T, reply_path: Option - ) -> Result<(), SendError> { - let (first_node_id, onion_msg) = create_onion_message( - &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path - )?; + &self, contents: T, destination: Destination, reply_path: Option + ) -> Result { + self.find_path_and_enqueue_onion_message( + contents, destination, reply_path, format_args!("") + ) + } - let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - if outbound_buffer_full(&first_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) } - match pending_per_peer_msgs.entry(first_node_id) { - hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), - hash_map::Entry::Occupied(mut e) => { - e.get_mut().push_back(onion_msg); - Ok(()) - } + fn find_path_and_enqueue_onion_message( + &self, contents: T, destination: Destination, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + let mut logger = WithContext::from(&self.logger, None, None, None); + let result = self.find_path(destination).and_then(|path| { + let first_hop = path.intermediate_nodes.get(0).map(|p| *p); + logger = WithContext::from(&self.logger, first_hop, None, None); + self.enqueue_onion_message(path, contents, reply_path, log_suffix) + }); + + match result.as_ref() { + Err(SendError::GetNodeIdFailed) => { + log_warn!(logger, "Unable to retrieve node id {}", log_suffix); + }, + Err(SendError::PathNotFound) => { + log_trace!(logger, "Failed to find path {}", log_suffix); + }, + Err(e) => { + log_trace!(logger, "Failed sending onion message {}: {:?}", log_suffix, e); + }, + Ok(SendSuccess::Buffered) => { + log_trace!(logger, "Buffered onion message {}", log_suffix); + }, + Ok(SendSuccess::BufferedAwaitingConnection(node_id)) => { + log_trace!( + logger, + "Buffered onion message waiting on peer connection {}: {}", + log_suffix, node_id + ); + }, } + + result } - fn handle_onion_message_response( - &self, response: Option, reply_path: Option, log_suffix: fmt::Arguments - ) { - if let Some(response) = response { - match reply_path { - Some(reply_path) => { - self.find_path_and_enqueue_onion_message( - response, Destination::BlindedPath(reply_path), None, log_suffix - ); - }, - None => { - log_trace!(self.logger, "Missing reply path {}", log_suffix); + fn find_path(&self, destination: Destination) -> Result { + let sender = self.node_signer + .get_node_id(Recipient::Node) + .map_err(|_| SendError::GetNodeIdFailed)?; + + let peers = self.message_recipients.lock().unwrap() + .iter() + .filter(|(_, recipient)| matches!(recipient, OnionMessageRecipient::ConnectedPeer(_))) + .map(|(node_id, _)| *node_id) + .collect(); + + self.message_router + .find_path(sender, peers, destination) + .map_err(|_| SendError::PathNotFound) + } + + fn enqueue_onion_message( + &self, path: OnionMessagePath, contents: T, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents); + + let (first_node_id, onion_message, addresses) = create_onion_message( + &self.entropy_source, &self.node_signer, &self.node_id_lookup, &self.secp_ctx, path, + contents, reply_path, + )?; + + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&first_node_id, &message_recipients) { + return Err(SendError::BufferFull); + } + + match message_recipients.entry(first_node_id) { + hash_map::Entry::Vacant(e) => match addresses { + None => Err(SendError::InvalidFirstHop(first_node_id)), + Some(addresses) => { + e.insert(OnionMessageRecipient::pending_connection(addresses)) + .enqueue_message(onion_message); + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) }, - } + }, + hash_map::Entry::Occupied(mut e) => { + e.get_mut().enqueue_message(onion_message); + if e.get().is_connected() { + Ok(SendSuccess::Buffered) + } else { + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + } + }, } } - fn find_path_and_enqueue_onion_message( - &self, contents: T, destination: Destination, reply_path: Option, - log_suffix: fmt::Arguments - ) { - let sender = match self.node_signer.get_node_id(Recipient::Node) { - Ok(node_id) => node_id, - Err(_) => { - log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); - return; - } - }; + /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized + /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`] + /// and want to forward a previously intercepted onion message to a peer that + /// has just come online. + pub fn forward_onion_message( + &self, message: OnionMessage, peer_node_id: &PublicKey + ) -> Result<(), SendError> { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&peer_node_id, &message_recipients) { + return Err(SendError::BufferFull); + } - let peers = self.pending_messages.lock().unwrap().keys().copied().collect(); - let path = match self.message_router.find_path(sender, peers, destination) { - Ok(path) => path, - Err(()) => { - log_trace!(self.logger, "Failed to find path {}", log_suffix); - return; + match message_recipients.entry(*peer_node_id) { + hash_map::Entry::Occupied(mut e) if e.get().is_connected() => { + e.get_mut().enqueue_message(message); + Ok(()) }, - }; + _ => Err(SendError::InvalidFirstHop(*peer_node_id)) + } + } - log_trace!(self.logger, "Sending onion message {}", log_suffix); + #[cfg(any(test, feature = "_test_utils"))] + pub fn send_onion_message_using_path( + &self, path: OnionMessagePath, contents: T, reply_path: Option + ) -> Result { + self.enqueue_onion_message(path, contents, reply_path, format_args!("")) + } - if let Err(e) = self.send_onion_message(path, contents, reply_path) { - log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); - return; + pub(crate) fn peel_onion_message( + &self, msg: &OnionMessage + ) -> Result::Target as CustomOnionMessageHandler>::CustomMessage>, ()> { + peel_onion_message( + msg, &self.secp_ctx, &*self.node_signer, &*self.logger, &*self.custom_handler + ) + } + + fn handle_onion_message_response( + &self, response: ResponseInstruction + ) { + if let ResponseInstruction::WithoutReplyPath(response) = response { + let message_type = response.message.msg_type(); + let _ = self.find_path_and_enqueue_onion_message( + response.message, Destination::BlindedPath(response.reply_path), None, + format_args!( + "when responding with {} to an onion message with path_id {:02x?}", + message_type, + response.path_id + ) + ); } } #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { - let mut pending_msgs = self.pending_messages.lock().unwrap(); - let mut msgs = HashMap::new(); + let mut message_recipients = self.message_recipients.lock().unwrap(); + let mut msgs = new_hash_map(); // We don't want to disconnect the peers by removing them entirely from the original map, so we - // swap the pending message buffers individually. - for (peer_node_id, pending_messages) in &mut *pending_msgs { - msgs.insert(*peer_node_id, core::mem::take(pending_messages)); + // release the pending message buffers individually. + for (node_id, recipient) in &mut *message_recipients { + msgs.insert(*node_id, recipient.release_pending_messages()); } msgs } + + fn enqueue_event(&self, event: Event) { + const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; + let mut pending_events = self.pending_events.lock().unwrap(); + let total_buffered_bytes: usize = pending_events + .iter() + .map(|ev| ev.serialized_length()) + .sum(); + if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { + log_trace!(self.logger, "Dropping event {:?}: buffer full", event); + return + } + pending_events.push(event); + } } -fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap>) -> bool { +fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; let mut peer_buffered_bytes = 0; for (pk, peer_buf) in buffer { - for om in peer_buf { + for om in peer_buf.pending_messages() { let om_len = om.serialized_length(); if pk == peer_node_id { peer_buffered_bytes += om_len; @@ -613,84 +1117,169 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap OnionMessageHandler -for OnionMessenger +impl EventsProvider +for OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, + NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, CMH::Target: CustomOnionMessageHandler, { - fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &OnionMessage) { - match peel_onion_message( - msg, &self.secp_ctx, &*self.node_signer, &*self.logger, &*self.custom_handler - ) { + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); + } + } + } + let mut events = Vec::new(); + core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events); + for ev in events { + handler.handle_event(ev); + } + } +} + +impl OnionMessageHandler +for OnionMessenger +where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + NL::Target: NodeIdLookUp, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) { + let logger = WithContext::from(&self.logger, Some(*peer_node_id), None, None); + match self.peel_onion_message(msg) { Ok(PeeledOnion::Receive(message, path_id, reply_path)) => { - log_trace!(self.logger, - "Received an onion message with path_id {:02x?} and {} reply_path", - path_id, if reply_path.is_some() { "a" } else { "no" }); + log_trace!( + logger, + "Received an onion message with path_id {:02x?} and {} reply_path: {:?}", + path_id, if reply_path.is_some() { "a" } else { "no" }, message); match message { ParsedOnionMessageContents::Offers(msg) => { - let response = self.offers_handler.handle_message(msg); - self.handle_onion_message_response( - response, reply_path, format_args!( - "when responding to Offers onion message with path_id {:02x?}", - path_id - ) + let responder = reply_path.map( + |reply_path| Responder::new(reply_path, path_id) ); + let response_instructions = self.offers_handler.handle_message(msg, responder); + self.handle_onion_message_response(response_instructions); }, ParsedOnionMessageContents::Custom(msg) => { - let response = self.custom_handler.handle_custom_message(msg); - self.handle_onion_message_response( - response, reply_path, format_args!( - "when responding to Custom onion message with path_id {:02x?}", - path_id - ) + let responder = reply_path.map( + |reply_path| Responder::new(reply_path, path_id) ); + let response_instructions = self.custom_handler.handle_custom_message(msg, responder); + self.handle_onion_message_response(response_instructions); }, } }, - Ok(PeeledOnion::Forward(next_node_id, onion_message)) => { - let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap(); - if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) { - log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id); + Ok(PeeledOnion::Forward(next_hop, onion_message)) => { + let next_node_id = match next_hop { + NextMessageHop::NodeId(pubkey) => pubkey, + NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) { + Some(pubkey) => pubkey, + None => { + log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {}", scid); + return + }, + }, + }; + + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&next_node_id, &message_recipients) { + log_trace!( + logger, + "Dropping forwarded onion message to peer {}: outbound buffer full", + next_node_id); return } #[cfg(fuzzing)] - pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new); - - match pending_per_peer_msgs.entry(next_node_id) { - hash_map::Entry::Vacant(_) => { - log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id); + message_recipients + .entry(next_node_id) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())); + + match message_recipients.entry(next_node_id) { + hash_map::Entry::Occupied(mut e) if matches!( + e.get(), OnionMessageRecipient::ConnectedPeer(..) + ) => { + e.get_mut().enqueue_message(onion_message); + log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); + }, + _ if self.intercept_messages_for_offline_peers => { + self.enqueue_event( + Event::OnionMessageIntercepted { + peer_node_id: next_node_id, message: onion_message + } + ); + }, + _ => { + log_trace!( + logger, + "Dropping forwarded onion message to disconnected peer {}", + next_node_id); return }, - hash_map::Entry::Occupied(mut e) => { - e.get_mut().push_back(onion_message); - log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); - } } }, Err(e) => { - log_error!(self.logger, "Failed to process onion message {:?}", e); + log_error!(logger, "Failed to process onion message {:?}", e); } } } fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> { if init.features.supports_onion_messages() { - let mut peers = self.pending_messages.lock().unwrap(); - peers.insert(their_node_id.clone(), VecDeque::new()); + self.message_recipients.lock().unwrap() + .entry(*their_node_id) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) + .mark_connected(); + if self.intercept_messages_for_offline_peers { + self.enqueue_event( + Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } + ); + } + } else { + self.message_recipients.lock().unwrap().remove(their_node_id); } + Ok(()) } fn peer_disconnected(&self, their_node_id: &PublicKey) { - let mut pending_msgs = self.pending_messages.lock().unwrap(); - pending_msgs.remove(their_node_id); + match self.message_recipients.lock().unwrap().remove(their_node_id) { + Some(OnionMessageRecipient::ConnectedPeer(..)) => {}, + Some(_) => debug_assert!(false), + None => {}, + } + } + + fn timer_tick_occurred(&self) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + + // Drop any pending recipients since the last call to avoid retaining buffered messages for + // too long. + message_recipients.retain(|_, recipient| match recipient { + OnionMessageRecipient::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, + OnionMessageRecipient::PendingConnection(_, Some(_), _) => true, + _ => true, + }); + + // Increment a timer tick for pending recipients so that their buffered messages are dropped + // at MAX_TIMER_TICKS. + for recipient in message_recipients.values_mut() { + if let OnionMessageRecipient::PendingConnection(_, None, ticks) = recipient { + *ticks += 1; + } + } } fn provided_node_features(&self) -> NodeFeatures { @@ -715,7 +1304,7 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending OffersMessage") ); } @@ -726,16 +1315,14 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending CustomMessage") ); } - let mut pending_msgs = self.pending_messages.lock().unwrap(); - if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) { - return msgs.pop_front() - } - None + self.message_recipients.lock().unwrap() + .get_mut(&peer_node_id) + .and_then(|buffer| buffer.dequeue_message()) } } @@ -753,7 +1340,8 @@ pub type SimpleArcOnionMessenger = OnionMessenger< Arc, Arc, Arc, - Arc, + Arc>, + Arc>>, Arc, Arc>>, Arc>, IgnoringMessageHandler >; @@ -772,8 +1360,9 @@ pub type SimpleRefOnionMessenger< &'a KeysManager, &'a KeysManager, &'b L, - &'i DefaultMessageRouter, - &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, + &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, + &'j DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>, + &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, IgnoringMessageHandler >; @@ -782,14 +1371,23 @@ pub type SimpleRefOnionMessenger< fn packet_payloads_and_keys( secp_ctx: &Secp256k1, unblinded_path: &[PublicKey], destination: Destination, message: T, mut reply_path: Option, session_priv: &SecretKey -) -> Result<(Vec<(Payload, [u8; 32])>, Vec), secp256k1::Error> { +) -> Result<(Vec<(Payload, [u8; 32])>, Vec), SendError> { let num_hops = unblinded_path.len() + destination.num_hops(); let mut payloads = Vec::with_capacity(num_hops); let mut onion_packet_keys = Vec::with_capacity(num_hops); - let (mut intro_node_id_blinding_pt, num_blinded_hops) = if let Destination::BlindedPath(BlindedPath { - introduction_node_id, blinding_point, blinded_hops }) = &destination { - (Some((*introduction_node_id, *blinding_point)), blinded_hops.len()) } else { (None, 0) }; + let (mut intro_node_id_blinding_pt, num_blinded_hops) = match &destination { + Destination::Node(_) => (None, 0), + Destination::BlindedPath(BlindedPath { introduction_node, blinding_point, blinded_hops }) => { + let introduction_node_id = match introduction_node { + IntroductionNode::NodeId(pubkey) => pubkey, + IntroductionNode::DirectedShortChannelId(..) => { + return Err(SendError::UnresolvedIntroductionNode); + }, + }; + (Some((*introduction_node_id, *blinding_point)), blinded_hops.len()) + }, + }; let num_unblinded_hops = num_hops - num_blinded_hops; let mut unblinded_path_idx = 0; @@ -802,7 +1400,7 @@ fn packet_payloads_and_keys