// NOTE(review): use sites are not visible in this chunk — presumably bounds how many timer
// ticks something (e.g. a buffered onion message) may linger before being dropped; confirm
// at the places this constant is read.
pub(super) const MAX_TIMER_TICKS: usize = 2;
+/// A trivial trait which describes any [`OnionMessenger`].
+///
+/// This allows code to be written generically over any [`OnionMessenger`] without spelling out
+/// its seven generic parameters; a blanket implementation is provided for every
+/// [`OnionMessenger`] instantiation.
+///
+/// This is not exported to bindings users as general cover traits aren't useful in other
+/// languages.
+pub trait AOnionMessenger {
+ /// A type implementing [`EntropySource`]
+ type EntropySource: EntropySource + ?Sized;
+ /// A type that may be dereferenced to [`Self::EntropySource`]
+ type ES: Deref<Target = Self::EntropySource>;
+ /// A type implementing [`NodeSigner`]
+ type NodeSigner: NodeSigner + ?Sized;
+ /// A type that may be dereferenced to [`Self::NodeSigner`]
+ type NS: Deref<Target = Self::NodeSigner>;
+ /// A type implementing [`Logger`]
+ type Logger: Logger + ?Sized;
+ /// A type that may be dereferenced to [`Self::Logger`]
+ type L: Deref<Target = Self::Logger>;
+ /// A type implementing [`NodeIdLookUp`]
+ type NodeIdLookUp: NodeIdLookUp + ?Sized;
+ /// A type that may be dereferenced to [`Self::NodeIdLookUp`]
+ type NL: Deref<Target = Self::NodeIdLookUp>;
+ /// A type implementing [`MessageRouter`]
+ type MessageRouter: MessageRouter + ?Sized;
+ /// A type that may be dereferenced to [`Self::MessageRouter`]
+ type MR: Deref<Target = Self::MessageRouter>;
+ /// A type implementing [`OffersMessageHandler`]
+ type OffersMessageHandler: OffersMessageHandler + ?Sized;
+ /// A type that may be dereferenced to [`Self::OffersMessageHandler`]
+ type OMH: Deref<Target = Self::OffersMessageHandler>;
+ /// A type implementing [`CustomOnionMessageHandler`]
+ type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
+ /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
+ type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
+ /// Returns a reference to the actual [`OnionMessenger`] object.
+ fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::CMH>;
+}
+
+// Blanket implementation: every concretely-parameterized `OnionMessenger` is an
+// `AOnionMessenger`, exposing each `Deref` type parameter alongside its `Target` so generic
+// code can name either.
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> AOnionMessenger
+for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> where
+ ES::Target: EntropySource,
+ NS::Target: NodeSigner,
+ L::Target: Logger,
+ NL::Target: NodeIdLookUp,
+ MR::Target: MessageRouter,
+ OMH::Target: OffersMessageHandler,
+ CMH::Target: CustomOnionMessageHandler,
+{
+ type EntropySource = ES::Target;
+ type ES = ES;
+ type NodeSigner = NS::Target;
+ type NS = NS;
+ type Logger = L::Target;
+ type L = L;
+ type NodeIdLookUp = NL::Target;
+ type NL = NL;
+ type MessageRouter = MR::Target;
+ type MR = MR;
+ type OffersMessageHandler = OMH::Target;
+ type OMH = OMH;
+ type CustomOnionMessageHandler = CMH::Target;
+ type CMH = CMH;
+ fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> { self }
+}
+
/// A sender, receiver and forwarder of [`OnionMessage`]s.
///
/// # Handling Messages
message_router: MR,
offers_handler: OMH,
custom_handler: CMH,
+ intercept_messages_for_offline_peers: bool,
+ pending_events: Mutex<PendingEvents>,
+}
+
+/// Events pending surfacing to the event handler, kept in two queues so that
+/// intercepted-message events can be processed before peer-connected events.
+struct PendingEvents {
+ /// Buffered [`Event::OnionMessageIntercepted`]s (the only variant stored here).
+ intercepted_msgs: Vec<Event>,
+ /// Buffered [`Event::OnionMessagePeerConnected`]s (the only variant stored here).
+ peer_connecteds: Vec<Event>,
+}
/// [`OnionMessage`]s buffered to be sent.
/// Constructs a new `OnionMessenger` with offline-peer interception disabled: onion
/// messages destined for peers we are not connected to are dropped rather than
/// intercepted. See [`Self::new_with_offline_peer_interception`] for the alternative.
pub fn new(
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
offers_handler: OMH, custom_handler: CMH
+ ) -> Self {
+ Self::new_inner(
+ entropy_source, node_signer, logger, node_id_lookup, message_router,
+ offers_handler, custom_handler, false
+ )
+ }
+
+ /// Similar to [`Self::new`], but rather than dropping onion messages that are
+ /// intended to be forwarded to offline peers, we will intercept them for
+ /// later forwarding.
+ ///
+ /// Interception flow:
+ /// 1. If an onion message for an offline peer is received, `OnionMessenger` will
+ ///    generate an [`Event::OnionMessageIntercepted`]. Event handlers can
+ ///    then choose to persist this onion message for later forwarding, or drop
+ ///    it.
+ /// 2. When the offline peer later comes back online, `OnionMessenger` will
+ ///    generate an [`Event::OnionMessagePeerConnected`]. Event handlers will
+ ///    then fetch all previously intercepted onion messages for this peer.
+ /// 3. Once the stored onion messages are fetched, they can finally be
+ ///    forwarded to the now-online peer via [`Self::forward_onion_message`].
+ ///
+ /// # Note
+ ///
+ /// LDK will not rate limit how many [`Event::OnionMessageIntercepted`]s
+ /// are generated, so it is the caller's responsibility to limit how many
+ /// onion messages are persisted and only persist onion messages for relevant
+ /// peers.
+ pub fn new_with_offline_peer_interception(
+ entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
+ message_router: MR, offers_handler: OMH, custom_handler: CMH
+ ) -> Self {
+ // Identical to `Self::new` except interception is enabled (the trailing `true`).
+ Self::new_inner(
+ entropy_source, node_signer, logger, node_id_lookup, message_router,
+ offers_handler, custom_handler, true
+ )
+ }
+
+ fn new_inner(
+ entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
+ message_router: MR, offers_handler: OMH, custom_handler: CMH,
+ intercept_messages_for_offline_peers: bool
) -> Self {
let mut secp_ctx = Secp256k1::new();
secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
message_router,
offers_handler,
custom_handler,
+ intercept_messages_for_offline_peers,
+ pending_events: Mutex::new(PendingEvents {
+ intercepted_msgs: Vec::new(),
+ peer_connecteds: Vec::new(),
+ }),
}
}
}
}
+ /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized
+ /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`]
+ /// and want to forward a previously intercepted onion message to a peer that
+ /// has just come online.
+ ///
+ /// # Errors
+ ///
+ /// Returns [`SendError::BufferFull`] if the outbound buffer for `peer_node_id` is full,
+ /// or [`SendError::InvalidFirstHop`] if `peer_node_id` is not a connected peer.
+ pub fn forward_onion_message(
+ &self, message: OnionMessage, peer_node_id: &PublicKey
+ ) -> Result<(), SendError> {
+ let mut message_recipients = self.message_recipients.lock().unwrap();
+ // `peer_node_id` is already a `&PublicKey`; pass it directly (the previous `&peer_node_id`
+ // was a needless `&&PublicKey` borrow relying on deref coercion).
+ if outbound_buffer_full(peer_node_id, &message_recipients) {
+ return Err(SendError::BufferFull);
+ }
+
+ match message_recipients.entry(*peer_node_id) {
+ // Only enqueue for peers we are actually connected to; anything else is an invalid
+ // first hop from our perspective.
+ hash_map::Entry::Occupied(mut e) if e.get().is_connected() => {
+ e.get_mut().enqueue_message(message);
+ Ok(())
+ },
+ _ => Err(SendError::InvalidFirstHop(*peer_node_id))
+ }
+ }
+
#[cfg(any(test, feature = "_test_utils"))]
pub fn send_onion_message_using_path<T: OnionMessageContents>(
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>
}
msgs
}
+
+ /// Buffers `event` for later delivery to the event handler, dropping it (with a log) if the
+ /// intercepted-event buffer already holds at least 256 KiB of serialized events.
+ ///
+ /// Call sites only pass [`Event::OnionMessageIntercepted`] here; peer-connected events go
+ /// directly into `pending_events.peer_connecteds`.
+ fn enqueue_intercepted_event(&self, event: Event) {
+ // 256 KiB cap on the total serialized size of buffered intercepted events.
+ const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
+ let mut pending_events = self.pending_events.lock().unwrap();
+ // O(buffer) re-summation per call; acceptable because the buffer is size-capped.
+ let total_buffered_bytes: usize =
+ pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
+ if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
+ log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
+ return
+ }
+ pending_events.intercepted_msgs.push(event);
+ }
+
+ /// Processes any events asynchronously using the given handler.
+ ///
+ /// Note that the event handler is called in the order each event was generated, however
+ /// futures are polled in parallel for some events to allow for parallelism where events do not
+ /// have an ordering requirement.
+ ///
+ /// See the trait-level documentation of [`EventsProvider`] for requirements.
+ pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+ &self, handler: H
+ ) {
+ let mut intercepted_msgs = Vec::new();
+ let mut peer_connecteds = Vec::new();
+ {
+ // Take ownership of the pending events inside the lock's scope so the handler futures
+ // below run without holding the mutex.
+ let mut pending_events = self.pending_events.lock().unwrap();
+ core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+ core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+ }
+
+ let mut futures = Vec::with_capacity(intercepted_msgs.len());
+ // Surface `ConnectionNeeded` for peers awaiting connection, consuming their addresses so
+ // each such event is only generated once.
+ for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+ if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+ if let Some(addresses) = addresses.take() {
+ futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+ }
+ }
+ }
+
+ for ev in intercepted_msgs {
+ // `intercepted_msgs` only ever holds `OnionMessageIntercepted` events.
+ if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+ futures.push(Some(handler(ev)));
+ }
+ // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+ crate::util::async_poll::MultiFuturePoller(futures).await;
+
+ if peer_connecteds.len() <= 1 {
+ // Zero or one events: await directly and skip allocating a futures Vec.
+ for event in peer_connecteds { handler(event).await; }
+ } else {
+ let mut futures = Vec::new();
+ for event in peer_connecteds {
+ futures.push(Some(handler(event)));
+ }
+ crate::util::async_poll::MultiFuturePoller(futures).await;
+ }
+ }
}
fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageRecipient>) -> bool {
}
}
}
+ let mut events = Vec::new();
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ #[cfg(debug_assertions)] {
+ for ev in pending_events.intercepted_msgs.iter() {
+ if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
+ }
+ for ev in pending_events.peer_connecteds.iter() {
+ if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
+ }
+ }
+ core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
+ events.append(&mut pending_events.peer_connecteds);
+ pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+ }
+ for ev in events {
+ handler.handle_event(ev);
+ }
}
}
e.get_mut().enqueue_message(onion_message);
log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
},
+ _ if self.intercept_messages_for_offline_peers => {
+ self.enqueue_intercepted_event(
+ Event::OnionMessageIntercepted {
+ peer_node_id: next_node_id, message: onion_message
+ }
+ );
+ },
_ => {
log_trace!(
logger,
.entry(*their_node_id)
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
.mark_connected();
+ if self.intercept_messages_for_offline_peers {
+ self.pending_events.lock().unwrap().peer_connecteds.push(
+ Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
+ );
+ }
} else {
self.message_recipients.lock().unwrap().remove(their_node_id);
}