From 8d20ebc376fa4c821b9b7e3f63164b7143986bbc Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 20 Oct 2022 15:51:37 -0700 Subject: [PATCH] Handle events asynchronously in the BackgroundProcessor's async variant --- lightning-background-processor/src/lib.rs | 38 ++++++++++++++++------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index b394a2311..9e2c7203e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -236,15 +236,11 @@ where A::Target: chain::Access, L::Target: Logger { } macro_rules! define_run_body { - ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident, + ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr) => { { - let event_handler = DecoratingEventHandler { - event_handler: $event_handler, - gossip_sync: &$gossip_sync, - }; - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -255,8 +251,8 @@ macro_rules! define_run_body { let mut have_pruned = false; loop { - $channel_manager.process_pending_events(&event_handler); - $chain_monitor.process_pending_events(&event_handler); + $process_channel_manager_events; + $process_chain_monitor_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -389,7 +385,8 @@ pub async fn process_events_async< CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, OMH: 'static + Deref + Send + Sync, - EH: 'static + EventHandler + Send, + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, @@ -402,7 +399,7 @@ pub async fn process_events_async< SleepFuture: core::future::Future, Sleeper: Fn(Duration) -> SleepFuture >( - persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, ) -> Result<(), std::io::Error> @@ -422,7 +419,19 @@ where PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, { let mut should_break = true; - define_run_body!(persister, event_handler, chain_monitor, channel_manager, + let async_event_handler = |event| { + let network_graph = gossip_sync.network_graph(); + let event_handler = &event_handler; + async move { + if let Some(network_graph) = network_graph { + handle_network_graph_update(network_graph, &event) + } + event_handler(event).await; + } + }; + define_run_body!(persister, + chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { select_biased! { _ = channel_manager.get_persistable_update_future().fuse() => true, @@ -527,7 +536,12 @@ impl BackgroundProcessor { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - define_run_body!(persister, event_handler, chain_monitor, channel_manager, + let event_handler = DecoratingEventHandler { + event_handler, + gossip_sync: &gossip_sync, + }; + define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), channel_manager.await_persistable_update_timeout(Duration::from_millis(100))) }); -- 2.39.5