X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=21012bd7fed2f471c4601b6047feb17cc3107c8a;hb=969578703db85ba62410e121e61fb512be000f0d;hp=aa5f658949254fd98dd8de656c11eefba93c56d3;hpb=993cd1e5253d62abb3ef23a3e58de121139e8fce;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index aa5f6589..21012bd7 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -24,6 +24,9 @@ use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress}; use crate::ln::onion_utils; use crate::routing::gossip::{NetworkGraph, NodeId, ReadOnlyNetworkGraph}; +use super::async_payments::AsyncPaymentsMessageHandler; +#[cfg(async_payments)] +use super::async_payments::AsyncPaymentsMessage; use super::packet::OnionMessageContents; use super::packet::ParsedOnionMessageContents; use super::offers::OffersMessageHandler; @@ -47,6 +50,77 @@ use { pub(super) const MAX_TIMER_TICKS: usize = 2; +/// A trivial trait which describes any [`OnionMessenger`]. +/// +/// This is not exported to bindings users as general cover traits aren't useful in other +/// languages. +pub trait AOnionMessenger { + /// A type implementing [`EntropySource`] + type EntropySource: EntropySource + ?Sized; + /// A type that may be dereferenced to [`Self::EntropySource`] + type ES: Deref; + /// A type implementing [`NodeSigner`] + type NodeSigner: NodeSigner + ?Sized; + /// A type that may be dereferenced to [`Self::NodeSigner`] + type NS: Deref; + /// A type implementing [`Logger`] + type Logger: Logger + ?Sized; + /// A type that may be dereferenced to [`Self::Logger`] + type L: Deref; + /// A type implementing [`NodeIdLookUp`] + type NodeIdLookUp: NodeIdLookUp + ?Sized; + /// A type that may be dereferenced to [`Self::NodeIdLookUp`] + type NL: Deref; + /// A type implementing [`MessageRouter`] + type MessageRouter: MessageRouter + ?Sized; + /// A type that may be dereferenced to [`Self::MessageRouter`] + type MR: Deref; + /// A type implementing [`OffersMessageHandler`] + type OffersMessageHandler: OffersMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::OffersMessageHandler`] + type OMH: Deref; + /// A type implementing [`AsyncPaymentsMessageHandler`] + type AsyncPaymentsMessageHandler: AsyncPaymentsMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::AsyncPaymentsMessageHandler`] + type APH: Deref; + /// A type implementing [`CustomOnionMessageHandler`] + type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`] + type CMH: Deref; + /// Returns a reference to the actual [`OnionMessenger`] object. + fn get_om(&self) -> &OnionMessenger; +} + +impl AOnionMessenger +for OnionMessenger where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + NL::Target: NodeIdLookUp, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + APH:: Target: AsyncPaymentsMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + type EntropySource = ES::Target; + type ES = ES; + type NodeSigner = NS::Target; + type NS = NS; + type Logger = L::Target; + type L = L; + type NodeIdLookUp = NL::Target; + type NL = NL; + type MessageRouter = MR::Target; + type MR = MR; + type OffersMessageHandler = OMH::Target; + type OMH = OMH; + type AsyncPaymentsMessageHandler = APH::Target; + type APH = APH; + type CustomOnionMessageHandler = CMH::Target; + type CMH = CMH; + fn get_om(&self) -> &OnionMessenger { self } +} + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -98,7 +172,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// # }) /// # } /// # fn create_blinded_paths( -/// # &self, _recipient: PublicKey, _peers: Vec, _secp_ctx: &Secp256k1 +/// # &self, _recipient: PublicKey, _peers: Vec, _secp_ctx: &Secp256k1 /// # ) -> Result, ()> { /// # unreachable!() /// # } @@ -116,11 +190,12 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// # let message_router = Arc::new(FakeMessageRouter {}); /// # let custom_message_handler = IgnoringMessageHandler {}; /// # let offers_message_handler = IgnoringMessageHandler {}; +/// # let async_payments_message_handler = IgnoringMessageHandler {}; /// // Create the onion messenger. This must use the same `keys_manager` as is passed to your /// // ChannelManager. /// let onion_messenger = OnionMessenger::new( /// &keys_manager, &keys_manager, logger, &node_id_lookup, message_router, -/// &offers_message_handler, &custom_message_handler +/// &offers_message_handler, &async_payments_message_handler, &custom_message_handler /// ); /// # #[derive(Debug)] @@ -161,14 +236,16 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice -pub struct OnionMessenger -where +pub struct OnionMessenger< + ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref +> where ES::Target: EntropySource, NS::Target: NodeSigner, L::Target: Logger, NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, + APH::Target: AsyncPaymentsMessageHandler, CMH::Target: CustomOnionMessageHandler, { entropy_source: ES, @@ -179,9 +256,16 @@ where node_id_lookup: NL, message_router: MR, offers_handler: OMH, + #[allow(unused)] + async_payments_handler: APH, custom_handler: CMH, intercept_messages_for_offline_peers: bool, - pending_events: Mutex>, + pending_events: Mutex, +} + +struct PendingEvents { + intercepted_msgs: Vec, + peer_connecteds: Vec, } /// [`OnionMessage`]s buffered to be sent. @@ -256,12 +340,18 @@ impl OnionMessageRecipient { /// The `Responder` struct creates an appropriate [`ResponseInstruction`] /// for responding to a message. +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Responder { /// The path along which a response can be sent. reply_path: BlindedPath, path_id: Option<[u8; 32]> } +impl_writeable_tlv_based!(Responder, { + (0, reply_path, required), + (2, path_id, option), +}); + impl Responder { /// Creates a new [`Responder`] instance with the provided reply path. pub(super) fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self { @@ -357,11 +447,43 @@ pub trait MessageRouter { fn create_blinded_paths< T: secp256k1::Signing + secp256k1::Verification >( - &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, ) -> Result, ()>; + + /// Creates compact [`BlindedPath`]s to the `recipient` node. The nodes in `peers` are assumed + /// to be direct peers with the `recipient`. + /// + /// Compact blinded paths use short channel ids instead of pubkeys for a smaller serialization, + /// which is beneficial when a QR code is used to transport the data. The SCID is passed using a + /// [`ForwardNode`] but may be `None` for graceful degradation. + /// + /// Implementations using additional intermediate nodes are responsible for using a + /// [`ForwardNode`] with `Some` short channel id, if possible. Similarly, implementations should + /// call [`BlindedPath::use_compact_introduction_node`]. + /// + /// The provided implementation simply delegates to [`MessageRouter::create_blinded_paths`], + /// ignoring the short channel ids. + fn create_compact_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + let peers = peers + .into_iter() + .map(|ForwardNode { node_id, short_channel_id: _ }| node_id) + .collect(); + self.create_blinded_paths(recipient, peers, secp_ctx) + } } /// A [`MessageRouter`] that can only route to a directly connected [`Destination`]. +/// +/// # Privacy +/// +/// Creating [`BlindedPath`]s may affect privacy since, if a suitable path cannot be found, it will +/// create a one-hop path using the recipient as the introduction node if it is a announced node. +/// Otherwise, there is no way to find a path to the introduction node in order to send a message, +/// and thus an `Err` is returned. pub struct DefaultMessageRouter>, L: Deref, ES: Deref> where L::Target: Logger, @@ -380,51 +502,12 @@ where pub fn new(network_graph: G, entropy_source: ES) -> Self { Self { network_graph, entropy_source } } -} -impl>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter -where - L::Target: Logger, - ES::Target: EntropySource, -{ - fn find_path( - &self, sender: PublicKey, peers: Vec, mut destination: Destination - ) -> Result { - let network_graph = self.network_graph.deref().read_only(); - destination.resolve(&network_graph); - - let first_node = match destination.first_node() { - Some(first_node) => first_node, - None => return Err(()), - }; - - if peers.contains(&first_node) || sender == first_node { - Ok(OnionMessagePath { - intermediate_nodes: vec![], destination, first_node_addresses: None - }) - } else { - let node_announcement = network_graph - .node(&NodeId::from_pubkey(&first_node)) - .and_then(|node_info| node_info.announcement_info.as_ref()) - .and_then(|announcement_info| announcement_info.announcement_message.as_ref()) - .map(|node_announcement| &node_announcement.contents); - - match node_announcement { - Some(node_announcement) if node_announcement.features.supports_onion_messages() => { - let first_node_addresses = Some(node_announcement.addresses.clone()); - Ok(OnionMessagePath { - intermediate_nodes: vec![], destination, first_node_addresses - }) - }, - _ => Err(()), - } - } - } - - fn create_blinded_paths< + fn create_blinded_paths_from_iter< + I: ExactSizeIterator, T: secp256k1::Signing + secp256k1::Verification >( - &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + &self, recipient: PublicKey, peers: I, secp_ctx: &Secp256k1, compact_paths: bool ) -> Result, ()> { // Limit the number of blinded paths that are computed. const MAX_PATHS: usize = 3; @@ -437,13 +520,20 @@ where let is_recipient_announced = network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient)); - let mut peer_info = peers.into_iter() - // Limit to peers with announced channels + let has_one_peer = peers.len() == 1; + let mut peer_info = peers + // Limit to peers with announced channels unless the recipient is unannounced. .filter_map(|peer| network_graph .node(&NodeId::from_pubkey(&peer.node_id)) - .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS) + .filter(|info| + !is_recipient_announced || info.channels.len() >= MIN_PEER_CHANNELS + ) .map(|info| (peer, info.is_tor_only(), info.channels.len())) + // Allow messages directly with the only peer when unannounced. + .or_else(|| (!is_recipient_announced && has_one_peer) + .then(|| (peer, false, 0)) + ) ) // Exclude Tor-only nodes when the recipient is announced. .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced)) @@ -472,14 +562,75 @@ where } }, }?; - for path in &mut paths { - path.use_compact_introduction_node(&network_graph); + + if compact_paths { + for path in &mut paths { + path.use_compact_introduction_node(&network_graph); + } } Ok(paths) } } +impl>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter +where + L::Target: Logger, + ES::Target: EntropySource, +{ + fn find_path( + &self, sender: PublicKey, peers: Vec, mut destination: Destination + ) -> Result { + let network_graph = self.network_graph.deref().read_only(); + destination.resolve(&network_graph); + + let first_node = match destination.first_node() { + Some(first_node) => first_node, + None => return Err(()), + }; + + if peers.contains(&first_node) || sender == first_node { + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses: None + }) + } else { + let node_details = network_graph + .node(&NodeId::from_pubkey(&first_node)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .map(|announcement_info| (announcement_info.features(), announcement_info.addresses())); + + match node_details { + Some((features, addresses)) if features.supports_onion_messages() && addresses.len() > 0 => { + let first_node_addresses = Some(addresses.clone()); + Ok(OnionMessagePath { + intermediate_nodes: vec![], destination, first_node_addresses + }) + }, + _ => Err(()), + } + } + } + + fn create_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + let peers = peers + .into_iter() + .map(|node_id| ForwardNode { node_id, short_channel_id: None }); + self.create_blinded_paths_from_iter(recipient, peers, secp_ctx, false) + } + + fn create_compact_blinded_paths< + T: secp256k1::Signing + secp256k1::Verification + >( + &self, recipient: PublicKey, peers: Vec, secp_ctx: &Secp256k1, + ) -> Result, ()> { + self.create_blinded_paths_from_iter(recipient, peers.into_iter(), secp_ctx, true) + } +} + /// A path for sending an [`OnionMessage`]. #[derive(Clone)] pub struct OnionMessagePath { @@ -857,8 +1008,8 @@ where } } -impl -OnionMessenger +impl +OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, @@ -866,17 +1017,18 @@ where NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, + APH::Target: AsyncPaymentsMessageHandler, CMH::Target: CustomOnionMessageHandler, { /// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to /// their respective handlers. pub fn new( entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, - offers_handler: OMH, custom_handler: CMH + offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH ) -> Self { Self::new_inner( entropy_source, node_signer, logger, node_id_lookup, message_router, - offers_handler, custom_handler, false + offers_handler, async_payments_handler, custom_handler, false ) } @@ -903,17 +1055,17 @@ where /// peers. pub fn new_with_offline_peer_interception( entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, - message_router: MR, offers_handler: OMH, custom_handler: CMH + message_router: MR, offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH ) -> Self { Self::new_inner( entropy_source, node_signer, logger, node_id_lookup, message_router, - offers_handler, custom_handler, true + offers_handler, async_payments_handler, custom_handler, true ) } fn new_inner( entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, - message_router: MR, offers_handler: OMH, custom_handler: CMH, + message_router: MR, offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH, intercept_messages_for_offline_peers: bool ) -> Self { let mut secp_ctx = Secp256k1::new(); @@ -927,9 +1079,13 @@ where node_id_lookup, message_router, offers_handler, + async_payments_handler, custom_handler, intercept_messages_for_offline_peers, - pending_events: Mutex::new(Vec::new()), + pending_events: Mutex::new(PendingEvents { + intercepted_msgs: Vec::new(), + peer_connecteds: Vec::new(), + }), } } @@ -1010,10 +1166,7 @@ where let peers = self.message_recipients.lock().unwrap() .iter() .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_))) - .map(|(node_id, _ )| ForwardNode { - node_id: *node_id, - short_channel_id: None, - }) + .map(|(node_id, _ )| *node_id) .collect::>(); self.message_router @@ -1150,18 +1303,61 @@ where msgs } - fn enqueue_event(&self, event: Event) { + fn enqueue_intercepted_event(&self, event: Event) { const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; let mut pending_events = self.pending_events.lock().unwrap(); - let total_buffered_bytes: usize = pending_events - .iter() - .map(|ev| ev.serialized_length()) - .sum(); + let total_buffered_bytes: usize = + pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum(); if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { log_trace!(self.logger, "Dropping event {:?}: buffer full", event); return } - pending_events.push(event); + pending_events.intercepted_msgs.push(event); + } + + /// Processes any events asynchronously using the given handler. + /// + /// Note that the event handler is called in the order each event was generated, however + /// futures are polled in parallel for some events to allow for parallelism where events do not + /// have an ordering requirement. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + pub async fn process_pending_events_async + core::marker::Unpin, H: Fn(Event) -> Future>( + &self, handler: H + ) { + let mut intercepted_msgs = Vec::new(); + let mut peer_connecteds = Vec::new(); + { + let mut pending_events = self.pending_events.lock().unwrap(); + core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs); + core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds); + } + + let mut futures = Vec::with_capacity(intercepted_msgs.len()); + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }))); + } + } + } + + for ev in intercepted_msgs { + if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } + futures.push(Some(handler(ev))); + } + // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds + crate::util::async_poll::MultiFuturePoller(futures).await; + + if peer_connecteds.len() <= 1 { + for event in peer_connecteds { handler(event).await; } + } else { + let mut futures = Vec::new(); + for event in peer_connecteds { + futures.push(Some(handler(event))); + } + crate::util::async_poll::MultiFuturePoller(futures).await; + } } } @@ -1188,8 +1384,8 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap EventsProvider -for OnionMessenger +impl EventsProvider +for OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, @@ -1197,6 +1393,7 @@ where NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, + APH::Target: AsyncPaymentsMessageHandler, CMH::Target: CustomOnionMessageHandler, { fn process_pending_events(&self, handler: H) where H::Target: EventHandler { @@ -1208,15 +1405,28 @@ where } } let mut events = Vec::new(); - core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events); + { + let mut pending_events = self.pending_events.lock().unwrap(); + #[cfg(debug_assertions)] { + for ev in pending_events.intercepted_msgs.iter() { + if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } + } + for ev in pending_events.peer_connecteds.iter() { + if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } + } + } + core::mem::swap(&mut pending_events.intercepted_msgs, &mut events); + events.append(&mut pending_events.peer_connecteds); + pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage + } for ev in events { handler.handle_event(ev); } } } -impl OnionMessageHandler -for OnionMessenger +impl OnionMessageHandler +for OnionMessenger where ES::Target: EntropySource, NS::Target: NodeSigner, @@ -1224,6 +1434,7 @@ where NL::Target: NodeIdLookUp, MR::Target: MessageRouter, OMH::Target: OffersMessageHandler, + APH::Target: AsyncPaymentsMessageHandler, CMH::Target: CustomOnionMessageHandler, { fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) { @@ -1235,18 +1446,26 @@ where "Received an onion message with path_id {:02x?} and {} reply_path: {:?}", path_id, if reply_path.is_some() { "a" } else { "no" }, message); + let responder = reply_path.map( + |reply_path| Responder::new(reply_path, path_id) + ); match message { ParsedOnionMessageContents::Offers(msg) => { - let responder = reply_path.map( - |reply_path| Responder::new(reply_path, path_id) - ); let response_instructions = self.offers_handler.handle_message(msg, responder); let _ = self.handle_onion_message_response(response_instructions); }, - ParsedOnionMessageContents::Custom(msg) => { - let responder = reply_path.map( - |reply_path| Responder::new(reply_path, path_id) + #[cfg(async_payments)] + ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::HeldHtlcAvailable(msg)) => { + let response_instructions = self.async_payments_handler.held_htlc_available( + msg, responder ); + let _ = self.handle_onion_message_response(response_instructions); + }, + #[cfg(async_payments)] + ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::ReleaseHeldHtlc(msg)) => { + self.async_payments_handler.release_held_htlc(msg); + }, + ParsedOnionMessageContents::Custom(msg) => { let response_instructions = self.custom_handler.handle_custom_message(msg, responder); let _ = self.handle_onion_message_response(response_instructions); }, @@ -1286,7 +1505,7 @@ where log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, _ if self.intercept_messages_for_offline_peers => { - self.enqueue_event( + self.enqueue_intercepted_event( Event::OnionMessageIntercepted { peer_node_id: next_node_id, message: onion_message } @@ -1314,7 +1533,7 @@ where .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); if self.intercept_messages_for_offline_peers { - self.enqueue_event( + self.pending_events.lock().unwrap().peer_connecteds.push( Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } ); } @@ -1414,6 +1633,7 @@ pub type SimpleArcOnionMessenger = OnionMessenger< Arc>, Arc>>, Arc, Arc>>, Arc>, + Arc>, IgnoringMessageHandler >; @@ -1434,6 +1654,7 @@ pub type SimpleRefOnionMessenger< &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, &'j DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>, &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, + &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>, IgnoringMessageHandler >;