X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Ftracking.rs;h=ee428e0f0acd0d1e834cc6af31e3b23dcacee6b7;hb=66dd8c2933cc57aa2bdc8e4ca33716f6d2309b5e;hp=c53fc5b5f1f16f8e76eacdf0ac76743e53cfed58;hpb=ed395b114b2992e3393c663b5e1ffec8c529648e;p=rapid-gossip-sync-server diff --git a/src/tracking.rs b/src/tracking.rs index c53fc5b..ee428e0 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -16,15 +16,16 @@ use lightning::routing::gossip::NetworkGraph; use lightning::sign::KeysManager; use lightning::util::logger::Logger; use tokio::sync::mpsc; +use tokio::task::JoinSet; use crate::config; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, - completion_sender: mpsc::Sender<()>, - network_graph: Arc>, - logger: L + completion_sender: mpsc::Sender<()>, + network_graph: Arc>, + logger: L, ) where L::Target: Logger { let mut key = [42; 32]; let mut random_data = [43; 32]; @@ -66,12 +67,29 @@ pub(crate) async fn download_gossip(pe log_info!(logger, "Connecting to Lightning peers..."); let peers = config::ln_peers(); + let mut handles = JoinSet::new(); let mut connected_peer_count = 0; + if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT { + log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len()); + } + for current_peer in peers { - let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await; - if initial_connection_succeeded { - connected_peer_count += 1; + let peer_handler_clone = peer_handler.clone(); + let logger_clone = logger.clone(); + handles.spawn(async move { + connect_peer(current_peer, peer_handler_clone, logger_clone).await + }); + } + + while let Some(connection_result) = handles.join_next().await { + if let Ok(connection) = connection_result { + if connection { + connected_peer_count += 1; + if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT { + break; + } + } } } @@ -81,71 +99,69 @@ pub(crate) async fn download_gossip(pe log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count); - tokio::spawn(async move { - let mut previous_announcement_count = 0u64; - let mut previous_update_count = 0u64; - let mut is_caught_up_with_gossip = false; - - let mut i = 0u32; - let mut latest_new_gossip_time = Instant::now(); - let mut needs_to_notify_persister = false; - - loop { - i += 1; // count the background activity - let sleep = tokio::time::sleep(Duration::from_secs(5)); - sleep.await; - - { - let counter = router.counter.read().unwrap(); - let total_message_count = counter.channel_announcements + counter.channel_updates; - let new_message_count = total_message_count - previous_announcement_count - previous_update_count; - - let was_previously_caught_up_with_gossip = is_caught_up_with_gossip; - // TODO: make new message threshold (20) adjust based on connected peer count - is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0; - if new_message_count > 0 { - latest_new_gossip_time = Instant::now(); - } - - // if we either aren't caught up, or just stopped/started being caught up - if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { - log_info!( - logger, - "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", - i, - total_message_count, - new_message_count, - counter.channel_announcements, - counter.channel_announcements_with_mismatched_scripts, - counter.channel_updates, - counter.channel_updates_without_htlc_max_msats - ); - } else { - log_info!(logger, "Monitoring for gossip…") - } - - if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { - log_info!(logger, "caught up with gossip!"); - needs_to_notify_persister = true; - } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { - log_info!(logger, "Received new messages since catching up with gossip!"); - } + let mut previous_announcement_count = 0u64; + let mut previous_update_count = 0u64; + let mut is_caught_up_with_gossip = false; + + let mut i = 0u32; + let mut latest_new_gossip_time = Instant::now(); + let mut needs_to_notify_persister = false; + + loop { + i += 1; // count the background activity + let sleep = tokio::time::sleep(Duration::from_secs(5)); + sleep.await; + + { + let counter = router.counter.read().unwrap(); + let total_message_count = counter.channel_announcements + counter.channel_updates; + let new_message_count = total_message_count - previous_announcement_count - previous_update_count; + + let was_previously_caught_up_with_gossip = is_caught_up_with_gossip; + // TODO: make new message threshold (20) adjust based on connected peer count + is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0; + if new_message_count > 0 { + latest_new_gossip_time = Instant::now(); + } - let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); - if continuous_caught_up_duration.as_secs() > 600 { - log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); - } + // if we either aren't caught up, or just stopped/started being caught up + if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { + log_info!( + logger, + "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", + i, + total_message_count, + new_message_count, + counter.channel_announcements, + counter.channel_announcements_with_mismatched_scripts, + counter.channel_updates, + counter.channel_updates_without_htlc_max_msats + ); + } else { + log_info!(logger, "Monitoring for gossip…") + } - previous_announcement_count = counter.channel_announcements; - previous_update_count = counter.channel_updates; + if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { + log_info!(logger, "caught up with gossip!"); + needs_to_notify_persister = true; + } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { + log_info!(logger, "Received new messages since catching up with gossip!"); } - if needs_to_notify_persister { - needs_to_notify_persister = false; - completion_sender.send(()).await.unwrap(); + let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); + if continuous_caught_up_duration.as_secs() > 600 { + log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); } + + previous_announcement_count = counter.channel_announcements; + previous_update_count = counter.channel_updates; } - }); + + if needs_to_notify_persister { + needs_to_notify_persister = false; + completion_sender.send(()).await.unwrap(); + } + } } async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager, logger: L) -> bool where L::Target: Logger {