From 66dd8c2933cc57aa2bdc8e4ca33716f6d2309b5e Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Mon, 28 Aug 2023 14:40:35 -0700 Subject: [PATCH] Connect peers independently. Previously, we wouldn't start monitoring the gossip data until all intended peers had either failed or succeeded to connect. Instead, we're now spawn each peer connection independently, and only await the first 5 successful peer connections. --- src/config.rs | 4 ++ src/tracking.rs | 146 +++++++++++++++++++++++++++--------------------- 2 files changed, 85 insertions(+), 65 deletions(-) diff --git a/src/config.rs b/src/config.rs index b905c53..1de2482 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,10 @@ pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks /// That reminder may be either in the form of a channel announcement, or in the form of empty /// updates in both directions. pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60); +/// The number of successful peer connections to await prior to continuing to gossip storage. +/// The application will still work if the number of specified peers is lower, as long as there is +/// at least one successful peer connection, but it may result in long startup times. +pub(crate) const CONNECTED_PEER_ASSERTION_LIMIT: usize = 5; pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true; pub(crate) fn snapshot_generation_interval() -> u32 { diff --git a/src/tracking.rs b/src/tracking.rs index c53fc5b..ee428e0 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -16,15 +16,16 @@ use lightning::routing::gossip::NetworkGraph; use lightning::sign::KeysManager; use lightning::util::logger::Logger; use tokio::sync::mpsc; +use tokio::task::JoinSet; use crate::config; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, - completion_sender: mpsc::Sender<()>, - network_graph: Arc>, - logger: L + completion_sender: mpsc::Sender<()>, + network_graph: Arc>, + logger: L, ) where L::Target: Logger { let mut key = [42; 32]; let mut random_data = [43; 32]; @@ -66,12 +67,29 @@ pub(crate) async fn download_gossip(pe log_info!(logger, "Connecting to Lightning peers..."); let peers = config::ln_peers(); + let mut handles = JoinSet::new(); let mut connected_peer_count = 0; + if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT { + log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len()); + } + for current_peer in peers { - let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await; - if initial_connection_succeeded { - connected_peer_count += 1; + let peer_handler_clone = peer_handler.clone(); + let logger_clone = logger.clone(); + handles.spawn(async move { + connect_peer(current_peer, peer_handler_clone, logger_clone).await + }); + } + + while let Some(connection_result) = handles.join_next().await { + if let Ok(connection) = connection_result { + if connection { + connected_peer_count += 1; + if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT { + break; + } + } } } @@ -81,71 +99,69 @@ pub(crate) async fn download_gossip(pe log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count); - tokio::spawn(async move { - let mut previous_announcement_count = 0u64; - let mut previous_update_count = 0u64; - let mut is_caught_up_with_gossip = false; - - let mut i = 0u32; - let mut latest_new_gossip_time = Instant::now(); - let mut needs_to_notify_persister = false; - - loop { - i += 1; // count the background activity - let sleep = tokio::time::sleep(Duration::from_secs(5)); - sleep.await; - - { - let counter = router.counter.read().unwrap(); - let total_message_count = counter.channel_announcements + counter.channel_updates; - let new_message_count = total_message_count - previous_announcement_count - previous_update_count; - - let was_previously_caught_up_with_gossip = is_caught_up_with_gossip; - // TODO: make new message threshold (20) adjust based on connected peer count - is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0; - if new_message_count > 0 { - latest_new_gossip_time = Instant::now(); - } - - // if we either aren't caught up, or just stopped/started being caught up - if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { - log_info!( - logger, - "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", - i, - total_message_count, - new_message_count, - counter.channel_announcements, - counter.channel_announcements_with_mismatched_scripts, - counter.channel_updates, - counter.channel_updates_without_htlc_max_msats - ); - } else { - log_info!(logger, "Monitoring for gossip…") - } - - if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { - log_info!(logger, "caught up with gossip!"); - needs_to_notify_persister = true; - } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { - log_info!(logger, "Received new messages since catching up with gossip!"); - } + let mut previous_announcement_count = 0u64; + let mut previous_update_count = 0u64; + let mut is_caught_up_with_gossip = false; + + let mut i = 0u32; + let mut latest_new_gossip_time = Instant::now(); + let mut needs_to_notify_persister = false; + + loop { + i += 1; // count the background activity + let sleep = tokio::time::sleep(Duration::from_secs(5)); + sleep.await; + + { + let counter = router.counter.read().unwrap(); + let total_message_count = counter.channel_announcements + counter.channel_updates; + let new_message_count = total_message_count - previous_announcement_count - previous_update_count; + + let was_previously_caught_up_with_gossip = is_caught_up_with_gossip; + // TODO: make new message threshold (20) adjust based on connected peer count + is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0; + if new_message_count > 0 { + latest_new_gossip_time = Instant::now(); + } - let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); - if continuous_caught_up_duration.as_secs() > 600 { - log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); - } + // if we either aren't caught up, or just stopped/started being caught up + if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { + log_info!( + logger, + "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", + i, + total_message_count, + new_message_count, + counter.channel_announcements, + counter.channel_announcements_with_mismatched_scripts, + counter.channel_updates, + counter.channel_updates_without_htlc_max_msats + ); + } else { + log_info!(logger, "Monitoring for gossip…") + } - previous_announcement_count = counter.channel_announcements; - previous_update_count = counter.channel_updates; + if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { + log_info!(logger, "caught up with gossip!"); + needs_to_notify_persister = true; + } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { + log_info!(logger, "Received new messages since catching up with gossip!"); } - if needs_to_notify_persister { - needs_to_notify_persister = false; - completion_sender.send(()).await.unwrap(); + let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); + if continuous_caught_up_duration.as_secs() > 600 { + log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); } + + previous_announcement_count = counter.channel_announcements; + previous_update_count = counter.channel_updates; } - }); + + if needs_to_notify_persister { + needs_to_notify_persister = false; + completion_sender.send(()).await.unwrap(); + } + } } async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager, logger: L) -> bool where L::Target: Logger { -- 2.30.2