add peer_(dis)connected to custom message handler
[rust-lightning] / lightning / src / onion_message / messenger.rs
index aa5f658949254fd98dd8de656c11eefba93c56d3..eb4a837feb6ad0326b5fd4822f8f922212ed3166 100644 (file)
@@ -47,6 +47,70 @@ use {
 
 pub(super) const MAX_TIMER_TICKS: usize = 2;
 
+/// A trivial trait which describes any [`OnionMessenger`].
+///
+/// This is not exported to bindings users as general cover traits aren't useful in other
+/// languages.
+pub trait AOnionMessenger {
+       /// A type implementing [`EntropySource`]
+       type EntropySource: EntropySource + ?Sized;
+       /// A type that may be dereferenced to [`Self::EntropySource`]
+       type ES: Deref<Target = Self::EntropySource>;
+       /// A type implementing [`NodeSigner`]
+       type NodeSigner: NodeSigner + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeSigner`]
+       type NS: Deref<Target = Self::NodeSigner>;
+       /// A type implementing [`Logger`]
+       type Logger: Logger + ?Sized;
+       /// A type that may be dereferenced to [`Self::Logger`]
+       type L: Deref<Target = Self::Logger>;
+       /// A type implementing [`NodeIdLookUp`]
+       type NodeIdLookUp: NodeIdLookUp + ?Sized;
+       /// A type that may be dereferenced to [`Self::NodeIdLookUp`]
+       type NL: Deref<Target = Self::NodeIdLookUp>;
+       /// A type implementing [`MessageRouter`]
+       type MessageRouter: MessageRouter + ?Sized;
+       /// A type that may be dereferenced to [`Self::MessageRouter`]
+       type MR: Deref<Target = Self::MessageRouter>;
+       /// A type implementing [`OffersMessageHandler`]
+       type OffersMessageHandler: OffersMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::OffersMessageHandler`]
+       type OMH: Deref<Target = Self::OffersMessageHandler>;
+       /// A type implementing [`CustomOnionMessageHandler`]
+       type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
+       type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
+       /// Returns a reference to the actual [`OnionMessenger`] object.
+       fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::CMH>;
+}
+
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> AOnionMessenger
+for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> where
+       ES::Target: EntropySource,
+       NS::Target: NodeSigner,
+       L::Target: Logger,
+       NL::Target: NodeIdLookUp,
+       MR::Target: MessageRouter,
+       OMH::Target: OffersMessageHandler,
+       CMH::Target: CustomOnionMessageHandler,
+{
+       type EntropySource = ES::Target;
+       type ES = ES;
+       type NodeSigner = NS::Target;
+       type NS = NS;
+       type Logger = L::Target;
+       type L = L;
+       type NodeIdLookUp = NL::Target;
+       type NL = NL;
+       type MessageRouter = MR::Target;
+       type MR = MR;
+       type OffersMessageHandler = OMH::Target;
+       type OMH = OMH;
+       type CustomOnionMessageHandler = CMH::Target;
+       type CMH = CMH;
+       fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> { self }
+}
+
 /// A sender, receiver and forwarder of [`OnionMessage`]s.
 ///
 /// # Handling Messages
@@ -181,7 +245,12 @@ where
        offers_handler: OMH,
        custom_handler: CMH,
        intercept_messages_for_offline_peers: bool,
-       pending_events: Mutex<Vec<Event>>,
+       pending_events: Mutex<PendingEvents>,
+}
+
+struct PendingEvents {
+       intercepted_msgs: Vec<Event>,
+       peer_connecteds: Vec<Event>,
 }
 
 /// [`OnionMessage`]s buffered to be sent.
@@ -403,15 +472,14 @@ where
                                intermediate_nodes: vec![], destination, first_node_addresses: None
                        })
                } else {
-                       let node_announcement = network_graph
+                       let node_details = network_graph
                                .node(&NodeId::from_pubkey(&first_node))
                                .and_then(|node_info| node_info.announcement_info.as_ref())
-                               .and_then(|announcement_info| announcement_info.announcement_message.as_ref())
-                               .map(|node_announcement| &node_announcement.contents);
+                               .map(|announcement_info| (announcement_info.features(), announcement_info.addresses()));
 
-                       match node_announcement {
-                               Some(node_announcement) if node_announcement.features.supports_onion_messages() => {
-                                       let first_node_addresses = Some(node_announcement.addresses.clone());
+                       match node_details {
+                               Some((features, addresses)) if features.supports_onion_messages() && addresses.len() > 0 => {
+                                       let first_node_addresses = Some(addresses.clone());
                                        Ok(OnionMessagePath {
                                                intermediate_nodes: vec![], destination, first_node_addresses
                                        })
@@ -929,7 +997,10 @@ where
                        offers_handler,
                        custom_handler,
                        intercept_messages_for_offline_peers,
-                       pending_events: Mutex::new(Vec::new()),
+                       pending_events: Mutex::new(PendingEvents {
+                               intercepted_msgs: Vec::new(),
+                               peer_connecteds: Vec::new(),
+                       }),
                }
        }
 
@@ -1150,18 +1221,61 @@ where
                msgs
        }
 
-       fn enqueue_event(&self, event: Event) {
+       fn enqueue_intercepted_event(&self, event: Event) {
                const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
                let mut pending_events = self.pending_events.lock().unwrap();
-               let total_buffered_bytes: usize = pending_events
-                       .iter()
-                       .map(|ev| ev.serialized_length())
-                       .sum();
+               let total_buffered_bytes: usize =
+                       pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
                if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
                        log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
                        return
                }
-               pending_events.push(event);
+               pending_events.intercepted_msgs.push(event);
+       }
+
+       /// Processes any events asynchronously using the given handler.
+       ///
+       /// Note that the event handler is called in the order each event was generated, however
+       /// futures are polled in parallel for some events to allow for parallelism where events do not
+       /// have an ordering requirement.
+       ///
+       /// See the trait-level documentation of [`EventsProvider`] for requirements.
+       pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+               &self, handler: H
+       ) {
+               let mut intercepted_msgs = Vec::new();
+               let mut peer_connecteds = Vec::new();
+               {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+                       core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+               }
+
+               let mut futures = Vec::with_capacity(intercepted_msgs.len());
+               for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+                       if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+                               if let Some(addresses) = addresses.take() {
+                                       futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+                               }
+                       }
+               }
+
+               for ev in intercepted_msgs {
+                       if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+                       futures.push(Some(handler(ev)));
+               }
+               // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+               crate::util::async_poll::MultiFuturePoller(futures).await;
+
+               if peer_connecteds.len() <= 1 {
+                       for event in peer_connecteds { handler(event).await; }
+               } else {
+                       let mut futures = Vec::new();
+                       for event in peer_connecteds {
+                               futures.push(Some(handler(event)));
+                       }
+                       crate::util::async_poll::MultiFuturePoller(futures).await;
+               }
        }
 }
 
@@ -1208,7 +1322,20 @@ where
                        }
                }
                let mut events = Vec::new();
-               core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+               {
+                       let mut pending_events = self.pending_events.lock().unwrap();
+                       #[cfg(debug_assertions)] {
+                               for ev in pending_events.intercepted_msgs.iter() {
+                                       if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
+                               }
+                               for ev in pending_events.peer_connecteds.iter() {
+                                       if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
+                               }
+                       }
+                       core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
+                       events.append(&mut pending_events.peer_connecteds);
+                       pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+               }
                for ev in events {
                        handler.handle_event(ev);
                }
@@ -1286,7 +1413,7 @@ where
                                                log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
                                        },
                                        _ if self.intercept_messages_for_offline_peers => {
-                                               self.enqueue_event(
+                                               self.enqueue_intercepted_event(
                                                        Event::OnionMessageIntercepted {
                                                                peer_node_id: next_node_id, message: onion_message
                                                        }
@@ -1314,7 +1441,7 @@ where
                                .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
                                .mark_connected();
                        if self.intercept_messages_for_offline_peers {
-                               self.enqueue_event(
+                               self.pending_events.lock().unwrap().peer_connecteds.push(
                                        Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
                                );
                        }