Merge pull request #1887 from TheBlueMatt/2022-11-definitely-valid
[rust-lightning] / lightning-background-processor / src / lib.rs
index a481ad3bd92ff775dbda932de978da60c9954849..1c720921970095d4211b4b8113b113926db3854f 100644 (file)
@@ -17,7 +17,7 @@ extern crate lightning_rapid_gossip_sync;
 use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::chain::keysinterface::{Sign, KeysInterface};
+use lightning::chain::keysinterface::KeysInterface;
 use lightning::ln::channelmanager::ChannelManager;
 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
@@ -192,49 +192,22 @@ where
        }
 }
 
-/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
-struct DecoratingEventHandler<
-       'a,
-       E: EventHandler,
-       PGS: Deref<Target = P2PGossipSync<G, A, L>>,
-       RGS: Deref<Target = RapidGossipSync<G, L>>,
-       G: Deref<Target = NetworkGraph<L>>,
-       A: Deref,
-       L: Deref,
->
-where A::Target: chain::Access, L::Target: Logger {
-       event_handler: E,
-       gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
-}
-
-impl<
-       'a,
-       E: EventHandler,
-       PGS: Deref<Target = P2PGossipSync<G, A, L>>,
-       RGS: Deref<Target = RapidGossipSync<G, L>>,
-       G: Deref<Target = NetworkGraph<L>>,
-       A: Deref,
-       L: Deref,
-> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
-where A::Target: chain::Access, L::Target: Logger {
-       fn handle_event(&self, event: &Event) {
-               if let Some(network_graph) = self.gossip_sync.network_graph() {
-                       network_graph.handle_event(event);
+fn handle_network_graph_update<L: Deref>(
+       network_graph: &NetworkGraph<L>, event: &Event
+) where L::Target: Logger {
+       if let Event::PaymentPathFailed { ref network_update, .. } = event {
+               if let Some(network_update) = network_update {
+                       network_graph.handle_network_update(&network_update);
                }
-               self.event_handler.handle_event(event);
        }
 }
 
 macro_rules! define_run_body {
-       ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+       ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
+        $channel_manager: ident, $process_channel_manager_events: expr,
         $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
         $loop_exit_check: expr, $await: expr)
        => { {
-               let event_handler = DecoratingEventHandler {
-                       event_handler: $event_handler,
-                       gossip_sync: &$gossip_sync,
-               };
-
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
 
@@ -245,8 +218,8 @@ macro_rules! define_run_body {
                let mut have_pruned = false;
 
                loop {
-                       $channel_manager.process_pending_events(&event_handler);
-                       $chain_monitor.process_pending_events(&event_handler);
+                       $process_channel_manager_events;
+                       $process_chain_monitor_events;
 
                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -310,8 +283,8 @@ macro_rules! define_run_body {
                        // continuing our normal cadence.
                        if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
                                // The network graph must not be pruned while rapid sync completion is pending
-                               log_trace!($logger, "Assessing prunability of network graph");
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+                                       log_trace!($logger, "Pruning and persisting network graph.");
                                        network_graph.remove_stale_channels_and_tracking();
 
                                        if let Err(e) = $persister.persist_graph(network_graph) {
@@ -320,8 +293,6 @@ macro_rules! define_run_body {
 
                                        last_prune_call = Instant::now();
                                        have_pruned = true;
-                               } else {
-                                       log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
                                }
                        }
 
@@ -365,7 +336,6 @@ macro_rules! define_run_body {
 #[cfg(feature = "futures")]
 pub async fn process_events_async<
        'a,
-       Signer: 'static + Sign,
        CA: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        CW: 'static + Deref + Send + Sync,
@@ -379,9 +349,10 @@ pub async fn process_events_async<
        CMH: 'static + Deref + Send + Sync,
        RMH: 'static + Deref + Send + Sync,
        OMH: 'static + Deref + Send + Sync,
-       EH: 'static + EventHandler + Send,
+       EventHandlerFuture: core::future::Future<Output = ()>,
+       EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
-       M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+       M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
@@ -392,27 +363,39 @@ pub async fn process_events_async<
        SleepFuture: core::future::Future<Output = bool>,
        Sleeper: Fn(Duration) -> SleepFuture
 >(
-       persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+       persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper,
 ) -> Result<(), std::io::Error>
 where
        CA::Target: 'static + chain::Access,
        CF::Target: 'static + chain::Filter,
-       CW::Target: 'static + chain::Watch<Signer>,
+       CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
        T::Target: 'static + BroadcasterInterface,
-       K::Target: 'static + KeysInterface<Signer = Signer>,
+       K::Target: 'static + KeysInterface,
        F::Target: 'static + FeeEstimator,
        L::Target: 'static + Logger,
-       P::Target: 'static + Persist<Signer>,
+       P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
        CMH::Target: 'static + ChannelMessageHandler,
        OMH::Target: 'static + OnionMessageHandler,
        RMH::Target: 'static + RoutingMessageHandler,
        UMH::Target: 'static + CustomMessageHandler,
-       PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+       PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
 {
        let mut should_break = true;
-       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+       let async_event_handler = |event| {
+               let network_graph = gossip_sync.network_graph();
+               let event_handler = &event_handler;
+               async move {
+                       if let Some(network_graph) = network_graph {
+                               handle_network_graph_update(network_graph, &event)
+                       }
+                       event_handler(event).await;
+               }
+       };
+       define_run_body!(persister,
+               chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
+               channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
                gossip_sync, peer_manager, logger, scorer, should_break, {
                        select_biased! {
                                _ = channel_manager.get_persistable_update_future().fuse() => true,
@@ -471,7 +454,6 @@ impl BackgroundProcessor {
        /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
        pub fn start<
                'a,
-               Signer: 'static + Sign,
                CA: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
                CW: 'static + Deref + Send + Sync,
@@ -487,7 +469,7 @@ impl BackgroundProcessor {
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
-               M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+               M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
@@ -502,22 +484,30 @@ impl BackgroundProcessor {
        where
                CA::Target: 'static + chain::Access,
                CF::Target: 'static + chain::Filter,
-               CW::Target: 'static + chain::Watch<Signer>,
+               CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
                T::Target: 'static + BroadcasterInterface,
-               K::Target: 'static + KeysInterface<Signer = Signer>,
+               K::Target: 'static + KeysInterface,
                F::Target: 'static + FeeEstimator,
                L::Target: 'static + Logger,
-               P::Target: 'static + Persist<Signer>,
+               P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
                CMH::Target: 'static + ChannelMessageHandler,
                OMH::Target: 'static + OnionMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
-               PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+               PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-                       define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+                       let event_handler = |event| {
+                               let network_graph = gossip_sync.network_graph();
+                               if let Some(network_graph) = network_graph {
+                                       handle_network_graph_update(network_graph, &event)
+                               }
+                               event_handler.handle_event(event);
+                       };
+                       define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
+                               channel_manager, channel_manager.process_pending_events(&event_handler),
                                gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
                                channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
                });
@@ -588,13 +578,13 @@ mod tests {
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
        use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+       use lightning::routing::router::DefaultRouter;
        use lightning::util::config::UserConfig;
        use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
        use lightning::util::persist::KVStorePersister;
        use lightning_invoice::payment::{InvoicePayer, Retry};
-       use lightning_invoice::utils::DefaultRouter;
        use lightning_persister::FilesystemPersister;
        use std::fs;
        use std::path::PathBuf;
@@ -769,7 +759,7 @@ mod tests {
                        begin_open_channel!($node_a, $node_b, $channel_value);
                        let events = $node_a.node.get_and_clear_pending_events();
                        assert_eq!(events.len(), 1);
-                       let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
+                       let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
                        end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
                        tx
                }}
@@ -786,7 +776,7 @@ mod tests {
        macro_rules! handle_funding_generation_ready {
                ($event: expr, $channel_value: expr) => {{
                        match $event {
-                               &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
+                               Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);
 
@@ -847,7 +837,7 @@ mod tests {
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                macro_rules! check_persisted_data {
@@ -909,7 +899,7 @@ mod tests {
                let nodes = create_nodes(1, "test_timer_tick_called".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -932,7 +922,7 @@ mod tests {
 
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                match bg_processor.join() {
                        Ok(_) => panic!("Expected error persisting manager"),
@@ -949,7 +939,7 @@ mod tests {
                let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
@@ -967,7 +957,7 @@ mod tests {
                let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
@@ -988,7 +978,7 @@ mod tests {
 
                // Set up a background event handler for FundingGenerationReady events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-               let event_handler = move |event: &Event| match event {
+               let event_handler = move |event: Event| match event {
                        Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
                        Event::ChannelReady { .. } => {},
                        _ => panic!("Unexpected event: {:?}", event),
@@ -1017,7 +1007,7 @@ mod tests {
 
                // Set up a background event handler for SpendableOutputs events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-               let event_handler = move |event: &Event| match event {
+               let event_handler = move |event: Event| match event {
                        Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
                        Event::ChannelReady { .. } => {},
                        Event::ChannelClosed { .. } => {},
@@ -1047,7 +1037,7 @@ mod tests {
                let nodes = create_nodes(2, "test_scorer_persistence".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                loop {
@@ -1075,15 +1065,16 @@ mod tests {
                assert!(original_graph_description.contains("42: features: 0000, node_one:"));
                assert_eq!(network_graph.read_only().channels().len(), 1);
 
-               let event_handler = |_: &_| {};
+               let event_handler = |_: _| {};
                let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
-                       let expected_log_a = "Assessing prunability of network graph".to_string();
-                       let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
-                       if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
-                               log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
+                       let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
+                       if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
+                               .unwrap_or(&0) > 1
+                       {
+                               // Wait until the loop has gone around at least twice.
                                break
                        }
                }
@@ -1128,7 +1119,7 @@ mod tests {
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
-               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
+               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
                let event_handler = Arc::clone(&invoice_payer);
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                assert!(bg_processor.stop().is_ok());