X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=7bea65ab5af53eb3d9f7e7014454393d892dae86;hb=3abec4f03194d818b72bf377b2086563c56f1812;hp=2c4e86bd6a92753ca92dcae778b12c0c7f29992b;hpb=3fb329b004aa5107750ef87dfd2d1766badc705e;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 2c4e86bd..7bea65ab 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -47,6 +47,70 @@ use { pub(super) const MAX_TIMER_TICKS: usize = 2; +/// A trivial trait which describes any [`OnionMessenger`]. +/// +/// This is not exported to bindings users as general cover traits aren't useful in other +/// languages. +pub trait AOnionMessenger { + /// A type implementing [`EntropySource`] + type EntropySource: EntropySource + ?Sized; + /// A type that may be dereferenced to [`Self::EntropySource`] + type ES: Deref; + /// A type implementing [`NodeSigner`] + type NodeSigner: NodeSigner + ?Sized; + /// A type that may be dereferenced to [`Self::NodeSigner`] + type NS: Deref; + /// A type implementing [`Logger`] + type Logger: Logger + ?Sized; + /// A type that may be dereferenced to [`Self::Logger`] + type L: Deref; + /// A type implementing [`NodeIdLookUp`] + type NodeIdLookUp: NodeIdLookUp + ?Sized; + /// A type that may be dereferenced to [`Self::NodeIdLookUp`] + type NL: Deref; + /// A type implementing [`MessageRouter`] + type MessageRouter: MessageRouter + ?Sized; + /// A type that may be dereferenced to [`Self::MessageRouter`] + type MR: Deref; + /// A type implementing [`OffersMessageHandler`] + type OffersMessageHandler: OffersMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::OffersMessageHandler`] + type OMH: Deref; + /// A type implementing [`CustomOnionMessageHandler`] + type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized; + /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`] + type CMH: Deref; + /// Returns a reference to the actual [`OnionMessenger`] object. + fn get_om(&self) -> &OnionMessenger; +} + +impl AOnionMessenger +for OnionMessenger where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + NL::Target: NodeIdLookUp, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler, +{ + type EntropySource = ES::Target; + type ES = ES; + type NodeSigner = NS::Target; + type NS = NS; + type Logger = L::Target; + type L = L; + type NodeIdLookUp = NL::Target; + type NL = NL; + type MessageRouter = MR::Target; + type MR = MR; + type OffersMessageHandler = OMH::Target; + type OMH = OMH; + type CustomOnionMessageHandler = CMH::Target; + type CMH = CMH; + fn get_om(&self) -> &OnionMessenger { self } +} + /// A sender, receiver and forwarder of [`OnionMessage`]s. /// /// # Handling Messages @@ -176,6 +240,13 @@ where message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool, + pending_events: Mutex, +} + +struct PendingEvents { + intercepted_msgs: Vec, + peer_connecteds: Vec, } /// [`OnionMessage`]s buffered to be sent. @@ -841,6 +912,48 @@ where pub fn new( entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, false + ) + } + + /// Similar to [`Self::new`], but rather than dropping onion messages that are + /// intended to be forwarded to offline peers, we will intercept them for + /// later forwarding. + /// + /// Interception flow: + /// 1. If an onion message for an offline peer is received, `OnionMessenger` will + /// generate an [`Event::OnionMessageIntercepted`]. Event handlers can + /// then choose to persist this onion message for later forwarding, or drop + /// it. + /// 2. When the offline peer later comes back online, `OnionMessenger` will + /// generate an [`Event::OnionMessagePeerConnected`]. Event handlers will + /// then fetch all previously intercepted onion messages for this peer. + /// 3. Once the stored onion messages are fetched, they can finally be + /// forwarded to the now-online peer via [`Self::forward_onion_message`]. + /// + /// # Note + /// + /// LDK will not rate limit how many [`Event::OnionMessageIntercepted`]s + /// are generated, so it is the caller's responsibility to limit how many + /// onion messages are persisted and only persist onion messages for relevant + /// peers. + pub fn new_with_offline_peer_interception( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, true + ) + } + + fn new_inner( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); @@ -854,6 +967,11 @@ where message_router, offers_handler, custom_handler, + intercept_messages_for_offline_peers, + pending_events: Mutex::new(PendingEvents { + intercepted_msgs: Vec::new(), + peer_connecteds: Vec::new(), + }), } } @@ -961,6 +1079,27 @@ where } } + /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized + /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`] + /// and want to forward a previously intercepted onion message to a peer that + /// has just come online. + pub fn forward_onion_message( + &self, message: OnionMessage, peer_node_id: &PublicKey + ) -> Result<(), SendError> { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&peer_node_id, &message_recipients) { + return Err(SendError::BufferFull); + } + + match message_recipients.entry(*peer_node_id) { + hash_map::Entry::Occupied(mut e) if e.get().is_connected() => { + e.get_mut().enqueue_message(message); + Ok(()) + }, + _ => Err(SendError::InvalidFirstHop(*peer_node_id)) + } + } + #[cfg(any(test, feature = "_test_utils"))] pub fn send_onion_message_using_path( &self, path: OnionMessagePath, contents: T, reply_path: Option @@ -1003,6 +1142,63 @@ where } msgs } + + fn enqueue_intercepted_event(&self, event: Event) { + const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; + let mut pending_events = self.pending_events.lock().unwrap(); + let total_buffered_bytes: usize = + pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum(); + if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { + log_trace!(self.logger, "Dropping event {:?}: buffer full", event); + return + } + pending_events.intercepted_msgs.push(event); + } + + /// Processes any events asynchronously using the given handler. + /// + /// Note that the event handler is called in the order each event was generated, however + /// futures are polled in parallel for some events to allow for parallelism where events do not + /// have an ordering requirement. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + pub async fn process_pending_events_async + core::marker::Unpin, H: Fn(Event) -> Future>( + &self, handler: H + ) { + let mut intercepted_msgs = Vec::new(); + let mut peer_connecteds = Vec::new(); + { + let mut pending_events = self.pending_events.lock().unwrap(); + core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs); + core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds); + } + + let mut futures = Vec::with_capacity(intercepted_msgs.len()); + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { + if let Some(addresses) = addresses.take() { + futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }))); + } + } + } + + for ev in intercepted_msgs { + if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); } + futures.push(Some(handler(ev))); + } + // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds + crate::util::async_poll::MultiFuturePoller(futures).await; + + if peer_connecteds.len() <= 1 { + for event in peer_connecteds { handler(event).await; } + } else { + let mut futures = Vec::new(); + for event in peer_connecteds { + futures.push(Some(handler(event))); + } + crate::util::async_poll::MultiFuturePoller(futures).await; + } + } } fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { @@ -1047,6 +1243,24 @@ where } } } + let mut events = Vec::new(); + { + let mut pending_events = self.pending_events.lock().unwrap(); + #[cfg(debug_assertions)] { + for ev in pending_events.intercepted_msgs.iter() { + if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); } + } + for ev in pending_events.peer_connecteds.iter() { + if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); } + } + } + core::mem::swap(&mut pending_events.intercepted_msgs, &mut events); + events.append(&mut pending_events.peer_connecteds); + pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage + } + for ev in events { + handler.handle_event(ev); + } } } @@ -1120,6 +1334,13 @@ where e.get_mut().enqueue_message(onion_message); log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, + _ if self.intercept_messages_for_offline_peers => { + self.enqueue_intercepted_event( + Event::OnionMessageIntercepted { + peer_node_id: next_node_id, message: onion_message + } + ); + }, _ => { log_trace!( logger, @@ -1141,6 +1362,11 @@ where .entry(*their_node_id) .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); + if self.intercept_messages_for_offline_peers { + self.pending_events.lock().unwrap().peer_connecteds.push( + Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } + ); + } } else { self.message_recipients.lock().unwrap().remove(their_node_id); }