X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Ftracking.rs;h=901339c3286ad22d156f5b5d85c486377d2027b4;hb=4c20eda0cf43a80d27bbcb66423225c7c072753a;hp=243590765f1e829e750d6c6dc0cc68b07687a1c1;hpb=ccd1c465be6982102b6e82a12ac1f4e79ca19fa2;p=rapid-gossip-sync-server diff --git a/src/tracking.rs b/src/tracking.rs index 2435907..901339c 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -1,26 +1,31 @@ use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hasher}; use std::net::SocketAddr; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; -use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; -use lightning; +use hex_conservative::display::DisplayHex; use lightning::ln::peer_handler::{ ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager, }; +use lightning::{log_info, log_warn}; use lightning::routing::gossip::NetworkGraph; -use lightning::chain::keysinterface::KeysManager; +use lightning::sign::KeysManager; +use lightning::util::logger::Logger; use tokio::sync::mpsc; +use tokio::task::JoinSet; -use crate::{config, TestLogger}; +use crate::config; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, - completion_sender: mpsc::Sender<()>, - network_graph: Arc>) { +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, + completion_sender: mpsc::Sender<()>, + network_graph: Arc>, + logger: L, +) where L::Target: Logger { let mut key = [42; 32]; let mut random_data = [43; 32]; // Get something psuedo-random from std. @@ -33,19 +38,19 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender= config::CONNECTED_PEER_ASSERTION_LIMIT { + break; + } + } } } @@ -74,100 +96,104 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender 0 && previous_update_count > 0; - if new_message_count > 0 { - latest_new_gossip_time = Instant::now(); - } + loop { + i += 1; // count the background activity + let sleep = tokio::time::sleep(Duration::from_secs(5)); + sleep.await; - // if we either aren't caught up, or just stopped/started being caught up - if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { - println!( - "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", - i, - total_message_count, - new_message_count, - counter.channel_announcements, - counter.channel_announcements_with_mismatched_scripts, - counter.channel_updates, - counter.channel_updates_without_htlc_max_msats - ); - } else { - println!("Monitoring for gossip…") - } + { + let counter = router.counter.read().unwrap(); + let total_message_count = counter.channel_announcements + counter.channel_updates; + let new_message_count = total_message_count - previous_announcement_count - previous_update_count; - if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { - println!("caught up with gossip!"); - needs_to_notify_persister = true; - } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { - println!("Received new messages since catching up with gossip!"); - } + let was_previously_caught_up_with_gossip = is_caught_up_with_gossip; + // TODO: make new message threshold (20) adjust based on connected peer count + is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0; + if new_message_count > 0 { + latest_new_gossip_time = Instant::now(); + } - let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); - if continuous_caught_up_duration.as_secs() > 600 { - eprintln!("No new gossip messages in 10 minutes! Something's amiss!"); - } + // if we either aren't caught up, or just stopped/started being caught up + if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { + log_info!( + logger, + "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", + i, + total_message_count, + new_message_count, + counter.channel_announcements, + counter.channel_announcements_with_mismatched_scripts, + counter.channel_updates, + counter.channel_updates_without_htlc_max_msats + ); + } else { + log_info!(logger, "Monitoring for gossip…") + } - previous_announcement_count = counter.channel_announcements; - previous_update_count = counter.channel_updates; + if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { + log_info!(logger, "caught up with gossip!"); + needs_to_notify_persister = true; + } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { + log_info!(logger, "Received new messages since catching up with gossip!"); } - if needs_to_notify_persister { - needs_to_notify_persister = false; - completion_sender.send(()).await.unwrap(); + let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); + if continuous_caught_up_duration.as_secs() > 600 { + log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); } + + previous_announcement_count = counter.channel_announcements; + previous_update_count = counter.channel_updates; } - }); + + if needs_to_notify_persister { + needs_to_notify_persister = false; + completion_sender.send(()).await.unwrap(); + } + } } -async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool { - eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); - let connection = lightning_net_tokio::connect_outbound( - Arc::clone(&peer_manager), - current_peer.0, - current_peer.1, - ).await; - if let Some(disconnection_future) = connection { - eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string()); - tokio::spawn(async move { - disconnection_future.await; - loop { - eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); - if let Some(disconnection_future) = lightning_net_tokio::connect_outbound( - Arc::clone(&peer_manager), - current_peer.0, - current_peer.1, - ).await { - disconnection_future.await; +async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager, logger: L) -> bool where L::Target: Logger { + // we seek to find out if the first connection attempt was successful + let (sender, mut receiver) = mpsc::channel::(1); + tokio::spawn(async move { + let current_peer_pubkey_hex = current_peer.0.serialize().to_lower_hex_string(); + log_info!(logger, "Connecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1); + let mut is_first_iteration = true; + loop { + if let Some(disconnection_future) = lightning_net_tokio::connect_outbound( + Arc::clone(&peer_manager), + current_peer.0, + current_peer.1, + ).await { + log_info!(logger, "Connected to peer {}@{}!", current_peer_pubkey_hex, current_peer.1); + if is_first_iteration { + sender.send(true).await.unwrap(); + } + disconnection_future.await; + log_warn!(logger, "Disconnected from peer {}@{}", current_peer_pubkey_hex, current_peer.1); + } else { + log_warn!(logger, "Failed to connect to peer {}@{}!", current_peer_pubkey_hex, current_peer.1); + if is_first_iteration { + sender.send(false).await.unwrap(); } - tokio::time::sleep(Duration::from_secs(10)).await; } - }); - true - } else { - eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()); - false - } + is_first_iteration = false; + tokio::time::sleep(Duration::from_secs(10)).await; + log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1); + } + }); + + let success = receiver.recv().await.unwrap(); + success }