Don't special-case initial connection attempt.
[rapid-gossip-sync-server] / src / tracking.rs
index 9c249232be5d3f37c1f08c5257906040893212f5..d25b60a665b93277ae0c1d4bbf83ec17c58cb4e2 100644 (file)
@@ -1,75 +1,95 @@
+use std::collections::hash_map::RandomState;
+use std::hash::{BuildHasher, Hasher};
 use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::ops::Deref;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use bitcoin::hashes::hex::ToHex;
-use bitcoin::secp256k1::{PublicKey, SecretKey};
-use futures::executor;
+use bitcoin::secp256k1::PublicKey;
 use lightning;
 use lightning::ln::peer_handler::{
        ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
 };
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
-use rand::{Rng, thread_rng};
+use lightning::{log_info, log_warn};
+use lightning::routing::gossip::NetworkGraph;
+use lightning::sign::KeysManager;
+use lightning::util::logger::Logger;
 use tokio::sync::mpsc;
-
-use crate::{config, TestLogger};
-use crate::downloader::{GossipCounter, GossipRouter};
-use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager};
-use crate::verifier::ChainVerifier;
-
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
-       let mut key = [0; 32];
-       let mut random_data = [0; 32];
-       thread_rng().fill_bytes(&mut key);
-       thread_rng().fill_bytes(&mut random_data);
-       let our_node_secret = SecretKey::from_slice(&key).unwrap();
-
-       let _arc_chain_access = None::<GossipChainAccess>;
-       let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
-       let ignorer = IgnoringMessageHandler {};
-       let arc_ignorer = Arc::new(ignorer);
-
-       let errorer = ErroringMessageHandler::new();
-       let arc_errorer = Arc::new(errorer);
-
-       let logger = TestLogger::new();
-       let arc_logger = Arc::new(logger);
-
-       let router = P2PGossipSync::new(
-               network_graph.clone(),
-               arc_chain_access,
-               Arc::clone(&arc_logger),
-       );
-       let arc_router = Arc::new(router);
-       let wrapped_router = GossipRouter {
-               native_router: arc_router,
-               counter: RwLock::new(GossipCounter::new()),
-               sender: persistence_sender.clone(),
-       };
-       let arc_wrapped_router = Arc::new(wrapped_router);
+use tokio::task::JoinSet;
+
+use crate::config;
+use crate::downloader::GossipRouter;
+use crate::types::{GossipMessage, GossipPeerManager};
+
+pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
+       completion_sender: mpsc::Sender<()>,
+       network_graph: Arc<NetworkGraph<L>>,
+       logger: L,
+) where L::Target: Logger {
+       let mut key = [42; 32];
+       let mut random_data = [43; 32];
+       // Get something psuedo-random from std.
+       let mut key_hasher = RandomState::new().build_hasher();
+       key_hasher.write_u8(1);
+       key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
+       let mut rand_hasher = RandomState::new().build_hasher();
+       rand_hasher.write_u8(2);
+       random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
+
+       let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
+
+       let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
 
        let message_handler = MessageHandler {
-               chan_handler: arc_errorer,
-               route_handler: arc_wrapped_router.clone(),
+               chan_handler: ErroringMessageHandler::new(),
+               route_handler: Arc::clone(&router),
+               onion_message_handler: IgnoringMessageHandler {},
+               custom_message_handler: IgnoringMessageHandler {},
        };
-       let peer_handler = PeerManager::new(
+       let peer_handler = Arc::new(PeerManager::new(
                message_handler,
-               our_node_secret,
+               0xdeadbeef,
                &random_data,
-               Arc::clone(&arc_logger),
-               arc_ignorer,
-       );
-       let arc_peer_handler = Arc::new(peer_handler);
+               logger.clone(),
+               keys_manager,
+       ));
+       router.set_pm(Arc::clone(&peer_handler));
 
-       println!("Connecting to Lightning peers…");
+       let ph_timer = Arc::clone(&peer_handler);
+       tokio::spawn(async move {
+               let mut intvl = tokio::time::interval(Duration::from_secs(10));
+               loop {
+                       intvl.tick().await;
+                       ph_timer.timer_tick_occurred();
+               }
+       });
+
+       log_info!(logger, "Connecting to Lightning peers...");
        let peers = config::ln_peers();
+       let mut handles = JoinSet::new();
        let mut connected_peer_count = 0;
 
+       if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT {
+               log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len());
+       }
+
        for current_peer in peers {
-               let initial_connection_succeeded = monitor_peer_connection(current_peer, Arc::clone(&arc_peer_handler));
-               if initial_connection_succeeded {
-                       connected_peer_count += 1;
+               let peer_handler_clone = peer_handler.clone();
+               let logger_clone = logger.clone();
+               handles.spawn(async move {
+                       connect_peer(current_peer, peer_handler_clone, logger_clone).await
+               });
+       }
+
+       while let Some(connection_result) = handles.join_next().await {
+               if let Ok(connection) = connection_result {
+                       if connection {
+                               connected_peer_count += 1;
+                               if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT {
+                                       break;
+                               }
+                       }
                }
        }
 
@@ -77,107 +97,102 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGos
                panic!("Failed to connect to any peer.");
        }
 
-       println!("Connected to {} Lightning peers!", connected_peer_count);
+       log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
 
-       let local_router = arc_wrapped_router.clone();
-       let local_persistence_sender = persistence_sender.clone();
-       tokio::spawn(async move {
-               let mut previous_announcement_count = 0u64;
-               let mut previous_update_count = 0u64;
-               let mut is_caught_up_with_gossip = false;
+       let mut previous_announcement_count = 0u64;
+       let mut previous_update_count = 0u64;
+       let mut is_caught_up_with_gossip = false;
 
-               let mut i = 0u32;
-               let mut latest_new_gossip_time = Instant::now();
-               let mut needs_to_notify_persister = false;
+       let mut i = 0u32;
+       let mut latest_new_gossip_time = Instant::now();
+       let mut needs_to_notify_persister = false;
 
-               loop {
-                       i += 1; // count the background activity
-                       let sleep = tokio::time::sleep(Duration::from_secs(5));
-                       sleep.await;
-
-                       let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let router_clone = Arc::clone(&local_router);
-
-                       {
-                               let counter = router_clone.counter.read().unwrap();
-                               let total_message_count = counter.channel_announcements + counter.channel_updates;
-                               let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
-
-                               let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
-                               // TODO: make new message threshold (20) adjust based on connected peer count
-                               is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
-                               if new_message_count > 0 {
-                                       latest_new_gossip_time = Instant::now();
-                               }
+       loop {
+               i += 1; // count the background activity
+               let sleep = tokio::time::sleep(Duration::from_secs(5));
+               sleep.await;
 
-                               // if we either aren't caught up, or just stopped/started being caught up
-                               if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
-                                       println!(
-                                               "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
-                                               i,
-                                               total_message_count,
-                                               new_message_count,
-                                               counter.channel_announcements,
-                                               counter.channel_announcements_with_mismatched_scripts,
-                                               counter.channel_updates,
-                                               counter.channel_updates_without_htlc_max_msats
-                                       );
-                               } else {
-                                       println!("Monitoring for gossip…")
-                               }
+               {
+                       let counter = router.counter.read().unwrap();
+                       let total_message_count = counter.channel_announcements + counter.channel_updates;
+                       let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
 
-                               if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
-                                       println!("caught up with gossip!");
-                                       needs_to_notify_persister = true;
-                               } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
-                                       println!("Received new messages since catching up with gossip!");
-                               }
+                       let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
+                       // TODO: make new message threshold (20) adjust based on connected peer count
+                       is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
+                       if new_message_count > 0 {
+                               latest_new_gossip_time = Instant::now();
+                       }
 
-                               let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
-                               if continuous_caught_up_duration.as_secs() > 600 {
-                                       eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
-                               }
+                       // if we either aren't caught up, or just stopped/started being caught up
+                       if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
+                               log_info!(
+                                       logger,
+                                       "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
+                                       i,
+                                       total_message_count,
+                                       new_message_count,
+                                       counter.channel_announcements,
+                                       counter.channel_announcements_with_mismatched_scripts,
+                                       counter.channel_updates,
+                                       counter.channel_updates_without_htlc_max_msats
+                               );
+                       } else {
+                               log_info!(logger, "Monitoring for gossip…")
+                       }
 
-                               previous_announcement_count = counter.channel_announcements;
-                               previous_update_count = counter.channel_updates;
+                       if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
+                               log_info!(logger, "caught up with gossip!");
+                               needs_to_notify_persister = true;
+                       } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
+                               log_info!(logger, "Received new messages since catching up with gossip!");
                        }
 
-                       if needs_to_notify_persister {
-                               needs_to_notify_persister = false;
-                               let sender = local_persistence_sender.clone();
-                               tokio::spawn(async move {
-                                       let _ = sender.send(DetectedGossipMessage {
-                                               timestamp_seen: current_timestamp as u32,
-                                               message: GossipMessage::InitialSyncComplete,
-                                       }).await;
-                               });
+                       let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
+                       if continuous_caught_up_duration.as_secs() > 600 {
+                               log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
                        }
+
+                       previous_announcement_count = counter.channel_announcements;
+                       previous_update_count = counter.channel_updates;
                }
-       });
+
+               if needs_to_notify_persister {
+                       needs_to_notify_persister = false;
+                       completion_sender.send(()).await.unwrap();
+               }
+       }
 }
 
-fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
-       let peer_manager_clone = Arc::clone(&peer_manager);
-       eprintln!("Connecting to peer {}@{}…", current_peer.0.to_hex(), current_peer.1.to_string());
-       let connection = executor::block_on(async move {
-               lightning_net_tokio::connect_outbound(
-                       peer_manager_clone,
-                       current_peer.0,
-                       current_peer.1,
-               ).await
+async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
+       // we seek to find out if the first connection attempt was successful
+       let (sender, mut receiver) = mpsc::channel::<bool>(1);
+       tokio::spawn(async move {
+               log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+               let mut is_first_iteration = true;
+               loop {
+                       if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
+                               Arc::clone(&peer_manager),
+                               current_peer.0,
+                               current_peer.1,
+                       ).await {
+                               log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
+                               if is_first_iteration {
+                                       sender.send(true).await.unwrap();
+                               }
+                               disconnection_future.await;
+                               log_warn!(logger, "Disconnected from peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+                               tokio::time::sleep(Duration::from_secs(10)).await;
+                               log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+                       } else {
+                               if is_first_iteration {
+                                       sender.send(false).await.unwrap();
+                               }
+                       }
+                       is_first_iteration = false;
+               }
        });
-       let mut initial_connection_succeeded = false;
-       if let Some(disconnection_future) = connection {
-               eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
-               initial_connection_succeeded = true;
-               let peer_manager_clone = Arc::clone(&peer_manager);
-               tokio::spawn(async move {
-                       disconnection_future.await;
-                       eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
-                       monitor_peer_connection(current_peer.clone(), peer_manager_clone);
-               });
-       } else {
-               eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string())
-       };
-       initial_connection_succeeded
+
+       let success = receiver.recv().await.unwrap();
+       success
 }