use lightning::sign::KeysManager;
use lightning::util::logger::Logger;
use tokio::sync::mpsc;
+use tokio::task::JoinSet;
use crate::config;
use crate::downloader::GossipRouter;
use crate::types::{GossipMessage, GossipPeerManager};
pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
- completion_sender: mpsc::Sender<()>,
- network_graph: Arc<NetworkGraph<L>>,
- logger: L
+ completion_sender: mpsc::Sender<()>,
+ network_graph: Arc<NetworkGraph<L>>,
+ logger: L,
) where L::Target: Logger {
let mut key = [42; 32];
let mut random_data = [43; 32];
log_info!(logger, "Connecting to Lightning peers...");
let peers = config::ln_peers();
+ let mut handles = JoinSet::new();
let mut connected_peer_count = 0;
+ if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT {
+ log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len());
+ }
+
for current_peer in peers {
- let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await;
- if initial_connection_succeeded {
- connected_peer_count += 1;
+ let peer_handler_clone = peer_handler.clone();
+ let logger_clone = logger.clone();
+ handles.spawn(async move {
+ connect_peer(current_peer, peer_handler_clone, logger_clone).await
+ });
+ }
+
+ while let Some(connection_result) = handles.join_next().await {
+ if let Ok(connection) = connection_result {
+ if connection {
+ connected_peer_count += 1;
+ if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT {
+ break;
+ }
+ }
}
}
log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
- tokio::spawn(async move {
- let mut previous_announcement_count = 0u64;
- let mut previous_update_count = 0u64;
- let mut is_caught_up_with_gossip = false;
-
- let mut i = 0u32;
- let mut latest_new_gossip_time = Instant::now();
- let mut needs_to_notify_persister = false;
-
- loop {
- i += 1; // count the background activity
- let sleep = tokio::time::sleep(Duration::from_secs(5));
- sleep.await;
-
- {
- let counter = router.counter.read().unwrap();
- let total_message_count = counter.channel_announcements + counter.channel_updates;
- let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
-
- let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
- // TODO: make new message threshold (20) adjust based on connected peer count
- is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
- if new_message_count > 0 {
- latest_new_gossip_time = Instant::now();
- }
-
- // if we either aren't caught up, or just stopped/started being caught up
- if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
- log_info!(
- logger,
- "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
- i,
- total_message_count,
- new_message_count,
- counter.channel_announcements,
- counter.channel_announcements_with_mismatched_scripts,
- counter.channel_updates,
- counter.channel_updates_without_htlc_max_msats
- );
- } else {
- log_info!(logger, "Monitoring for gossip…")
- }
-
- if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
- log_info!(logger, "caught up with gossip!");
- needs_to_notify_persister = true;
- } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
- log_info!(logger, "Received new messages since catching up with gossip!");
- }
+ let mut previous_announcement_count = 0u64;
+ let mut previous_update_count = 0u64;
+ let mut is_caught_up_with_gossip = false;
+
+ let mut i = 0u32;
+ let mut latest_new_gossip_time = Instant::now();
+ let mut needs_to_notify_persister = false;
+
+ loop {
+ i += 1; // count the background activity
+ let sleep = tokio::time::sleep(Duration::from_secs(5));
+ sleep.await;
+
+ {
+ let counter = router.counter.read().unwrap();
+ let total_message_count = counter.channel_announcements + counter.channel_updates;
+ let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
+
+ let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
+ // TODO: make new message threshold (20) adjust based on connected peer count
+ is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
+ if new_message_count > 0 {
+ latest_new_gossip_time = Instant::now();
+ }
- let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
- if continuous_caught_up_duration.as_secs() > 600 {
- log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
- }
+ // if we either aren't caught up, or just stopped/started being caught up
+ if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
+ log_info!(
+ logger,
+ "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
+ i,
+ total_message_count,
+ new_message_count,
+ counter.channel_announcements,
+ counter.channel_announcements_with_mismatched_scripts,
+ counter.channel_updates,
+ counter.channel_updates_without_htlc_max_msats
+ );
+ } else {
+ log_info!(logger, "Monitoring for gossip…")
+ }
- previous_announcement_count = counter.channel_announcements;
- previous_update_count = counter.channel_updates;
+ if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
+ log_info!(logger, "caught up with gossip!");
+ needs_to_notify_persister = true;
+ } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
+ log_info!(logger, "Received new messages since catching up with gossip!");
}
- if needs_to_notify_persister {
- needs_to_notify_persister = false;
- completion_sender.send(()).await.unwrap();
+ let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
+ if continuous_caught_up_duration.as_secs() > 600 {
+ log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
}
+
+ previous_announcement_count = counter.channel_announcements;
+ previous_update_count = counter.channel_updates;
}
- });
+
+ if needs_to_notify_persister {
+ needs_to_notify_persister = false;
+ completion_sender.send(()).await.unwrap();
+ }
+ }
}
async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {