Merge pull request #1769 from TheBlueMatt/2022-10-disconnected-hints
[rust-lightning] / lightning-background-processor / src / lib.rs
index e1fa8144ac4fce2066ea7bf0c47ee5fcdfb5b8aa..27cd0663e3291f27e385787d99769553c012de80 100644 (file)
@@ -35,7 +35,7 @@ use std::time::{Duration, Instant};
 use std::ops::Deref;
 
 #[cfg(feature = "futures")]
-use futures::{select, future::FutureExt};
+use futures_util::{select_biased, future::FutureExt};
 
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -46,8 +46,8 @@ use futures::{select, future::FutureExt};
 ///   [`ChannelManager`] persistence should be done in the background.
 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
 ///   at the appropriate intervals.
-/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
-///   is provided to [`BackgroundProcessor::start`]).
+/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
+///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
 ///
 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
 /// upon as doing so may result in high latency.
@@ -312,7 +312,7 @@ macro_rules! define_run_body {
                                // The network graph must not be pruned while rapid sync completion is pending
                                log_trace!($logger, "Assessing prunability of network graph");
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
-                                       network_graph.remove_stale_channels();
+                                       network_graph.remove_stale_channels_and_tracking();
 
                                        if let Err(e) = $persister.persist_graph(network_graph) {
                                                log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
@@ -378,6 +378,7 @@ pub async fn process_events_async<
        Descriptor: 'static + SocketDescriptor + Send + Sync,
        CMH: 'static + Deref + Send + Sync,
        RMH: 'static + Deref + Send + Sync,
+       OMH: 'static + Deref + Send + Sync,
        EH: 'static + EventHandler + Send,
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
@@ -385,7 +386,7 @@ pub async fn process_events_async<
        PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        UMH: 'static + Deref + Send + Sync,
-       PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+       PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: WriteableScore<'a>,
        SleepFuture: core::future::Future<Output = bool>,
@@ -405,6 +406,7 @@ where
        L::Target: 'static + Logger,
        P::Target: 'static + Persist<Signer>,
        CMH::Target: 'static + ChannelMessageHandler,
+       OMH::Target: 'static + OnionMessageHandler,
        RMH::Target: 'static + RoutingMessageHandler,
        UMH::Target: 'static + CustomMessageHandler,
        PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -412,7 +414,7 @@ where
        let mut should_continue = true;
        define_run_body!(persister, event_handler, chain_monitor, channel_manager,
                gossip_sync, peer_manager, logger, scorer, should_continue, {
-                       select! {
+                       select_biased! {
                                _ = channel_manager.get_persistable_update_future().fuse() => true,
                                cont = sleeper(Duration::from_millis(100)).fuse() => {
                                        should_continue = cont;
@@ -581,8 +583,8 @@ mod tests {
        use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
-       use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
-       use lightning::ln::features::{ChannelFeatures, InitFeatures};
+       use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
+       use lightning::ln::features::ChannelFeatures;
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
        use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
@@ -607,7 +609,7 @@ mod tests {
 
        const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
 
-       #[derive(Clone, Eq, Hash, PartialEq)]
+       #[derive(Clone, Hash, PartialEq, Eq)]
        struct TestDescriptor{}
        impl SocketDescriptor for TestDescriptor {
                fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
@@ -746,7 +748,7 @@ mod tests {
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
                        let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
-                       let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
+                       let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
                        let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                        let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
                        nodes.push(node);
@@ -754,8 +756,8 @@ mod tests {
 
                for i in 0..num_nodes {
                        for j in (i+1)..num_nodes {
-                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
-                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
+                               nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
+                               nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
                        }
                }
 
@@ -776,8 +778,8 @@ mod tests {
        macro_rules! begin_open_channel {
                ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
                        $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
-                       $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
-                       $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
+                       $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
+                       $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
                }}
        }
 
@@ -1117,8 +1119,8 @@ mod tests {
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
-               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
+               let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
+               let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
                let event_handler = Arc::clone(&invoice_payer);
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                assert!(bg_processor.stop().is_ok());