]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Process OnionMessageHandler events in background
authorJeffrey Czyz <jkczyz@gmail.com>
Thu, 16 Nov 2023 16:07:12 +0000 (10:07 -0600)
committerJeffrey Czyz <jkczyz@gmail.com>
Wed, 6 Dec 2023 20:43:39 +0000 (14:43 -0600)
OnionMessageHandler implementations now also implement EventsProvider.
Update lightning-background-processor to also process any events the
PeerManager's OnionMessageHandler provides.

lightning-background-processor/src/lib.rs
lightning/src/ln/peer_handler.rs

index fc080eee1f7c4db42788e4e4df5c5513ec1a642f..4ced59abde4b33d813b3da035139d7133c1ae871 100644 (file)
@@ -273,9 +273,9 @@ macro_rules! define_run_body {
        (
                $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
                $channel_manager: ident, $process_channel_manager_events: expr,
-               $peer_manager: ident, $gossip_sync: ident, $logger: ident, $scorer: ident,
-               $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
-               $check_slow_await: expr
+               $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
+               $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
+               $timer_elapsed: expr, $check_slow_await: expr
        ) => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
@@ -292,6 +292,7 @@ macro_rules! define_run_body {
                loop {
                        $process_channel_manager_events;
                        $process_chain_monitor_events;
+                       $process_onion_message_handler_events;
 
                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -655,7 +656,8 @@ where
                persister, chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
-               peer_manager, gossip_sync, logger, scorer, should_break, {
+               peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
+               gossip_sync, logger, scorer, should_break, {
                        let fut = Selector {
                                a: channel_manager.get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
@@ -679,6 +681,27 @@ where
        )
 }
 
+#[cfg(feature = "futures")]
+async fn process_onion_message_handler_events_async<
+       EventHandlerFuture: core::future::Future<Output = ()>,
+       EventHandler: Fn(Event) -> EventHandlerFuture,
+       PM: 'static + Deref + Send + Sync,
+>(
+       peer_manager: &PM, handler: EventHandler
+)
+where
+       PM::Target: APeerManager + Send + Sync,
+{
+       use lightning::events::EventsProvider;
+
+       let events = core::cell::RefCell::new(Vec::new());
+       peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
+
+       for event in events.into_inner() {
+               handler(event).await
+       }
+}
+
 #[cfg(feature = "std")]
 impl BackgroundProcessor {
        /// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -788,7 +811,9 @@ impl BackgroundProcessor {
                        define_run_body!(
                                persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
-                               peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
+                               peer_manager,
+                               peer_manager.onion_message_handler().process_pending_events(&event_handler),
+                               gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                { Sleeper::from_two_futures(
                                        channel_manager.get_event_or_persistence_needed_future(),
                                        chain_monitor.get_update_future()
index 2fcf1b33047c4dadd9d6e2c0394cc0b1d5f4e4a7..c78ca879fdb30b9db8fbd97c298c084e9613f2b5 100644 (file)
@@ -684,6 +684,8 @@ pub trait APeerManager {
        type NS: Deref<Target=Self::NST>;
        /// Gets a reference to the underlying [`PeerManager`].
        fn as_ref(&self) -> &PeerManager<Self::Descriptor, Self::CM, Self::RM, Self::OM, Self::L, Self::CMH, Self::NS>;
+       /// Returns the peer manager's [`OnionMessageHandler`].
+       fn onion_message_handler(&self) -> &Self::OMT;
 }
 
 impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref>
@@ -709,6 +711,9 @@ APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> where
        type NST = <NS as Deref>::Target;
        type NS = NS;
        fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> { self }
+       fn onion_message_handler(&self) -> &Self::OMT {
+               self.message_handler.onion_message_handler.deref()
+       }
 }
 
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls