Replace `EventsProvider` on `OnionMessageHandler` with a single fn
authorMatt Corallo <git@bluematt.me>
Sun, 17 Dec 2023 19:43:28 +0000 (19:43 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 22 Apr 2024 21:57:48 +0000 (21:57 +0000)
`OnionMessageHandler`s are expected to regularly release a set of
nodes which we need to directly connect to to deliver onion
messages. In order to do so, they currently extend
`EventsProvider`, returning that set as `Event::ConnectionNeeded`s.

While this works fine in Rust, the `EventsProvider` interface
doesn't map well in bindings due to it taking a flexible trait impl
as a method argument.

Instead, here, we convert `OnionMessageHandler` to include a single
function which returns nodes in the form of a `node_id` and
`Vec<SocketAddress>`. This is a bit simpler, if less flexible, API,
and while largely equivalent, is easier to map in bindings.

lightning-background-processor/src/lib.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/onion_message/messenger.rs

index ddedc9dacce874f81518df140ac6050752011ce3..dd4eea96040add82642e69a26ba0bb2675e9bb55 100644 (file)
@@ -694,7 +694,10 @@ where
                persister, chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
-               peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
+               peer_manager,
+               for event in onion_message_handler_events(peer_manager) {
+                       handler(event).await
+               },
                gossip_sync, logger, scorer, should_break, {
                        let fut = Selector {
                                a: channel_manager.get_event_or_persistence_needed_future(),
@@ -719,23 +722,11 @@ where
        )
 }
 
-#[cfg(feature = "futures")]
-async fn process_onion_message_handler_events_async<
-       EventHandlerFuture: core::future::Future<Output = ()>,
-       EventHandler: Fn(Event) -> EventHandlerFuture,
-       PM: 'static + Deref + Send + Sync,
->(
-       peer_manager: &PM, handler: EventHandler
-)
-where
-       PM::Target: APeerManager + Send + Sync,
-{
-       let events = core::cell::RefCell::new(Vec::new());
-       peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
-
-       for event in events.into_inner() {
-               handler(event).await
-       }
+fn onion_message_handler_events<PM: 'static + Deref + Send + Sync>(
+       peer_manager: &PM
+) -> impl Iterator<Item=Event> where PM::Target: APeerManager + Send + Sync {
+       peer_manager.onion_message_handler().get_and_clear_connections_needed()
+               .into_iter().map(|(node_id, addresses)| Event::ConnectionNeeded { node_id, addresses })
 }
 
 #[cfg(feature = "std")]
@@ -851,7 +842,9 @@ impl BackgroundProcessor {
                                persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
                                peer_manager,
-                               peer_manager.onion_message_handler().process_pending_events(&event_handler),
+                               for event in onion_message_handler_events(&peer_manager) {
+                                       event_handler.handle_event(event);
+                               },
                                gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                { Sleeper::from_two_futures(
                                        channel_manager.get_event_or_persistence_needed_future(),
index e833e3ebf0409ae1974f7d4598fadeab30332147..c5763d08d6728867876274f1d882341369a2900f 100644 (file)
@@ -1632,7 +1632,16 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
 }
 
 /// A handler for received [`OnionMessage`]s and for providing generated ones to send.
-pub trait OnionMessageHandler: EventsProvider {
+pub trait OnionMessageHandler {
+       /// Because much of the lightning network does not yet support forwarding onion messages, we
+       /// may need to directly connect to a node which will forward a message for us. In such a case,
+       /// this method will return the set of nodes which need connection by node_id and the
+       /// corresponding socket addresses where they may accept incoming connections.
+       ///
+       /// Thus, this method should be polled regularly to detect messages await such a direct
+       /// connection.
+       fn get_and_clear_connections_needed(&self) -> Vec<(PublicKey, Vec<SocketAddress>)>;
+
        /// Handle an incoming `onion_message` message from the given peer.
        fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
 
index 77208921cc1bc2c0a8cdec630515ff3bcd33422b..3ad75cccefb2dc62cd59cc56a87c9420974d2752 100644 (file)
@@ -123,6 +123,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn processing_queue_high(&self) -> bool { false }
 }
 impl OnionMessageHandler for IgnoringMessageHandler {
+       fn get_and_clear_connections_needed(&self) -> Vec<(PublicKey, Vec<SocketAddress>)> { Vec::new() }
        fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
        fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
        fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
index 5554497a4b74ddd0e9729380c94fe60066faba09..a0e674fa45df4e8e6a57236066acc4ffff5c9bec 100644 (file)
@@ -890,7 +890,7 @@ fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, On
        false
 }
 
-impl<ES: Deref, NS: Deref, L: Deref, MR: Deref, OMH: Deref, CMH: Deref> EventsProvider
+impl<ES: Deref, NS: Deref, L: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
 for OnionMessenger<ES, NS, L, MR, OMH, CMH>
 where
        ES::Target: EntropySource,
@@ -900,27 +900,18 @@ where
        OMH::Target: OffersMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
 {
-       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+       fn get_and_clear_connections_needed(&self) -> Vec<(PublicKey, Vec<SocketAddress>)> {
+               let mut res = Vec::new();
                for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
                        if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
                                if let Some(addresses) = addresses.take() {
-                                       handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
+                                       res.push((*node_id, addresses));
                                }
                        }
                }
+               res
        }
-}
 
-impl<ES: Deref, NS: Deref, L: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
-for OnionMessenger<ES, NS, L, MR, OMH, CMH>
-where
-       ES::Target: EntropySource,
-       NS::Target: NodeSigner,
-       L::Target: Logger,
-       MR::Target: MessageRouter,
-       OMH::Target: OffersMessageHandler,
-       CMH::Target: CustomOnionMessageHandler,
-{
        fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &OnionMessage) {
                match peel_onion_message(
                        msg, &self.secp_ctx, &*self.node_signer, &*self.logger, &*self.custom_handler