X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=8d4fb044f5af14a7c15b1657f4f4344b7a8756b4;hb=d46519bbd0edb76a2d6dd8d33dac057d49576a46;hp=2c42566e733bd205157d7aeb2b882b0501512837;hpb=79f212b70a174d52cd6016d3c608e5ed9e68069b;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 2c42566e..8d4fb044 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -18,13 +18,15 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; use crate::blinded_path::BlindedPath; use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; +use crate::events::{Event, EventHandler, EventsProvider}; use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient}; #[cfg(not(c_bindings))] use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use crate::ln::features::{InitFeatures, NodeFeatures}; -use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler}; +use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; use crate::ln::onion_utils; use crate::ln::peer_handler::IgnoringMessageHandler; +use crate::routing::gossip::{NetworkGraph, NodeId}; pub use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; @@ -38,6 +40,8 @@ use crate::io; use crate::sync::{Arc, Mutex}; use crate::prelude::*; +pub(super) const MAX_TIMER_TICKS: usize = 2; + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -76,7 +80,15 @@ use crate::prelude::*; /// # struct FakeMessageRouter {} /// # impl MessageRouter for FakeMessageRouter { /// # fn find_path(&self, sender: PublicKey, peers: Vec, destination: Destination) -> Result { -/// # unimplemented!() +/// # let secp_ctx = Secp256k1::new(); +/// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); +/// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); +/// # let hop_node_id2 = hop_node_id1; +/// # Ok(OnionMessagePath { +/// # intermediate_nodes: vec![hop_node_id1, hop_node_id2], +/// # destination, +/// # addresses: None, +/// # }) /// # } /// # } /// # let seed = [42u8; 32]; @@ -86,7 +98,7 @@ use crate::prelude::*; /// # let node_secret = SecretKey::from_slice(&>::from_hex("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap(); /// # let secp_ctx = Secp256k1::new(); /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret); -/// # let (hop_node_id2, hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1, hop_node_id1); +/// # let (hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1); /// # let destination_node_id = hop_node_id1; /// # let message_router = Arc::new(FakeMessageRouter {}); /// # let custom_message_handler = IgnoringMessageHandler {}; @@ -113,13 +125,10 @@ use crate::prelude::*; /// } /// } /// // Send a custom onion message to a node id. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::Node(destination_node_id), -/// }; +/// let destination = Destination::Node(destination_node_id); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// /// // Create a blinded path to yourself, for someone to send an onion message to. /// # let your_node_id = hop_node_id1; @@ -127,13 +136,10 @@ use crate::prelude::*; /// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap(); /// /// // Send a custom onion message to a blinded path. -/// let path = OnionMessagePath { -/// intermediate_nodes: vec![hop_node_id1, hop_node_id2], -/// destination: Destination::BlindedPath(blinded_path), -/// }; +/// let destination = Destination::BlindedPath(blinded_path); /// let reply_path = None; /// # let message = YourCustomMessage {}; -/// onion_messenger.send_onion_message(path, message, reply_path); +/// onion_messenger.send_onion_message(message, destination, reply_path); /// ``` /// /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest @@ -145,7 +151,7 @@ where L::Target: Logger, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, - CMH:: Target: CustomOnionMessageHandler, + CMH::Target: CustomOnionMessageHandler, { entropy_source: ES, node_signer: NS, @@ -162,22 +168,27 @@ enum OnionMessageBuffer { /// Messages for a node connected as a peer. ConnectedPeer(VecDeque), - /// Messages for a node that is not yet connected. - PendingConnection(VecDeque), + /// Messages for a node that is not yet connected, which are dropped after [`MAX_TIMER_TICKS`] + /// and tracked here. + PendingConnection(VecDeque, Option>, usize), } impl OnionMessageBuffer { + fn pending_connection(addresses: Vec) -> Self { + Self::PendingConnection(VecDeque::new(), Some(addresses), 0) + } + fn pending_messages(&self) -> &VecDeque { match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, } } fn enqueue_message(&mut self, message: OnionMessage) { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, }; pending_messages.push_back(message); @@ -186,7 +197,7 @@ impl OnionMessageBuffer { fn dequeue_message(&mut self) -> Option { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => { + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => { debug_assert!(false); pending_messages }, @@ -199,14 +210,14 @@ impl OnionMessageBuffer { fn release_pending_messages(&mut self) -> VecDeque { let pending_messages = match self { OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages) => pending_messages, + OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, }; core::mem::take(pending_messages) } fn mark_connected(&mut self) { - if let OnionMessageBuffer::PendingConnection(pending_messages) = self { + if let OnionMessageBuffer::PendingConnection(pending_messages, _, _) = self { let mut new_pending_messages = VecDeque::new(); core::mem::swap(pending_messages, &mut new_pending_messages); *self = OnionMessageBuffer::ConnectedPeer(new_pending_messages); @@ -255,16 +266,48 @@ pub trait MessageRouter { } /// A [`MessageRouter`] that can only route to a directly connected [`Destination`]. -pub struct DefaultMessageRouter; +pub struct DefaultMessageRouter>, L: Deref> +where + L::Target: Logger, +{ + network_graph: G, +} -impl MessageRouter for DefaultMessageRouter { +impl>, L: Deref> DefaultMessageRouter +where + L::Target: Logger, +{ + /// Creates a [`DefaultMessageRouter`] using the given [`NetworkGraph`]. + pub fn new(network_graph: G) -> Self { + Self { network_graph } + } +} + +impl>, L: Deref> MessageRouter for DefaultMessageRouter +where + L::Target: Logger, +{ fn find_path( &self, _sender: PublicKey, peers: Vec, destination: Destination ) -> Result { - if peers.contains(&destination.first_node()) { - Ok(OnionMessagePath { intermediate_nodes: vec![], destination }) + let first_node = destination.first_node(); + if peers.contains(&first_node) { + Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses: None }) } else { - Err(()) + let network_graph = self.network_graph.deref().read_only(); + let node_announcement = network_graph + .node(&NodeId::from_pubkey(&first_node)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .and_then(|announcement_info| announcement_info.announcement_message.as_ref()) + .map(|node_announcement| &node_announcement.contents); + + match node_announcement { + Some(node_announcement) if node_announcement.features.supports_onion_messages() => { + let addresses = Some(node_announcement.addresses.clone()); + Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses }) + }, + _ => Err(()), + } } } } @@ -277,6 +320,22 @@ pub struct OnionMessagePath { /// The recipient of the message. pub destination: Destination, + + /// Addresses that may be used to connect to [`OnionMessagePath::first_node`]. + /// + /// Only needs to be set if a connection to the node is required. [`OnionMessenger`] may use + /// this to initiate such a connection. + pub addresses: Option>, +} + +impl OnionMessagePath { + /// Returns the first node in the path. + pub fn first_node(&self) -> PublicKey { + self.intermediate_nodes + .first() + .copied() + .unwrap_or_else(|| self.destination.first_node()) + } } /// The destination of an onion message. @@ -304,6 +363,19 @@ impl Destination { } } +/// Result of successfully [sending an onion message]. +/// +/// [sending an onion message]: OnionMessenger::send_onion_message +#[derive(Debug, PartialEq, Eq)] +pub enum SendSuccess { + /// The message was buffered and will be sent once it is processed by + /// [`OnionMessageHandler::next_onion_message_for_peer`]. + Buffered, + /// The message was buffered and will be sent once the node is connected as a peer and it is + /// processed by [`OnionMessageHandler::next_onion_message_for_peer`]. + BufferedAwaitingConnection(PublicKey), +} + /// Errors that may occur when [sending an onion message]. /// /// [sending an onion message]: OnionMessenger::send_onion_message @@ -317,8 +389,10 @@ pub enum SendError { /// The provided [`Destination`] was an invalid [`BlindedPath`] due to not having any blinded /// hops. TooFewBlindedHops, - /// Our next-hop peer was offline or does not support onion message forwarding. - InvalidFirstHop, + /// The first hop is not a peer and doesn't have a known [`SocketAddress`]. + InvalidFirstHop(PublicKey), + /// A path from the sender to the destination could not be found by the [`MessageRouter`]. + PathNotFound, /// Onion message contents must have a TLV type >= 64. InvalidMessage, /// Our next-hop peer's buffer was full or our total outbound buffer was full. @@ -389,12 +463,12 @@ pub enum PeeledOnion { pub fn create_onion_message( entropy_source: &ES, node_signer: &NS, secp_ctx: &Secp256k1, path: OnionMessagePath, contents: T, reply_path: Option, -) -> Result<(PublicKey, OnionMessage), SendError> +) -> Result<(PublicKey, OnionMessage, Option>), SendError> where ES::Target: EntropySource, NS::Target: NodeSigner, { - let OnionMessagePath { intermediate_nodes, mut destination } = path; + let OnionMessagePath { intermediate_nodes, mut destination, addresses } = path; if let Destination::BlindedPath(BlindedPath { ref blinded_hops, .. }) = destination { if blinded_hops.is_empty() { return Err(SendError::TooFewBlindedHops); @@ -435,10 +509,8 @@ where let onion_routing_packet = construct_onion_message_packet( packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?; - Ok((first_node_id, OnionMessage { - blinding_point, - onion_routing_packet - })) + let message = OnionMessage { blinding_point, onion_routing_packet }; + Ok((first_node_id, message, addresses)) } /// Decode one layer of an incoming [`OnionMessage`]. @@ -568,16 +640,71 @@ where } } - /// Sends an [`OnionMessage`] with the given `contents` for sending to the destination of - /// `path`. + /// Sends an [`OnionMessage`] with the given `contents` to `destination`. /// /// See [`OnionMessenger`] for example usage. pub fn send_onion_message( - &self, path: OnionMessagePath, contents: T, reply_path: Option - ) -> Result<(), SendError> { - log_trace!(self.logger, "Sending onion message: {:?}", contents); + &self, contents: T, destination: Destination, reply_path: Option + ) -> Result { + self.find_path_and_enqueue_onion_message( + contents, destination, reply_path, format_args!("") + ) + } + + fn find_path_and_enqueue_onion_message( + &self, contents: T, destination: Destination, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + let result = self.find_path(destination) + .and_then(|path| self.enqueue_onion_message(path, contents, reply_path, log_suffix)); + + match result.as_ref() { + Err(SendError::GetNodeIdFailed) => { + log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); + }, + Err(SendError::PathNotFound) => { + log_trace!(self.logger, "Failed to find path {}", log_suffix); + }, + Err(e) => { + log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); + }, + Ok(SendSuccess::Buffered) => { + log_trace!(self.logger, "Buffered onion message {}", log_suffix); + }, + Ok(SendSuccess::BufferedAwaitingConnection(node_id)) => { + log_trace!( + self.logger, "Buffered onion message waiting on peer connection {}: {:?}", + log_suffix, node_id + ); + }, + } + + result + } + + fn find_path(&self, destination: Destination) -> Result { + let sender = self.node_signer + .get_node_id(Recipient::Node) + .map_err(|_| SendError::GetNodeIdFailed)?; - let (first_node_id, onion_message) = create_onion_message( + let peers = self.message_buffers.lock().unwrap() + .iter() + .filter(|(_, buffer)| matches!(buffer, OnionMessageBuffer::ConnectedPeer(_))) + .map(|(node_id, _)| *node_id) + .collect(); + + self.message_router + .find_path(sender, peers, destination) + .map_err(|_| SendError::PathNotFound) + } + + fn enqueue_onion_message( + &self, path: OnionMessagePath, contents: T, reply_path: Option, + log_suffix: fmt::Arguments + ) -> Result { + log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents); + + let (first_node_id, onion_message, addresses) = create_onion_message( &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path )?; @@ -587,21 +714,35 @@ where } match message_buffers.entry(first_node_id) { - hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop), + hash_map::Entry::Vacant(e) => match addresses { + None => Err(SendError::InvalidFirstHop(first_node_id)), + Some(addresses) => { + e.insert(OnionMessageBuffer::pending_connection(addresses)) + .enqueue_message(onion_message); + Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) + }, + }, hash_map::Entry::Occupied(mut e) => { e.get_mut().enqueue_message(onion_message); - Ok(()) + Ok(SendSuccess::Buffered) }, } } + #[cfg(test)] + pub(super) fn send_onion_message_using_path( + &self, path: OnionMessagePath, contents: T, reply_path: Option + ) -> Result { + self.enqueue_onion_message(path, contents, reply_path, format_args!("")) + } + fn handle_onion_message_response( &self, response: Option, reply_path: Option, log_suffix: fmt::Arguments ) { if let Some(response) = response { match reply_path { Some(reply_path) => { - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( response, Destination::BlindedPath(reply_path), None, log_suffix ); }, @@ -612,34 +753,6 @@ where } } - fn find_path_and_enqueue_onion_message( - &self, contents: T, destination: Destination, reply_path: Option, - log_suffix: fmt::Arguments - ) { - let sender = match self.node_signer.get_node_id(Recipient::Node) { - Ok(node_id) => node_id, - Err(_) => { - log_warn!(self.logger, "Unable to retrieve node id {}", log_suffix); - return; - } - }; - - let peers = self.message_buffers.lock().unwrap().keys().copied().collect(); - let path = match self.message_router.find_path(sender, peers, destination) { - Ok(path) => path, - Err(()) => { - log_trace!(self.logger, "Failed to find path {}", log_suffix); - return; - }, - }; - - log_trace!(self.logger, "Sending onion message {}: {:?}", log_suffix, contents); - - if let Err(e) = self.send_onion_message(path, contents, reply_path) { - log_trace!(self.logger, "Failed sending onion message {}: {:?}", log_suffix, e); - } - } - #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { let mut message_buffers = self.message_buffers.lock().unwrap(); @@ -676,6 +789,27 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap EventsProvider +for OnionMessenger +where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + for (node_id, recipient) in self.message_buffers.lock().unwrap().iter_mut() { + if let OnionMessageBuffer::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); + } + } + } + } +} + impl OnionMessageHandler for OnionMessenger where @@ -768,6 +902,26 @@ where } } + fn timer_tick_occurred(&self) { + let mut message_buffers = self.message_buffers.lock().unwrap(); + + // Drop any pending recipients since the last call to avoid retaining buffered messages for + // too long. + message_buffers.retain(|_, recipient| match recipient { + OnionMessageBuffer::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, + OnionMessageBuffer::PendingConnection(_, Some(_), _) => true, + _ => true, + }); + + // Increment a timer tick for pending recipients so that their buffered messages are dropped + // at MAX_TIMER_TICKS. + for recipient in message_buffers.values_mut() { + if let OnionMessageBuffer::PendingConnection(_, None, ticks) = recipient { + *ticks += 1; + } + } + } + fn provided_node_features(&self) -> NodeFeatures { let mut features = NodeFeatures::empty(); features.set_onion_messages_optional(); @@ -790,7 +944,7 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending OffersMessage") ); } @@ -801,7 +955,7 @@ where let PendingOnionMessage { contents, destination, reply_path } = message; #[cfg(c_bindings)] let (contents, destination, reply_path) = message; - self.find_path_and_enqueue_onion_message( + let _ = self.find_path_and_enqueue_onion_message( contents, destination, reply_path, format_args!("when sending CustomMessage") ); } @@ -826,7 +980,7 @@ pub type SimpleArcOnionMessenger = OnionMessenger< Arc, Arc, Arc, - Arc, + Arc>>, Arc>>, Arc>, IgnoringMessageHandler >; @@ -845,7 +999,7 @@ pub type SimpleRefOnionMessenger< &'a KeysManager, &'a KeysManager, &'b L, - &'i DefaultMessageRouter, + &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L>, &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, IgnoringMessageHandler >;