X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=9fd6fd95f95a1ea3cdd82bb3ebaea9b0848f8ed4;hb=d792afb08cdc20a30786728c3bfe816dd3b59e47;hp=e87229d66ad9486815c1af690fc0ed6024019ae4;hpb=15d016ac520077b01604f2ede57be83d65e08e59;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index e87229d6..9fd6fd95 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -15,8 +15,8 @@ use bitcoin::hashes::hmac::{Hmac, HmacEngine}; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; -use crate::blinded_path::{BlindedPath, IntroductionNode, NodeIdLookUp}; -use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, NextHop, ReceiveTlvs}; +use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp}; +use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, ReceiveTlvs}; use crate::blinded_path::utils; use crate::events::{Event, EventHandler, EventsProvider}; use crate::sign::{EntropySource, NodeSigner, Recipient}; @@ -71,6 +71,7 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// # use bitcoin::hashes::hex::FromHex; /// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self}; /// # use lightning::blinded_path::{BlindedPath, EmptyNodeIdLookUp}; +/// # use lightning::blinded_path::message::ForwardNode; /// # use lightning::sign::{EntropySource, KeysManager}; /// # use lightning::ln::peer_handler::IgnoringMessageHandler; /// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger}; @@ -145,8 +146,11 @@ pub(super) const MAX_TIMER_TICKS: usize = 2; /// /// // Create a blinded path to yourself, for someone to send an onion message to. /// # let your_node_id = hop_node_id1; -/// let hops = [hop_node_id3, hop_node_id4, your_node_id]; -/// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap(); +/// let hops = [ +/// ForwardNode { node_id: hop_node_id3, short_channel_id: None }, +/// ForwardNode { node_id: hop_node_id4, short_channel_id: None }, +/// ]; +/// let blinded_path = BlindedPath::new_for_message(&hops, your_node_id, &keys_manager, &secp_ctx).unwrap(); /// /// // Send a custom onion message to a blinded path. /// let destination = Destination::BlindedPath(blinded_path); @@ -176,6 +180,8 @@ where message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool, + pending_events: Mutex>, } /// [`OnionMessage`]s buffered to be sent. @@ -433,8 +439,12 @@ where }); let paths = peer_info.into_iter() - .map(|(pubkey, _, _)| vec![pubkey, recipient]) - .map(|node_pks| BlindedPath::new_for_message(&node_pks, &*self.entropy_source, secp_ctx)) + .map(|(node_id, _, _)| vec![ForwardNode { node_id, short_channel_id: None }]) + .map(|intermediate_nodes| { + BlindedPath::new_for_message( + &intermediate_nodes, recipient, &*self.entropy_source, secp_ctx + ) + }) .take(MAX_PATHS) .collect::, _>>(); @@ -614,10 +624,10 @@ pub trait CustomOnionMessageHandler { /// A processed incoming onion message, containing either a Forward (another onion message) /// or a Receive payload with decrypted contents. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum PeeledOnion { /// Forwarded onion, with the next node id and a new onion - Forward(NextHop, OnionMessage), + Forward(NextMessageHop, OnionMessage), /// Received onion message, with decrypted contents, path_id, and reply path Receive(ParsedOnionMessageContents, Option<[u8; 32]>, Option) } @@ -841,6 +851,48 @@ where pub fn new( entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, false + ) + } + + /// Similar to [`Self::new`], but rather than dropping onion messages that are + /// intended to be forwarded to offline peers, we will intercept them for + /// later forwarding. + /// + /// Interception flow: + /// 1. If an onion message for an offline peer is received, `OnionMessenger` will + /// generate an [`Event::OnionMessageIntercepted`]. Event handlers can + /// then choose to persist this onion message for later forwarding, or drop + /// it. + /// 2. When the offline peer later comes back online, `OnionMessenger` will + /// generate an [`Event::OnionMessagePeerConnected`]. Event handlers will + /// then fetch all previously intercepted onion messages for this peer. + /// 3. Once the stored onion messages are fetched, they can finally be + /// forwarded to the now-online peer via [`Self::forward_onion_message`]. + /// + /// # Note + /// + /// LDK will not rate limit how many [`Event::OnionMessageIntercepted`]s + /// are generated, so it is the caller's responsibility to limit how many + /// onion messages are persisted and only persist onion messages for relevant + /// peers. + pub fn new_with_offline_peer_interception( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, true + ) + } + + fn new_inner( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); @@ -854,6 +906,8 @@ where message_router, offers_handler, custom_handler, + intercept_messages_for_offline_peers, + pending_events: Mutex::new(Vec::new()), } } @@ -877,13 +931,12 @@ where &self, contents: T, destination: Destination, reply_path: Option, log_suffix: fmt::Arguments ) -> Result { - let mut logger = WithContext::from(&self.logger, None, None); - let result = self.find_path(destination) - .and_then(|path| { - let first_hop = path.intermediate_nodes.get(0).map(|p| *p); - logger = WithContext::from(&self.logger, first_hop, None); - self.enqueue_onion_message(path, contents, reply_path, log_suffix) - }); + let mut logger = WithContext::from(&self.logger, None, None, None); + let result = self.find_path(destination).and_then(|path| { + let first_hop = path.intermediate_nodes.get(0).map(|p| *p); + logger = WithContext::from(&self.logger, first_hop, None, None); + self.enqueue_onion_message(path, contents, reply_path, log_suffix) + }); match result.as_ref() { Err(SendError::GetNodeIdFailed) => { @@ -962,6 +1015,27 @@ where } } + /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized + /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`] + /// and want to forward a previously intercepted onion message to a peer that + /// has just come online. + pub fn forward_onion_message( + &self, message: OnionMessage, peer_node_id: &PublicKey + ) -> Result<(), SendError> { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&peer_node_id, &message_recipients) { + return Err(SendError::BufferFull); + } + + match message_recipients.entry(*peer_node_id) { + hash_map::Entry::Occupied(mut e) if e.get().is_connected() => { + e.get_mut().enqueue_message(message); + Ok(()) + }, + _ => Err(SendError::InvalidFirstHop(*peer_node_id)) + } + } + #[cfg(any(test, feature = "_test_utils"))] pub fn send_onion_message_using_path( &self, path: OnionMessagePath, contents: T, reply_path: Option @@ -1004,6 +1078,20 @@ where } msgs } + + fn enqueue_event(&self, event: Event) { + const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256; + let mut pending_events = self.pending_events.lock().unwrap(); + let total_buffered_bytes: usize = pending_events + .iter() + .map(|ev| ev.serialized_length()) + .sum(); + if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE { + log_trace!(self.logger, "Dropping event {:?}: buffer full", event); + return + } + pending_events.push(event); + } } fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { @@ -1048,6 +1136,11 @@ where } } } + let mut events = Vec::new(); + core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events); + for ev in events { + handler.handle_event(ev); + } } } @@ -1063,7 +1156,7 @@ where CMH::Target: CustomOnionMessageHandler, { fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) { - let logger = WithContext::from(&self.logger, Some(*peer_node_id), None); + let logger = WithContext::from(&self.logger, Some(*peer_node_id), None, None); match self.peel_onion_message(msg) { Ok(PeeledOnion::Receive(message, path_id, reply_path)) => { log_trace!( @@ -1090,8 +1183,8 @@ where }, Ok(PeeledOnion::Forward(next_hop, onion_message)) => { let next_node_id = match next_hop { - NextHop::NodeId(pubkey) => pubkey, - NextHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) { + NextMessageHop::NodeId(pubkey) => pubkey, + NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) { Some(pubkey) => pubkey, None => { log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {}", scid); @@ -1121,6 +1214,13 @@ where e.get_mut().enqueue_message(onion_message); log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, + _ if self.intercept_messages_for_offline_peers => { + self.enqueue_event( + Event::OnionMessageIntercepted { + peer_node_id: next_node_id, message: onion_message + } + ); + }, _ => { log_trace!( logger, @@ -1142,6 +1242,11 @@ where .entry(*their_node_id) .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); + if self.intercept_messages_for_offline_peers { + self.enqueue_event( + Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } + ); + } } else { self.message_recipients.lock().unwrap().remove(their_node_id); } @@ -1295,7 +1400,7 @@ fn packet_payloads_and_keys