X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Ftracking.rs;h=901339c3286ad22d156f5b5d85c486377d2027b4;hb=74dd28d4e2c75a4dbb15b3c5045bddfeb2bd9c53;hp=93389a9fa3994706915497c9f199221408926d15;hpb=6832d206995b49887c38f2264df865115e58ca8e;p=rapid-gossip-sync-server diff --git a/src/tracking.rs b/src/tracking.rs index 93389a9..901339c 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -1,74 +1,94 @@ +use std::collections::hash_map::RandomState; +use std::hash::{BuildHasher, Hasher}; use std::net::SocketAddr; -use std::sync::{Arc, RwLock}; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::ops::Deref; +use std::sync::Arc; +use std::time::{Duration, Instant}; -use bitcoin::hashes::hex::ToHex; -use bitcoin::secp256k1::{PublicKey, SecretKey}; -use lightning; +use bitcoin::secp256k1::PublicKey; +use hex_conservative::display::DisplayHex; use lightning::ln::peer_handler::{ ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager, }; -use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; -use rand::{Rng, thread_rng}; +use lightning::{log_info, log_warn}; +use lightning::routing::gossip::NetworkGraph; +use lightning::sign::KeysManager; +use lightning::util::logger::Logger; use tokio::sync::mpsc; - -use crate::{config, TestLogger}; -use crate::downloader::{GossipCounter, GossipRouter}; -use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager}; -use crate::verifier::ChainVerifier; - -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, network_graph: Arc>>) { - let mut key = [0; 32]; - let mut random_data = [0; 32]; - thread_rng().fill_bytes(&mut key); - thread_rng().fill_bytes(&mut random_data); - let our_node_secret = SecretKey::from_slice(&key).unwrap(); - - let _arc_chain_access = None::; - let arc_chain_access = Some(Arc::new(ChainVerifier::new())); - let ignorer = IgnoringMessageHandler {}; - let arc_ignorer = Arc::new(ignorer); - - let errorer = ErroringMessageHandler::new(); - let arc_errorer = Arc::new(errorer); - - let logger = TestLogger::new(); - let arc_logger = Arc::new(logger); - - let router = P2PGossipSync::new( - network_graph.clone(), - arc_chain_access, - Arc::clone(&arc_logger), - ); - let arc_router = Arc::new(router); - let wrapped_router = GossipRouter { - native_router: arc_router, - counter: RwLock::new(GossipCounter::new()), - sender: persistence_sender.clone(), - }; - let arc_wrapped_router = Arc::new(wrapped_router); +use tokio::task::JoinSet; + +use crate::config; +use crate::downloader::GossipRouter; +use crate::types::{GossipMessage, GossipPeerManager}; + +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, + completion_sender: mpsc::Sender<()>, + network_graph: Arc>, + logger: L, +) where L::Target: Logger { + let mut key = [42; 32]; + let mut random_data = [43; 32]; + // Get something psuedo-random from std. + let mut key_hasher = RandomState::new().build_hasher(); + key_hasher.write_u8(1); + key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes()); + let mut rand_hasher = RandomState::new().build_hasher(); + rand_hasher.write_u8(2); + random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes()); + + let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef)); + + let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone())); let message_handler = MessageHandler { - chan_handler: arc_errorer, - route_handler: arc_wrapped_router.clone(), + chan_handler: ErroringMessageHandler::new(), + route_handler: Arc::clone(&router), + onion_message_handler: IgnoringMessageHandler {}, + custom_message_handler: IgnoringMessageHandler {}, }; - let peer_handler = PeerManager::new( + let peer_handler = Arc::new(PeerManager::new( message_handler, - our_node_secret, + 0xdeadbeef, &random_data, - Arc::clone(&arc_logger), - arc_ignorer, - ); - let arc_peer_handler = Arc::new(peer_handler); + logger.clone(), + keys_manager, + )); + router.set_pm(Arc::clone(&peer_handler)); - println!("Connecting to Lightning peers…"); + let ph_timer = Arc::clone(&peer_handler); + tokio::spawn(async move { + let mut intvl = tokio::time::interval(Duration::from_secs(10)); + loop { + intvl.tick().await; + ph_timer.timer_tick_occurred(); + } + }); + + log_info!(logger, "Connecting to Lightning peers..."); let peers = config::ln_peers(); + let mut handles = JoinSet::new(); let mut connected_peer_count = 0; + if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT { + log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len()); + } + for current_peer in peers { - let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&arc_peer_handler)).await; - if initial_connection_succeeded { - connected_peer_count += 1; + let peer_handler_clone = peer_handler.clone(); + let logger_clone = logger.clone(); + handles.spawn(async move { + connect_peer(current_peer, peer_handler_clone, logger_clone).await + }); + } + + while let Some(connection_result) = handles.join_next().await { + if let Ok(connection) = connection_result { + if connection { + connected_peer_count += 1; + if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT { + break; + } + } } } @@ -76,112 +96,104 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender 0 && previous_update_count > 0; - if new_message_count > 0 { - latest_new_gossip_time = Instant::now(); - } + loop { + i += 1; // count the background activity + let sleep = tokio::time::sleep(Duration::from_secs(5)); + sleep.await; - // if we either aren't caught up, or just stopped/started being caught up - if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { - println!( - "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", - i, - total_message_count, - new_message_count, - counter.channel_announcements, - counter.channel_announcements_with_mismatched_scripts, - counter.channel_updates, - counter.channel_updates_without_htlc_max_msats - ); - } else { - println!("Monitoring for gossip…") - } + { + let counter = router.counter.read().unwrap(); + let total_message_count = counter.channel_announcements + counter.channel_updates; + let new_message_count = total_message_count - previous_announcement_count - previous_update_count; - if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { - println!("caught up with gossip!"); - needs_to_notify_persister = true; - } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { - println!("Received new messages since catching up with gossip!"); - } + let was_previously_caught_up_with_gossip = is_caught_up_with_gossip; + // TODO: make new message threshold (20) adjust based on connected peer count + is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0; + if new_message_count > 0 { + latest_new_gossip_time = Instant::now(); + } - let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); - if continuous_caught_up_duration.as_secs() > 600 { - eprintln!("No new gossip messages in 10 minutes! Something's amiss!"); - } + // if we either aren't caught up, or just stopped/started being caught up + if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { + log_info!( + logger, + "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", + i, + total_message_count, + new_message_count, + counter.channel_announcements, + counter.channel_announcements_with_mismatched_scripts, + counter.channel_updates, + counter.channel_updates_without_htlc_max_msats + ); + } else { + log_info!(logger, "Monitoring for gossip…") + } - previous_announcement_count = counter.channel_announcements; - previous_update_count = counter.channel_updates; + if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { + log_info!(logger, "caught up with gossip!"); + needs_to_notify_persister = true; + } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { + log_info!(logger, "Received new messages since catching up with gossip!"); } - if needs_to_notify_persister { - needs_to_notify_persister = false; - let sender = local_persistence_sender.clone(); - tokio::spawn(async move { - let _ = sender.send(DetectedGossipMessage { - timestamp_seen: current_timestamp as u32, - message: GossipMessage::InitialSyncComplete, - }).await; - }); + let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); + if continuous_caught_up_duration.as_secs() > 600 { + log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); } + + previous_announcement_count = counter.channel_announcements; + previous_update_count = counter.channel_updates; } - }); + + if needs_to_notify_persister { + needs_to_notify_persister = false; + completion_sender.send(()).await.unwrap(); + } + } } -async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool { - eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); - let connection = lightning_net_tokio::connect_outbound( - Arc::clone(&peer_manager), - current_peer.0, - current_peer.1, - ).await; - if let Some(disconnection_future) = connection { - eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string()); - tokio::spawn(async move { - disconnection_future.await; - loop { - eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); - if let Some(disconnection_future) = lightning_net_tokio::connect_outbound( - Arc::clone(&peer_manager), - current_peer.0, - current_peer.1, - ).await { - disconnection_future.await; - } else { - tokio::time::sleep(Duration::from_secs(10)).await; +async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager, logger: L) -> bool where L::Target: Logger { + // we seek to find out if the first connection attempt was successful + let (sender, mut receiver) = mpsc::channel::(1); + tokio::spawn(async move { + let current_peer_pubkey_hex = current_peer.0.serialize().to_lower_hex_string(); + log_info!(logger, "Connecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1); + let mut is_first_iteration = true; + loop { + if let Some(disconnection_future) = lightning_net_tokio::connect_outbound( + Arc::clone(&peer_manager), + current_peer.0, + current_peer.1, + ).await { + log_info!(logger, "Connected to peer {}@{}!", current_peer_pubkey_hex, current_peer.1); + if is_first_iteration { + sender.send(true).await.unwrap(); + } + disconnection_future.await; + log_warn!(logger, "Disconnected from peer {}@{}", current_peer_pubkey_hex, current_peer.1); + } else { + log_warn!(logger, "Failed to connect to peer {}@{}!", current_peer_pubkey_hex, current_peer.1); + if is_first_iteration { + sender.send(false).await.unwrap(); } } - }); - true - } else { - eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()); - false - } + is_first_iteration = false; + tokio::time::sleep(Duration::from_secs(10)).await; + log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1); + } + }); + + let success = receiver.recv().await.unwrap(); + success }