Connect peers independently.
authorArik Sosman <git@arik.io>
Mon, 28 Aug 2023 21:40:35 +0000 (14:40 -0700)
committerArik Sosman <git@arik.io>
Wed, 6 Sep 2023 23:23:26 +0000 (16:23 -0700)
Previously, we wouldn't start monitoring the gossip data until all
intended peers had either failed or succeeded to connect.

Instead, we're now spawn each peer connection independently, and only
await the first 5 successful peer connections.

src/config.rs
src/tracking.rs

index b905c53f3b877f27b12e21d8fd816f9864a6a1a0..1de248220bd617057e4ccba5f55f34e2737cdd2f 100644 (file)
@@ -23,6 +23,10 @@ pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
 /// That reminder may be either in the form of a channel announcement, or in the form of empty
 /// updates in both directions.
 pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
+/// The number of successful peer connections to await prior to continuing to gossip storage.
+/// The application will still work if the number of specified peers is lower, as long as there is
+/// at least one successful peer connection, but it may result in long startup times.
+pub(crate) const CONNECTED_PEER_ASSERTION_LIMIT: usize = 5;
 pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
 
 pub(crate) fn snapshot_generation_interval() -> u32 {
index c53fc5b5f1f16f8e76eacdf0ac76743e53cfed58..ee428e0f0acd0d1e834cc6af31e3b23dcacee6b7 100644 (file)
@@ -16,15 +16,16 @@ use lightning::routing::gossip::NetworkGraph;
 use lightning::sign::KeysManager;
 use lightning::util::logger::Logger;
 use tokio::sync::mpsc;
+use tokio::task::JoinSet;
 
 use crate::config;
 use crate::downloader::GossipRouter;
 use crate::types::{GossipMessage, GossipPeerManager};
 
 pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
-               completion_sender: mpsc::Sender<()>,
-               network_graph: Arc<NetworkGraph<L>>,
-               logger: L
+       completion_sender: mpsc::Sender<()>,
+       network_graph: Arc<NetworkGraph<L>>,
+       logger: L,
 ) where L::Target: Logger {
        let mut key = [42; 32];
        let mut random_data = [43; 32];
@@ -66,12 +67,29 @@ pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(pe
 
        log_info!(logger, "Connecting to Lightning peers...");
        let peers = config::ln_peers();
+       let mut handles = JoinSet::new();
        let mut connected_peer_count = 0;
 
+       if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT {
+               log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len());
+       }
+
        for current_peer in peers {
-               let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await;
-               if initial_connection_succeeded {
-                       connected_peer_count += 1;
+               let peer_handler_clone = peer_handler.clone();
+               let logger_clone = logger.clone();
+               handles.spawn(async move {
+                       connect_peer(current_peer, peer_handler_clone, logger_clone).await
+               });
+       }
+
+       while let Some(connection_result) = handles.join_next().await {
+               if let Ok(connection) = connection_result {
+                       if connection {
+                               connected_peer_count += 1;
+                               if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT {
+                                       break;
+                               }
+                       }
                }
        }
 
@@ -81,71 +99,69 @@ pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(pe
 
        log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
 
-       tokio::spawn(async move {
-               let mut previous_announcement_count = 0u64;
-               let mut previous_update_count = 0u64;
-               let mut is_caught_up_with_gossip = false;
-
-               let mut i = 0u32;
-               let mut latest_new_gossip_time = Instant::now();
-               let mut needs_to_notify_persister = false;
-
-               loop {
-                       i += 1; // count the background activity
-                       let sleep = tokio::time::sleep(Duration::from_secs(5));
-                       sleep.await;
-
-                       {
-                               let counter = router.counter.read().unwrap();
-                               let total_message_count = counter.channel_announcements + counter.channel_updates;
-                               let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
-
-                               let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
-                               // TODO: make new message threshold (20) adjust based on connected peer count
-                               is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
-                               if new_message_count > 0 {
-                                       latest_new_gossip_time = Instant::now();
-                               }
-
-                               // if we either aren't caught up, or just stopped/started being caught up
-                               if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
-                                       log_info!(
-                                               logger,
-                                               "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
-                                               i,
-                                               total_message_count,
-                                               new_message_count,
-                                               counter.channel_announcements,
-                                               counter.channel_announcements_with_mismatched_scripts,
-                                               counter.channel_updates,
-                                               counter.channel_updates_without_htlc_max_msats
-                                       );
-                               } else {
-                                       log_info!(logger, "Monitoring for gossip…")
-                               }
-
-                               if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
-                                       log_info!(logger, "caught up with gossip!");
-                                       needs_to_notify_persister = true;
-                               } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
-                                       log_info!(logger, "Received new messages since catching up with gossip!");
-                               }
+       let mut previous_announcement_count = 0u64;
+       let mut previous_update_count = 0u64;
+       let mut is_caught_up_with_gossip = false;
+
+       let mut i = 0u32;
+       let mut latest_new_gossip_time = Instant::now();
+       let mut needs_to_notify_persister = false;
+
+       loop {
+               i += 1; // count the background activity
+               let sleep = tokio::time::sleep(Duration::from_secs(5));
+               sleep.await;
+
+               {
+                       let counter = router.counter.read().unwrap();
+                       let total_message_count = counter.channel_announcements + counter.channel_updates;
+                       let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
+
+                       let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
+                       // TODO: make new message threshold (20) adjust based on connected peer count
+                       is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
+                       if new_message_count > 0 {
+                               latest_new_gossip_time = Instant::now();
+                       }
 
-                               let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
-                               if continuous_caught_up_duration.as_secs() > 600 {
-                                       log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
-                               }
+                       // if we either aren't caught up, or just stopped/started being caught up
+                       if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
+                               log_info!(
+                                       logger,
+                                       "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
+                                       i,
+                                       total_message_count,
+                                       new_message_count,
+                                       counter.channel_announcements,
+                                       counter.channel_announcements_with_mismatched_scripts,
+                                       counter.channel_updates,
+                                       counter.channel_updates_without_htlc_max_msats
+                               );
+                       } else {
+                               log_info!(logger, "Monitoring for gossip…")
+                       }
 
-                               previous_announcement_count = counter.channel_announcements;
-                               previous_update_count = counter.channel_updates;
+                       if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
+                               log_info!(logger, "caught up with gossip!");
+                               needs_to_notify_persister = true;
+                       } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
+                               log_info!(logger, "Received new messages since catching up with gossip!");
                        }
 
-                       if needs_to_notify_persister {
-                               needs_to_notify_persister = false;
-                               completion_sender.send(()).await.unwrap();
+                       let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
+                       if continuous_caught_up_duration.as_secs() > 600 {
+                               log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
                        }
+
+                       previous_announcement_count = counter.channel_announcements;
+                       previous_update_count = counter.channel_updates;
                }
-       });
+
+               if needs_to_notify_persister {
+                       needs_to_notify_persister = false;
+                       completion_sender.send(()).await.unwrap();
+               }
+       }
 }
 
 async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {