X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fonion_message%2Fmessenger.rs;h=213ac7fc9e76730e655efbe7a8af79cb0c43e7ac;hb=7213458b824cfa7f4ab95df2df1331e979d0e544;hp=1d7a730fa3625126097fd6d25de150e5ba46c742;hpb=59778dac488cff735004671cdefb3f4ac1f920fd;p=rust-lightning diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 1d7a730f..213ac7fc 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -175,6 +175,8 @@ where message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool, + pending_events: Mutex>, } /// [`OnionMessage`]s buffered to be sent. @@ -796,6 +798,28 @@ where pub fn new( entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, false + ) + } + + /// + pub fn new_with_offline_peer_interception( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH + ) -> Self { + Self::new_inner( + entropy_source, node_signer, logger, node_id_lookup, message_router, + offers_handler, custom_handler, true + ) + } + + fn new_inner( + entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, + message_router: MR, offers_handler: OMH, custom_handler: CMH, + intercept_messages_for_offline_peers: bool ) -> Self { let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes()); @@ -809,6 +833,8 @@ where message_router, offers_handler, custom_handler, + intercept_messages_for_offline_peers, + pending_events: Mutex::new(Vec::new()), } } @@ -917,6 +943,27 @@ where } } + /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized + /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`] + /// and want to forward a previously intercepted onion message to a peer that + /// has just come online. + pub fn forward_onion_message( + &self, message: OnionMessage, peer_node_id: &PublicKey + ) -> Result<(), SendError> { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&peer_node_id, &message_recipients) { + return Err(SendError::BufferFull); + } + + match message_recipients.entry(*peer_node_id) { + hash_map::Entry::Occupied(mut e) if e.get().is_connected() => { + e.get_mut().enqueue_message(message); + Ok(()) + }, + _ => Err(SendError::InvalidFirstHop(*peer_node_id)) + } + } + #[cfg(any(test, feature = "_test_utils"))] pub fn send_onion_message_using_path( &self, path: OnionMessagePath, contents: T, reply_path: Option @@ -1004,6 +1051,11 @@ where } } } + let mut events = Vec::new(); + core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events); + for ev in events { + handler.handle_event(ev); + } } } @@ -1081,6 +1133,13 @@ where e.get_mut().enqueue_message(onion_message); log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id); }, + _ if self.intercept_messages_for_offline_peers => { + self.pending_events.lock().unwrap().push( + Event::OnionMessageIntercepted { + peer_node_id: next_node_id, message: onion_message + } + ); + }, _ => { log_trace!( logger, @@ -1102,6 +1161,11 @@ where .entry(*their_node_id) .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); + if self.intercept_messages_for_offline_peers { + self.pending_events.lock().unwrap().push( + Event::OnionMessagePeerConnected { peer_node_id: *their_node_id } + ); + } } else { self.message_recipients.lock().unwrap().remove(their_node_id); }