X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=c991c5bccc574cf3b0cd866670bcbc8d22572c92;hb=445ec8d50840c9a3377bba80351c09df46527060;hp=9043eccbe74013dfc91b5d58183e46d1f7bf65ca;hpb=9ef61748e419e3e993784ee1c7f785ed434f5c67;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 9043eccb..c991c5bc 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -16,7 +16,7 @@ use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp}; -use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs}; +use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; use crate::events::{Event, EventHandler, EventsProvider}; use crate::sign::{EntropySource, NodeSigner, Recipient}; @@ -135,6 +135,7 @@ for OnionMessenger where /// # use bitcoin::hashes::hex::FromHex; /// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self}; /// # use lightning::blinded_path::{BlindedPath, EmptyNodeIdLookUp}; +/// # use lightning::blinded_path::message::ForwardNode; /// # use lightning::sign::{EntropySource, KeysManager}; /// # use lightning::ln::peer_handler::IgnoringMessageHandler; /// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger}; @@ -209,8 +210,11 @@ for OnionMessenger where /// /// // Create a blinded path to yourself, for someone to send an onion message to. /// # let your_node_id = hop_node_id1; -/// let hops = [hop_node_id3, hop_node_id4, your_node_id]; -/// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap(); +/// let hops = [ +/// ForwardNode { node_id: hop_node_id3, short_channel_id: None }, +/// ForwardNode { node_id: hop_node_id4, short_channel_id: None }, +/// ]; +/// let blinded_path = BlindedPath::new_for_message(&hops, your_node_id, &keys_manager, &secp_ctx).unwrap(); /// /// // Send a custom onion message to a blinded path. /// let destination = Destination::BlindedPath(blinded_path); @@ -321,22 +325,30 @@ impl OnionMessageRecipient { /// The `Responder` struct creates an appropriate [`ResponseInstruction`] /// for responding to a message. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Responder { /// The path along which a response can be sent. reply_path: BlindedPath, path_id: Option<[u8; 32]> } +impl_writeable_tlv_based!(Responder, { + (0, reply_path, required), + (2, path_id, option), +}); + impl Responder { /// Creates a new [`Responder`] instance with the provided reply path. - fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self { + pub(super) fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self { Responder { reply_path, path_id, } } - /// Creates the appropriate [`ResponseInstruction`] for a given response. + /// Creates a [`ResponseInstruction::WithoutReplyPath`] for a given response. + /// + /// Use when the recipient doesn't need to send back a reply to us. pub fn respond(self, response: T) -> ResponseInstruction { ResponseInstruction::WithoutReplyPath(OnionMessageResponse { message: response, @@ -344,6 +356,17 @@ impl Responder { path_id: self.path_id, }) } + + /// Creates a [`ResponseInstruction::WithReplyPath`] for a given response. + /// + /// Use when the recipient needs to send back a reply to us. + pub fn respond_with_reply_path(self, response: T) -> ResponseInstruction { + ResponseInstruction::WithReplyPath(OnionMessageResponse { + message: response, + reply_path: self.reply_path, + path_id: self.path_id, + }) + } } /// This struct contains the information needed to reply to a received message. @@ -355,6 +378,9 @@ pub struct OnionMessageResponse { /// `ResponseInstruction` represents instructions for responding to received messages. pub enum ResponseInstruction { + /// Indicates that a response should be sent including a reply path for + /// the recipient to respond back. + WithReplyPath(OnionMessageResponse), /// Indicates that a response should be sent without including a reply path /// for the recipient to respond back. WithoutReplyPath(OnionMessageResponse), @@ -408,9 +434,41 @@ pub trait MessageRouter { >( &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, ) -> Result, ()>; + + /// Creates compact [`BlindedPath`]s to the `recipient` node. The nodes in `peers` are assumed + /// to be direct peers with the `recipient`. + /// + /// Compact blinded paths use short channel ids instead of pubkeys for a smaller serialization, + /// which is beneficial when a QR code is used to transport the data. The SCID is passed using a + /// [`ForwardNode`] but may be `None` for graceful degradation. + /// + /// Implementations using additional intermediate nodes are responsible for using a + /// [`ForwardNode`] with `Some` short channel id, if possible. Similarly, implementations should + /// call [`BlindedPath::use_compact_introduction_node`]. + /// + /// The provided implementation simply delegates to [`MessageRouter::create_blinded_paths`], + /// ignoring the short channel ids. + fn create_compact_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + let peers = peers + .into_iter() + .map(|ForwardNode { node_id, short_channel_id: _ }| node_id) + .collect(); + self.create_blinded_paths(recipient, peers, secp_ctx) + } } /// A [`MessageRouter`] that can only route to a directly connected [`Destination`]. +/// +/// # Privacy +/// +/// Creating [`BlindedPath`]s may affect privacy since, if a suitable path cannot be found, it will +/// create a one-hop path using the recipient as the introduction node if it is a announced node. +/// Otherwise, there is no way to find a path to the introduction node in order to send a message, +/// and thus an `Err` is returned. pub struct DefaultMessageRouter>, L: Deref, ES: Deref> where L::Target: Logger, @@ -429,51 +487,12 @@ where pub fn new(network_graph: G, entropy_source: ES) -> Self { Self { network_graph, entropy_source } } -} - -impl>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter -where - L::Target: Logger, - ES::Target: EntropySource, -{ - fn find_path( - &self, sender: PublicKey, peers: Vec, mut destination: Destination - ) -> Result { - let network_graph = self.network_graph.deref().read_only(); - destination.resolve(&network_graph); - - let first_node = match destination.first_node() { - Some(first_node) => first_node, - None => return Err(()), - }; - - if peers.contains(&first_node) || sender == first_node { - Ok(OnionMessagePath { - intermediate_nodes: vec![], destination, first_node_addresses: None - }) - } else { - let node_announcement = network_graph - .node(&NodeId::from_pubkey(&first_node)) - .and_then(|node_info| node_info.announcement_info.as_ref()) - .and_then(|announcement_info| announcement_info.announcement_message.as_ref()) - .map(|node_announcement| &node_announcement.contents); - - match node_announcement { - Some(node_announcement) if node_announcement.features.supports_onion_messages() => { - let first_node_addresses = Some(node_announcement.addresses.clone()); - Ok(OnionMessagePath { - intermediate_nodes: vec![], destination, first_node_addresses - }) - }, - _ => Err(()), - } - } - } - fn create_blinded_paths< + fn create_blinded_paths_from_iter< + I: Iterator, T: secp256k1::Signing + secp256k1::Verification >( - &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + &self, recipient: PublicKey, peers: I, secp_ctx: &Secp256k1, compact_paths: bool ) -> Result, ()> { // Limit the number of blinded paths that are computed. const MAX_PATHS: usize = 3; @@ -486,13 +505,13 @@ where let is_recipient_announced = network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient)); - let mut peer_info = peers.iter() + let mut peer_info = peers // Limit to peers with announced channels - .filter_map(|pubkey| + .filter_map(|peer| network_graph - .node(&NodeId::from_pubkey(pubkey)) + .node(&NodeId::from_pubkey(&peer.node_id)) .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS) - .map(|info| (*pubkey, info.is_tor_only(), info.channels.len())) + .map(|info| (peer, info.is_tor_only(), info.channels.len())) ) // Exclude Tor-only nodes when the recipient is announced. .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced)) @@ -504,12 +523,13 @@ where }); let paths = peer_info.into_iter() - .map(|(pubkey, _, _)| vec![pubkey, recipient]) - .map(|node_pks| BlindedPath::new_for_message(&node_pks, &*self.entropy_source, secp_ctx)) + .map(|(peer, _, _)| { + BlindedPath::new_for_message(&[peer], recipient, &*self.entropy_source, secp_ctx) + }) .take(MAX_PATHS) .collect::, _>>(); - match paths { + let mut paths = match paths { Ok(paths) if !paths.is_empty() => Ok(paths), _ => { if is_recipient_announced { @@ -519,8 +539,74 @@ where Err(()) } }, + }?; + + if compact_paths { + for path in &mut paths { + path.use_compact_introduction_node(&network_graph); + } + } + + Ok(paths) + } +} + +impl>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter +where + L::Target: Logger, + ES::Target: EntropySource, +{ + fn find_path( + &self, sender: PublicKey, peers: Vec, mut destination: Destination + ) -> Result { + let network_graph = self.network_graph.deref().read_only(); + destination.resolve(&network_graph); + + let first_node = match destination.first_node() { + Some(first_node) => first_node, + None => return Err(()), + }; + + if peers.contains(&first_node) || sender == first_node { + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses: None + }) + } else { + let node_details = network_graph + .node(&NodeId::from_pubkey(&first_node)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .map(|announcement_info| (announcement_info.features(), announcement_info.addresses())); + + match node_details { + Some((features, addresses)) if features.supports_onion_messages() && addresses.len() > 0 => { + let first_node_addresses = Some(addresses.clone()); + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses + }) + }, + _ => Err(()), + } } } + + fn create_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + let peers = peers + .into_iter() + .map(|node_id| ForwardNode { node_id, short_channel_id: None }); + self.create_blinded_paths_from_iter(recipient, peers, secp_ctx, false) + } + + fn create_compact_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + self.create_blinded_paths_from_iter(recipient, peers.into_iter(), secp_ctx, true) + } } /// A path for sending an [`OnionMessage`]. @@ -623,7 +709,11 @@ pub enum SendError { TooFewBlindedHops, /// The first hop is not a peer and doesn't have a known [`SocketAddress`]. InvalidFirstHop(PublicKey), - /// A path from the sender to the destination could not be found by the [`MessageRouter`]. + /// Indicates that a path could not be found by the [`MessageRouter`]. + /// + /// This occurs when either: + /// - No path from the sender to the destination was found to send the onion message + /// - No reply path to the sender could be created when responding to an onion message PathNotFound, /// Onion message contents must have a TLV type >= 64. InvalidMessage, @@ -1043,6 +1133,24 @@ where .map_err(|_| SendError::PathNotFound) } + fn create_blinded_path(&self) -> Result { + let recipient = self.node_signer + .get_node_id(Recipient::Node) + .map_err(|_| SendError::GetNodeIdFailed)?; + let secp_ctx = &self.secp_ctx; + + let peers = self.message_recipients.lock().unwrap() + .iter() + .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_))) + .map(|(node_id, _ )| *node_id) + .collect::>(); + + self.message_router + .create_blinded_paths(recipient, peers, secp_ctx) + .and_then(|paths| paths.into_iter().next().ok_or(())) + .map_err(|_| SendError::PathNotFound) + } + fn enqueue_onion_message( &self, path: OnionMessagePath, contents: T, reply_path: Option, log_suffix: fmt::Arguments @@ -1115,20 +1223,48 @@ where ) } - fn handle_onion_message_response( + /// Handles the response to an [`OnionMessage`] based on its [`ResponseInstruction`], + /// enqueueing any response for sending. + /// + /// This function is useful for asynchronous handling of [`OnionMessage`]s. + /// Handlers have the option to return [`ResponseInstruction::NoResponse`], indicating that + /// no immediate response should be sent. Then, they can transfer the associated [`Responder`] + /// to another task responsible for generating the response asynchronously. Subsequently, when + /// the response is prepared and ready for sending, that task can invoke this method to enqueue + /// the response for delivery. + pub fn handle_onion_message_response( &self, response: ResponseInstruction - ) { - if let ResponseInstruction::WithoutReplyPath(response) = response { - let message_type = response.message.msg_type(); - let _ = self.find_path_and_enqueue_onion_message( - response.message, Destination::BlindedPath(response.reply_path), None, - format_args!( - "when responding with {} to an onion message with path_id {:02x?}", - message_type, - response.path_id - ) - ); - } + ) -> Result, SendError> { + let (response, create_reply_path) = match response { + ResponseInstruction::WithReplyPath(response) => (response, true), + ResponseInstruction::WithoutReplyPath(response) => (response, false), + ResponseInstruction::NoResponse => return Ok(None), + }; + + let message_type = response.message.msg_type(); + let reply_path = if create_reply_path { + match self.create_blinded_path() { + Ok(reply_path) => Some(reply_path), + Err(err) => { + log_trace!( + self.logger, + "Failed to create reply path when responding with {} to an onion message \ + with path_id {:02x?}: {:?}", + message_type, response.path_id, err + ); + return Err(err); + } + } + } else { None }; + + self.find_path_and_enqueue_onion_message( + response.message, Destination::BlindedPath(response.reply_path), reply_path, + format_args!( + "when responding with {} to an onion message with path_id {:02x?}", + message_type, + response.path_id + ) + ).map(|result| Some(result)) } #[cfg(test)] @@ -1154,6 +1290,51 @@ where } pending_events.intercepted_msgs.push(event); } + + /// Processes any events asynchronously using the given handler. + /// + /// Note that the event handler is called in the order each event was generated, however + /// futures are polled in parallel for some events to allow for parallelism where events do not + /// have an ordering requirement. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + pub async fn process_pending_events_async + core::marker::Unpin, H: Fn(Event) -> Future>( + &self, handler: H + ) { + let mut intercepted_msgs = Vec::new(); + let mut peer_connecteds = Vec::new(); + { + let mut pending_events = self.pending_events.lock().unwrap(); + core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs); + core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds); + } + + let mut futures = Vec::with_capacity(intercepted_msgs.len()); + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }))); + } + } + } + + for ev in intercepted_msgs { + if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } + futures.push(Some(handler(ev))); + } + // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds + crate::util::async_poll::MultiFuturePoller(futures).await; + + if peer_connecteds.len() <= 1 { + for event in peer_connecteds { handler(event).await; } + } else { + let mut futures = Vec::new(); + for event in peer_connecteds { + futures.push(Some(handler(event))); + } + crate::util::async_poll::MultiFuturePoller(futures).await; + } + } } fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { @@ -1245,14 +1426,15 @@ where |reply_path| Responder::new(reply_path, path_id) ); let response_instructions = self.offers_handler.handle_message(msg, responder); - self.handle_onion_message_response(response_instructions); + let _ = self.handle_onion_message_response(response_instructions); }, + ParsedOnionMessageContents::AsyncPayments(_msg) => todo!(), ParsedOnionMessageContents::Custom(msg) => { let responder = reply_path.map( |reply_path| Responder::new(reply_path, path_id) ); let response_instructions = self.custom_handler.handle_custom_message(msg, responder); - self.handle_onion_message_response(response_instructions); + let _ = self.handle_onion_message_response(response_instructions); }, } },