Drop DecoratingEventHandler in favor of inline closure
[rust-lightning] / lightning-background-processor / src / lib.rs
index 76a7cfc9c4c526f76a5c2c49ef3df35f8405ed9f..ec7d29256e62d751f5bf6633139c6c97d888c830 100644 (file)
@@ -192,49 +192,22 @@ where
        }
 }
 
-/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
-struct DecoratingEventHandler<
-       'a,
-       E: EventHandler,
-       PGS: Deref<Target = P2PGossipSync<G, A, L>>,
-       RGS: Deref<Target = RapidGossipSync<G, L>>,
-       G: Deref<Target = NetworkGraph<L>>,
-       A: Deref,
-       L: Deref,
->
-where A::Target: chain::Access, L::Target: Logger {
-       event_handler: E,
-       gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
-}
-
-impl<
-       'a,
-       E: EventHandler,
-       PGS: Deref<Target = P2PGossipSync<G, A, L>>,
-       RGS: Deref<Target = RapidGossipSync<G, L>>,
-       G: Deref<Target = NetworkGraph<L>>,
-       A: Deref,
-       L: Deref,
-> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
-where A::Target: chain::Access, L::Target: Logger {
-       fn handle_event(&self, event: &Event) {
-               if let Some(network_graph) = self.gossip_sync.network_graph() {
-                       network_graph.handle_event(event);
+fn handle_network_graph_update<L: Deref>(
+       network_graph: &NetworkGraph<L>, event: &Event
+) where L::Target: Logger {
+       if let Event::PaymentPathFailed { ref network_update, .. } = event {
+               if let Some(network_update) = network_update {
+                       network_graph.handle_network_update(&network_update);
                }
-               self.event_handler.handle_event(event);
        }
 }
 
 macro_rules! define_run_body {
-       ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+       ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
+        $channel_manager: ident, $process_channel_manager_events: expr,
         $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
         $loop_exit_check: expr, $await: expr)
        => { {
-               let event_handler = DecoratingEventHandler {
-                       event_handler: $event_handler,
-                       gossip_sync: &$gossip_sync,
-               };
-
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
 
@@ -245,8 +218,8 @@ macro_rules! define_run_body {
                let mut have_pruned = false;
 
                loop {
-                       $channel_manager.process_pending_events(&event_handler);
-                       $chain_monitor.process_pending_events(&event_handler);
+                       $process_channel_manager_events;
+                       $process_chain_monitor_events;
 
                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -358,8 +331,8 @@ macro_rules! define_run_body {
 /// Processes background events in a future.
 ///
 /// `sleeper` should return a future which completes in the given amount of time and returns a
-/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
-/// future which outputs false, the loop will exit and this function's future will complete.
+/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
+/// future which outputs true, the loop will exit and this function's future will complete.
 ///
 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
 #[cfg(feature = "futures")]
@@ -379,7 +352,8 @@ pub async fn process_events_async<
        CMH: 'static + Deref + Send + Sync,
        RMH: 'static + Deref + Send + Sync,
        OMH: 'static + Deref + Send + Sync,
-       EH: 'static + EventHandler + Send,
+       EventHandlerFuture: core::future::Future<Output = ()>,
+       EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
@@ -392,7 +366,7 @@ pub async fn process_events_async<
        SleepFuture: core::future::Future<Output = bool>,
        Sleeper: Fn(Duration) -> SleepFuture
 >(
-       persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+       persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper,
 ) -> Result<(), std::io::Error>
@@ -411,13 +385,25 @@ where
        UMH::Target: 'static + CustomMessageHandler,
        PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
 {
-       let mut should_continue = true;
-       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
-               gossip_sync, peer_manager, logger, scorer, should_continue, {
+       let mut should_break = true;
+       let async_event_handler = |event| {
+               let network_graph = gossip_sync.network_graph();
+               let event_handler = &event_handler;
+               async move {
+                       if let Some(network_graph) = network_graph {
+                               handle_network_graph_update(network_graph, &event)
+                       }
+                       event_handler(event).await;
+               }
+       };
+       define_run_body!(persister,
+               chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
+               channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
+               gossip_sync, peer_manager, logger, scorer, should_break, {
                        select_biased! {
                                _ = channel_manager.get_persistable_update_future().fuse() => true,
-                               cont = sleeper(Duration::from_millis(100)).fuse() => {
-                                       should_continue = cont;
+                               exit = sleeper(Duration::from_millis(100)).fuse() => {
+                                       should_break = exit;
                                        false
                                }
                        }
@@ -517,7 +503,15 @@ impl BackgroundProcessor {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-                       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+                       let event_handler = |event| {
+                               let network_graph = gossip_sync.network_graph();
+                               if let Some(network_graph) = network_graph {
+                                       handle_network_graph_update(network_graph, &event)
+                               }
+                               event_handler.handle_event(event);
+                       };
+                       define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
+                               channel_manager, channel_manager.process_pending_events(&event_handler),
                                gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
                                channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
                });
@@ -769,7 +763,7 @@ mod tests {
                        begin_open_channel!($node_a, $node_b, $channel_value);
                        let events = $node_a.node.get_and_clear_pending_events();
                        assert_eq!(events.len(), 1);
-                       let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
+                       let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
                        end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
                        tx
                }}
@@ -786,7 +780,7 @@ mod tests {
        macro_rules! handle_funding_generation_ready {
                ($event: expr, $channel_value: expr) => {{
                        match $event {
-                               &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
+                               Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);
 
@@ -847,7 +841,7 @@ mod tests {
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                macro_rules! check_persisted_data {
@@ -909,7 +903,7 @@ mod tests {
                let nodes = create_nodes(1, "test_timer_tick_called".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -932,7 +926,7 @@ mod tests {
 
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                match bg_processor.join() {
                        Ok(_) => panic!("Expected error persisting manager"),
@@ -949,7 +943,7 @@ mod tests {
                let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
@@ -967,7 +961,7 @@ mod tests {
                let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
@@ -988,9 +982,12 @@ mod tests {
 
                // Set up a background event handler for FundingGenerationReady events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-               let event_handler = move |event: &Event| {
-                       sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
+               let event_handler = move |event: Event| match event {
+                       Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
+                       Event::ChannelReady { .. } => {},
+                       _ => panic!("Unexpected event: {:?}", event),
                };
+
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Open a channel and check that the FundingGenerationReady event was handled.
@@ -1014,7 +1011,12 @@ mod tests {
 
                // Set up a background event handler for SpendableOutputs events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-               let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
+               let event_handler = move |event: Event| match event {
+                       Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
+                       Event::ChannelReady { .. } => {},
+                       Event::ChannelClosed { .. } => {},
+                       _ => panic!("Unexpected event: {:?}", event),
+               };
                let persister = Arc::new(Persister::new(data_dir));
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
@@ -1022,12 +1024,12 @@ mod tests {
                nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
                let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
                confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
+
                let event = receiver
                        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
-                       .expect("SpendableOutputs not handled within deadline");
+                       .expect("Events not handled within deadline");
                match event {
                        Event::SpendableOutputs { .. } => {},
-                       Event::ChannelClosed { .. } => {},
                        _ => panic!("Unexpected event: {:?}", event),
                }
 
@@ -1039,7 +1041,7 @@ mod tests {
                let nodes = create_nodes(2, "test_scorer_persistence".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                loop {
@@ -1067,7 +1069,7 @@ mod tests {
                assert!(original_graph_description.contains("42: features: 0000, node_one:"));
                assert_eq!(network_graph.read_only().channels().len(), 1);
 
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                loop {
@@ -1120,7 +1122,7 @@ mod tests {
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
-               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
+               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
                let event_handler = Arc::clone(&invoice_payer);
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                assert!(bg_processor.stop().is_ok());