X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=9f6c5cb435276ffbc4a76793f1f9fea624986de4;hb=f5ee8c23a1a52bc51c00eb8a62b0b5226acee80b;hp=8d98e284e954b15032232b04e2fe7d4156339b7f;hpb=8412e8368c670176fbcac7e6fa1a98a3916972e6;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 8d98e284..9f6c5cb4 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -7,8 +7,8 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! LDK sends, receives, and forwards onion messages via the [`OnionMessenger`]. See its docs for -//! more information. +//! LDK sends, receives, and forwards onion messages via this [`OnionMessenger`], which lives here, +//! as well as various types, traits, and utilities that it uses. use bitcoin::hashes::{Hash, HashEngine}; use bitcoin::hashes::hmac::{Hmac, HmacEngine}; @@ -18,26 +18,35 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; use crate::blinded_path::BlindedPath; use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; -use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient}; -#[cfg(not(c_bindings))] -use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; +use crate::events::{Event, EventHandler, EventsProvider}; +use crate::sign::{EntropySource, NodeSigner, Recipient}; use crate::ln::features::{InitFeatures, NodeFeatures}; -use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler}; +use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; use crate::ln::onion_utils; -use crate::ln::peer_handler::IgnoringMessageHandler; -pub use super::packet::OnionMessageContents; +use crate::routing::gossip::{NetworkGraph, NodeId}; +use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN}; -use crate::util::logger::Logger; +use crate::util::logger::{Logger, WithContext}; use crate::util::ser::Writeable; use core::fmt; use core::ops::Deref; use crate::io; -use crate::sync::{Arc, Mutex}; +use crate::sync::Mutex; use crate::prelude::*; +#[cfg(not(c_bindings))] +use { + crate::sign::KeysManager, + crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}, + crate::ln::peer_handler::IgnoringMessageHandler, + crate::sync::Arc, +}; + +pub(super) const MAX_TIMER_TICKS: usize = 2; + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -60,11 +69,12 @@ use crate::prelude::*; /// # extern crate bitcoin; /// # use bitcoin::hashes::_export::_core::time::Duration; /// # use bitcoin::hashes::hex::FromHex; -/// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; +/// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self}; /// # use lightning::blinded_path::BlindedPath; -/// # use lightning::sign::KeysManager; +/// # use lightning::sign::{EntropySource, KeysManager}; /// # use lightning::ln::peer_handler::IgnoringMessageHandler; -/// # use lightning::onion_message::{OnionMessageContents, Destination, MessageRouter, OnionMessagePath, OnionMessenger}; +/// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger}; +/// # use lightning::onion_message::packet::OnionMessageContents; /// # use lightning::util::logger::{Logger, Record}; /// # use lightning::util::ser::{Writeable, Writer}; /// # use lightning::io; @@ -83,8 +93,14 @@ use crate::prelude::*; /// # Ok(OnionMessagePath { /// # intermediate_nodes: vec![hop_node_id1, hop_node_id2], /// # destination, +/// # first_node_addresses: None, /// # }) /// # } +/// # fn create_blinded_paths( +/// # &self, _recipient: PublicKey, _peers: Vec, _secp_ctx: &Secp256k1 +/// # ) -> Result, ()> { +/// # unreachable!() +/// # } /// # } /// # let seed = [42u8; 32]; /// # let time = Duration::from_secs(123456); @@ -146,12 +162,12 @@ where L::Target: Logger, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, - CMH:: Target: CustomOnionMessageHandler, + CMH::Target: CustomOnionMessageHandler, { entropy_source: ES, node_signer: NS, logger: L, - message_buffers: Mutex>, + message_recipients: Mutex>, secp_ctx: Secp256k1, message_router: MR, offers_handler: OMH, @@ -159,26 +175,31 @@ where } /// [`OnionMessage`]s buffered to be sent. -enum OnionMessageBuffer { +enum OnionMessageRecipient { /// Messages for a node connected as a peer. ConnectedPeer(VecDeque), - /// Messages for a node that is not yet connected. - PendingConnection(VecDeque), + /// Messages for a node that is not yet connected, which are dropped after [`MAX_TIMER_TICKS`] + /// and tracked here. + PendingConnection(VecDeque, Option>, usize), } -impl OnionMessageBuffer { +impl OnionMessageRecipient { + fn pending_connection(addresses: Vec) -> Self { + Self::PendingConnection(VecDeque::new(), Some(addresses), 0) + } + fn pending_messages(&self) -> &VecDeque { match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, } } fn enqueue_message(&mut self, message: OnionMessage) { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, }; pending_messages.push_back(message); @@ -186,8 +207,8 @@ impl OnionMessageBuffer { fn dequeue_message(&mut self) -> Option { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => { debug_assert!(false); pending_messages }, @@ -199,18 +220,25 @@ impl OnionMessageBuffer { #[cfg(test)] fn release_pending_messages(&mut self) -> VecDeque { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, }; core::mem::take(pending_messages) } fn mark_connected(&mut self) { - if let OnionMessageBuffer::PendingConnection(pending_messages) = self { + if let OnionMessageRecipient::PendingConnection(pending_messages, _, _) = self { let mut new_pending_messages = VecDeque::new(); core::mem::swap(pending_messages, &mut new_pending_messages); - *self = OnionMessageBuffer::ConnectedPeer(new_pending_messages); + *self = OnionMessageRecipient::ConnectedPeer(new_pending_messages); + } + } + + fn is_connected(&self) -> bool { + match self { + OnionMessageRecipient::ConnectedPeer(..) => true, + OnionMessageRecipient::PendingConnection(..) => false, } } } @@ -236,7 +264,7 @@ pub struct PendingOnionMessage { /// /// These are obtained when released from [`OnionMessenger`]'s handlers after which they are /// enqueued for sending. -pub type PendingOnionMessage = (T, Destination, Option); +pub type PendingOnionMessage = (T, Destination, Option); pub(crate) fn new_pending_onion_message( contents: T, destination: Destination, reply_path: Option @@ -253,19 +281,119 @@ pub trait MessageRouter { fn find_path( &self, sender: PublicKey, peers: Vec, destination: Destination ) -> Result; + + /// Creates [`BlindedPath`]s to the `recipient` node. The nodes in `peers` are assumed to be + /// direct peers with the `recipient`. + fn create_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()>; } /// A [`MessageRouter`] that can only route to a directly connected [`Destination`]. -pub struct DefaultMessageRouter; +pub struct DefaultMessageRouter>, L: Deref, ES: Deref> +where + L::Target: Logger, + ES::Target: EntropySource, +{ + network_graph: G, + entropy_source: ES, +} -impl MessageRouter for DefaultMessageRouter { +impl>, L: Deref, ES: Deref> DefaultMessageRouter +where + L::Target: Logger, + ES::Target: EntropySource, +{ + /// Creates a [`DefaultMessageRouter`] using the given [`NetworkGraph`]. + pub fn new(network_graph: G, entropy_source: ES) -> Self { + Self { network_graph, entropy_source } + } +} + +impl>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter +where + L::Target: Logger, + ES::Target: EntropySource, +{ fn find_path( &self, _sender: PublicKey, peers: Vec, destination: Destination ) -> Result { - if peers.contains(&destination.first_node()) { - Ok(OnionMessagePath { intermediate_nodes: vec![], destination }) + let first_node = destination.first_node(); + if peers.contains(&first_node) { + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses: None + }) } else { - Err(()) + let network_graph = self.network_graph.deref().read_only(); + let node_announcement = network_graph + .node(&NodeId::from_pubkey(&first_node)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .and_then(|announcement_info| announcement_info.announcement_message.as_ref()) + .map(|node_announcement| &node_announcement.contents); + + match node_announcement { + Some(node_announcement) if node_announcement.features.supports_onion_messages() => { + let first_node_addresses = Some(node_announcement.addresses.clone()); + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses + }) + }, + _ => Err(()), + } + } + } + + fn create_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + // Limit the number of blinded paths that are computed. + const MAX_PATHS: usize = 3; + + // Ensure peers have at least three channels so that it is more difficult to infer the + // recipient's node_id. + const MIN_PEER_CHANNELS: usize = 3; + + let network_graph = self.network_graph.deref().read_only(); + let is_recipient_announced = + network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient)); + + let mut peer_info = peers.iter() + // Limit to peers with announced channels + .filter_map(|pubkey| + network_graph + .node(&NodeId::from_pubkey(pubkey)) + .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS) + .map(|info| (*pubkey, info.is_tor_only(), info.channels.len())) + ) + // Exclude Tor-only nodes when the recipient is announced. + .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced)) + .collect::>(); + + // Prefer using non-Tor nodes with the most channels as the introduction node. + peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| { + a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse()) + }); + + let paths = peer_info.into_iter() + .map(|(pubkey, _, _)| vec![pubkey, recipient]) + .map(|node_pks| BlindedPath::new_for_message(&node_pks, &*self.entropy_source, secp_ctx)) + .take(MAX_PATHS) + .collect::, _>>(); + + match paths { + Ok(paths) if !paths.is_empty() => Ok(paths), + _ => { + if is_recipient_announced { + BlindedPath::one_hop_for_message(recipient, &*self.entropy_source, secp_ctx) + .map(|path| vec![path]) + } else { + Err(()) + } + }, } } } @@ -278,10 +406,26 @@ pub struct OnionMessagePath { /// The recipient of the message. pub destination: Destination, + + /// Addresses that may be used to connect to [`OnionMessagePath::first_node`]. + /// + /// Only needs to be set if a connection to the node is required. [`OnionMessenger`] may use + /// this to initiate such a connection. + pub first_node_addresses: Option>, +} + +impl OnionMessagePath { + /// Returns the first node in the path. + pub fn first_node(&self) -> PublicKey { + self.intermediate_nodes + .first() + .copied() + .unwrap_or_else(|| self.destination.first_node()) + } } /// The destination of an onion message. -#[derive(Clone)] +#[derive(Clone, Hash, Debug, PartialEq, Eq)] pub enum Destination { /// We're sending this onion message to a node. Node(PublicKey), @@ -308,17 +452,20 @@ impl Destination { /// Result of successfully [sending an onion message]. /// /// [sending an onion message]: OnionMessenger::send_onion_message -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Hash, Debug, PartialEq, Eq)] pub enum SendSuccess { /// The message was buffered and will be sent once it is processed by /// [`OnionMessageHandler::next_onion_message_for_peer`]. Buffered, + /// The message was buffered and will be sent once the node is connected as a peer and it is + /// processed by [`OnionMessageHandler::next_onion_message_for_peer`]. + BufferedAwaitingConnection(PublicKey), } /// Errors that may occur when [sending an onion message]. /// /// [sending an onion message]: OnionMessenger::send_onion_message -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Hash, Debug, PartialEq, Eq)] pub enum SendError { /// Errored computing onion message packet keys. Secp256k1(secp256k1::Error), @@ -328,8 +475,8 @@ pub enum SendError { /// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded /// hops. TooFewBlindedHops, - /// Our next-hop peer was offline or does not support onion message forwarding. - InvalidFirstHop, + /// The first hop is not a peer and doesn't have a known [`SocketAddress`]. + InvalidFirstHop(PublicKey), /// A path from the sender to the destination could not be found by the [`MessageRouter`]. PathNotFound, /// Onion message contents must have a TLV type >= 64. @@ -388,6 +535,7 @@ pub trait CustomOnionMessageHandler { /// A processed incoming onion message, containing either a Forward (another onion message) /// or a Receive payload with decrypted contents. +#[derive(Debug)] pub enum PeeledOnion { /// Forwarded onion, with the next node id and a new onion Forward(PublicKey, OnionMessage), @@ -398,16 +546,17 @@ pub enum PeeledOnion { /// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of /// `path`. /// -/// Returns both the node id of the peer to send the message to and the message itself. +/// Returns the node id of the peer to send the message to, the message itself, and any addresses +/// need to connect to the first node. pub fn create_onion_message( entropy_source: &ES, node_signer: &NS, secp_ctx: &Secp256k1, path: OnionMessagePath, contents: T, reply_path: Option, -) -> Result<(PublicKey, OnionMessage), SendError> +) -> Result<(PublicKey, OnionMessage, Option>), SendError> where ES::Target: EntropySource, NS::Target: NodeSigner, { - let OnionMessagePath { intermediate_nodes, mut destination } = path; + let OnionMessagePath { intermediate_nodes, mut destination, first_node_addresses } = path; if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination { if blinded_hops.is_empty() { return Err(SendError::TooFewBlindedHops); @@ -448,10 +597,8 @@ where let onion_routing_packet = construct_onion_message_packet( packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?; - Ok((first_node_id, OnionMessage { - blinding_point, - onion_routing_packet - })) + let message = OnionMessage { blinding_point, onion_routing_packet }; + Ok((first_node_id, message, first_node_addresses)) } /// Decode one layer of an incoming [`OnionMessage`]. @@ -572,7 +719,7 @@ where OnionMessenger { entropy_source, node_signer, - message_buffers: Mutex::new(HashMap::new()), + message_recipients: Mutex::new(new_hash_map()), secp_ctx, logger, message_router, @@ -581,6 +728,11 @@ where } } + #[cfg(test)] + pub(crate) fn set_offers_handler(&mut self, offers_handler: OMH) { + self.offers_handler = offers_handler; + } + /// Sends an [`OnionMessage`] with the given `contents` to `destination`. /// /// See [`OnionMessenger`] for example usage. @@ -596,21 +748,33 @@ where &self, contents: T, destination: Destination, reply_path: Option, log_suffix: fmt::Arguments ) -> Result { + let mut logger = WithContext::from(&self.logger, None, None); let result = self.find_path(destination) - .and_then(|path| self.enqueue_onion_message(path, contents, reply_path, log_suffix)); + .and_then(|path| { + let first_hop = path.intermediate_nodes.get(0).map(|p| *p); + logger = WithContext::from(&self.logger, first_hop, None); + self.enqueue_onion_message(path, contents, reply_path, log_suffix) + }); match result.as_ref() { Err(SendError::GetNodeIdFailed) => { - log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); + log_warn!(logger, "Unable to retrieve node id {}", log_suffix); }, Err(SendError::PathNotFound) => { - log_trace!(self.logger, "Failed to find path {}", log_suffix); + log_trace!(logger, "Failed to find path {}", log_suffix); }, Err(e) => { - log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); + log_trace!(logger, "Failed sending onion message {}: {:?}", log_suffix, e); }, Ok(SendSuccess::Buffered) => { - log_trace!(self.logger, "Buffered onion message {}", log_suffix); + log_trace!(logger, "Buffered onion message {}", log_suffix); + }, + Ok(SendSuccess::BufferedAwaitingConnection(node_id)) => { + log_trace!( + logger, + "Buffered onion message waiting on peer connection {}: {}", + log_suffix, node_id + ); }, } @@ -622,9 +786,9 @@ where .get_node_id(Recipient::Node) .map_err(|_| SendError::GetNodeIdFailed)?; - let peers = self.message_buffers.lock().unwrap() + let peers = self.message_recipients.lock().unwrap() .iter() - .filter(|(_, buffer)| matches!(buffer, OnionMessageBuffer::ConnectedPeer(_))) + .filter(|(_, recipient)| matches!(recipient, OnionMessageRecipient::ConnectedPeer(_))) .map(|(node_id, _)| *node_id) .collect(); @@ -639,31 +803,50 @@ where ) -> Result { log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents); - let (first_node_id, onion_message) = create_onion_message( + let (first_node_id, onion_message, addresses) = create_onion_message( &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path )?; - let mut message_buffers = self.message_buffers.lock().unwrap(); - if outbound_buffer_full(&first_node_id, &message_buffers) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&first_node_id, &message_recipients) { return Err(SendError::BufferFull); } - match message_buffers.entry(first_node_id) { - hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), + match message_recipients.entry(first_node_id) { + hash_map::Entry::Vacant(e) => match addresses { + None => Err(SendError::InvalidFirstHop(first_node_id)), + Some(addresses) => { + e.insert(OnionMessageRecipient::pending_connection(addresses)) + .enqueue_message(onion_message); + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + }, + }, hash_map::Entry::Occupied(mut e) => { e.get_mut().enqueue_message(onion_message); - Ok(SendSuccess::Buffered) + if e.get().is_connected() { + Ok(SendSuccess::Buffered) + } else { + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + } }, } } - #[cfg(test)] - pub(super) fn send_onion_message_using_path( + #[cfg(any(test, feature = "_test_utils"))] + pub fn send_onion_message_using_path( &self, path: OnionMessagePath, contents: T, reply_path: Option ) -> Result { self.enqueue_onion_message(path, contents, reply_path, format_args!("")) } + pub(crate) fn peel_onion_message( + &self, msg: &OnionMessage + ) -> Result::Target as CustomOnionMessageHandler>::CustomMessage>, ()> { + peel_onion_message( + msg, &self.secp_ctx, &*self.node_signer, &*self.logger, &*self.custom_handler + ) + } + fn handle_onion_message_response( &self, response: Option, reply_path: Option, log_suffix: fmt::Arguments ) { @@ -683,18 +866,18 @@ where #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { - let mut message_buffers = self.message_buffers.lock().unwrap(); - let mut msgs = HashMap::new(); + let mut message_recipients = self.message_recipients.lock().unwrap(); + let mut msgs = new_hash_map(); // We don't want to disconnect the peers by removing them entirely from the original map, so we // release the pending message buffers individually. - for (peer_node_id, buffer) in &mut *message_buffers { - msgs.insert(*peer_node_id, buffer.release_pending_messages()); + for (node_id, recipient) in &mut *message_recipients { + msgs.insert(*node_id, recipient.release_pending_messages()); } msgs } } -fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { +fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; @@ -717,6 +900,27 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap EventsProvider +for OnionMessenger +where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); + } + } + } + } +} + impl OnionMessageHandler for OnionMessenger where @@ -727,14 +931,13 @@ where OMH::Target: OffersMessageHandler, CMH::Target: CustomOnionMessageHandler, { - fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &OnionMessage) { - match peel_onion_message( - msg, &self.secp_ctx, &*self.node_signer, &*self.logger, &*self.custom_handler - ) { + fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) { + let logger = WithContext::from(&self.logger, Some(*peer_node_id), None); + match self.peel_onion_message(msg) { Ok(PeeledOnion::Receive(message, path_id, reply_path)) => { log_trace!( - self.logger, - "Received an onion message with path_id {:02x?} and {} reply_path: {:?}", + logger, + "Received an onion message with path_id {:02x?} and {} reply_path: {:?}", path_id, if reply_path.is_some() { "a" } else { "no" }, message); match message { @@ -759,53 +962,80 @@ where } }, Ok(PeeledOnion::Forward(next_node_id, onion_message)) => { - let mut message_buffers = self.message_buffers.lock().unwrap(); - if outbound_buffer_full(&next_node_id, &message_buffers) { - log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id); + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&next_node_id, &message_recipients) { + log_trace!( + logger, + "Dropping forwarded onion message to peer {}: outbound buffer full", + next_node_id); return } #[cfg(fuzzing)] - message_buffers + message_recipients .entry(next_node_id) - .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())); + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())); - match message_buffers.entry(next_node_id) { + match message_recipients.entry(next_node_id) { hash_map::Entry::Occupied(mut e) if matches!( - e.get(), OnionMessageBuffer::ConnectedPeer(..) + e.get(), OnionMessageRecipient::ConnectedPeer(..) ) => { e.get_mut().enqueue_message(onion_message); - log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); + log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, _ => { - log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id); + log_trace!( + logger, + "Dropping forwarded onion message to disconnected peer {}", + next_node_id); return }, } }, Err(e) => { - log_error!(self.logger, "Failed to process onion message {:?}", e); + log_error!(logger, "Failed to process onion message {:?}", e); } } } fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> { if init.features.supports_onion_messages() { - self.message_buffers.lock().unwrap() + self.message_recipients.lock().unwrap() .entry(*their_node_id) - .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); } else { - self.message_buffers.lock().unwrap().remove(their_node_id); + self.message_recipients.lock().unwrap().remove(their_node_id); } Ok(()) } fn peer_disconnected(&self, their_node_id: &PublicKey) { - match self.message_buffers.lock().unwrap().remove(their_node_id) { - Some(OnionMessageBuffer::ConnectedPeer(..)) => {}, - _ => debug_assert!(false), + match self.message_recipients.lock().unwrap().remove(their_node_id) { + Some(OnionMessageRecipient::ConnectedPeer(..)) => {}, + Some(_) => debug_assert!(false), + None => {}, + } + } + + fn timer_tick_occurred(&self) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + + // Drop any pending recipients since the last call to avoid retaining buffered messages for + // too long. + message_recipients.retain(|_, recipient| match recipient { + OnionMessageRecipient::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, + OnionMessageRecipient::PendingConnection(_, Some(_), _) => true, + _ => true, + }); + + // Increment a timer tick for pending recipients so that their buffered messages are dropped + // at MAX_TIMER_TICKS. + for recipient in message_recipients.values_mut() { + if let OnionMessageRecipient::PendingConnection(_, None, ticks) = recipient { + *ticks += 1; + } } } @@ -847,7 +1077,7 @@ where ); } - self.message_buffers.lock().unwrap() + self.message_recipients.lock().unwrap() .get_mut(&peer_node_id) .and_then(|buffer| buffer.dequeue_message()) } @@ -867,7 +1097,7 @@ pub type SimpleArcOnionMessenger = OnionMessenger< Arc, Arc, Arc, - Arc, + Arc>>, Arc, Arc>>, Arc>, IgnoringMessageHandler >; @@ -886,7 +1116,7 @@ pub type SimpleRefOnionMessenger< &'a KeysManager, &'a KeysManager, &'b L, - &'i DefaultMessageRouter, + &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>, &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, IgnoringMessageHandler >;