+use std::collections::hash_map::RandomState;
+use std::hash::{BuildHasher, Hasher};
use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::ops::Deref;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
-use bitcoin::hashes::hex::ToHex;
-use bitcoin::secp256k1::{PublicKey, SecretKey};
-use lightning;
+use bitcoin::secp256k1::PublicKey;
+use hex_conservative::display::DisplayHex;
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
-use rand::{Rng, thread_rng};
+use lightning::{log_info, log_warn};
+use lightning::routing::gossip::NetworkGraph;
+use lightning::sign::KeysManager;
+use lightning::util::logger::Logger;
use tokio::sync::mpsc;
-
-use crate::{config, TestLogger};
-use crate::downloader::{GossipCounter, GossipRouter};
-use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager};
-use crate::verifier::ChainVerifier;
-
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
- let mut key = [0; 32];
- let mut random_data = [0; 32];
- thread_rng().fill_bytes(&mut key);
- thread_rng().fill_bytes(&mut random_data);
- let our_node_secret = SecretKey::from_slice(&key).unwrap();
-
- let _arc_chain_access = None::<GossipChainAccess>;
- let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
- let ignorer = IgnoringMessageHandler {};
- let arc_ignorer = Arc::new(ignorer);
-
- let errorer = ErroringMessageHandler::new();
- let arc_errorer = Arc::new(errorer);
-
- let logger = TestLogger::new();
- let arc_logger = Arc::new(logger);
-
- let router = P2PGossipSync::new(
- network_graph.clone(),
- arc_chain_access,
- Arc::clone(&arc_logger),
- );
- let arc_router = Arc::new(router);
- let wrapped_router = GossipRouter {
- native_router: arc_router,
- counter: RwLock::new(GossipCounter::new()),
- sender: persistence_sender.clone(),
- };
- let arc_wrapped_router = Arc::new(wrapped_router);
+use tokio::task::JoinSet;
+
+use crate::config;
+use crate::downloader::GossipRouter;
+use crate::types::{GossipMessage, GossipPeerManager};
+
+pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
+ completion_sender: mpsc::Sender<()>,
+ network_graph: Arc<NetworkGraph<L>>,
+ logger: L,
+) where L::Target: Logger {
+ let mut key = [42; 32];
+ let mut random_data = [43; 32];
+ // Get something psuedo-random from std.
+ let mut key_hasher = RandomState::new().build_hasher();
+ key_hasher.write_u8(1);
+ key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
+ let mut rand_hasher = RandomState::new().build_hasher();
+ rand_hasher.write_u8(2);
+ random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
+
+ let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
+
+ let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
let message_handler = MessageHandler {
- chan_handler: arc_errorer,
- route_handler: arc_wrapped_router.clone(),
+ chan_handler: ErroringMessageHandler::new(),
+ route_handler: Arc::clone(&router),
+ onion_message_handler: IgnoringMessageHandler {},
+ custom_message_handler: IgnoringMessageHandler {},
};
- let peer_handler = PeerManager::new(
+ let peer_handler = Arc::new(PeerManager::new(
message_handler,
- our_node_secret,
+ 0xdeadbeef,
&random_data,
- Arc::clone(&arc_logger),
- arc_ignorer,
- );
- let arc_peer_handler = Arc::new(peer_handler);
+ logger.clone(),
+ keys_manager,
+ ));
+ router.set_pm(Arc::clone(&peer_handler));
- println!("Connecting to Lightning peers…");
+ let ph_timer = Arc::clone(&peer_handler);
+ tokio::spawn(async move {
+ let mut intvl = tokio::time::interval(Duration::from_secs(10));
+ loop {
+ intvl.tick().await;
+ ph_timer.timer_tick_occurred();
+ }
+ });
+
+ log_info!(logger, "Connecting to Lightning peers...");
let peers = config::ln_peers();
+ let mut handles = JoinSet::new();
let mut connected_peer_count = 0;
+ if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT {
+ log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len());
+ }
+
for current_peer in peers {
- let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&arc_peer_handler)).await;
- if initial_connection_succeeded {
- connected_peer_count += 1;
+ let peer_handler_clone = peer_handler.clone();
+ let logger_clone = logger.clone();
+ handles.spawn(async move {
+ connect_peer(current_peer, peer_handler_clone, logger_clone).await
+ });
+ }
+
+ while let Some(connection_result) = handles.join_next().await {
+ if let Ok(connection) = connection_result {
+ if connection {
+ connected_peer_count += 1;
+ if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT {
+ break;
+ }
+ }
}
}
panic!("Failed to connect to any peer.");
}
- println!("Connected to {} Lightning peers!", connected_peer_count);
+ log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
- let local_router = arc_wrapped_router.clone();
- let local_persistence_sender = persistence_sender.clone();
- tokio::spawn(async move {
- let mut previous_announcement_count = 0u64;
- let mut previous_update_count = 0u64;
- let mut is_caught_up_with_gossip = false;
+ let mut previous_announcement_count = 0u64;
+ let mut previous_update_count = 0u64;
+ let mut is_caught_up_with_gossip = false;
- let mut i = 0u32;
- let mut latest_new_gossip_time = Instant::now();
- let mut needs_to_notify_persister = false;
+ let mut i = 0u32;
+ let mut latest_new_gossip_time = Instant::now();
+ let mut needs_to_notify_persister = false;
- loop {
- i += 1; // count the background activity
- let sleep = tokio::time::sleep(Duration::from_secs(5));
- sleep.await;
-
- let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
- let router_clone = Arc::clone(&local_router);
-
- {
- let counter = router_clone.counter.read().unwrap();
- let total_message_count = counter.channel_announcements + counter.channel_updates;
- let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
-
- let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
- // TODO: make new message threshold (20) adjust based on connected peer count
- is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
- if new_message_count > 0 {
- latest_new_gossip_time = Instant::now();
- }
+ loop {
+ i += 1; // count the background activity
+ let sleep = tokio::time::sleep(Duration::from_secs(5));
+ sleep.await;
- // if we either aren't caught up, or just stopped/started being caught up
- if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
- println!(
- "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
- i,
- total_message_count,
- new_message_count,
- counter.channel_announcements,
- counter.channel_announcements_with_mismatched_scripts,
- counter.channel_updates,
- counter.channel_updates_without_htlc_max_msats
- );
- } else {
- println!("Monitoring for gossip…")
- }
+ {
+ let counter = router.counter.read().unwrap();
+ let total_message_count = counter.channel_announcements + counter.channel_updates;
+ let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
- if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
- println!("caught up with gossip!");
- needs_to_notify_persister = true;
- } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
- println!("Received new messages since catching up with gossip!");
- }
+ let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
+ // TODO: make new message threshold (20) adjust based on connected peer count
+ is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
+ if new_message_count > 0 {
+ latest_new_gossip_time = Instant::now();
+ }
- let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
- if continuous_caught_up_duration.as_secs() > 600 {
- eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
- }
+ // if we either aren't caught up, or just stopped/started being caught up
+ if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
+ log_info!(
+ logger,
+ "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
+ i,
+ total_message_count,
+ new_message_count,
+ counter.channel_announcements,
+ counter.channel_announcements_with_mismatched_scripts,
+ counter.channel_updates,
+ counter.channel_updates_without_htlc_max_msats
+ );
+ } else {
+ log_info!(logger, "Monitoring for gossip…")
+ }
- previous_announcement_count = counter.channel_announcements;
- previous_update_count = counter.channel_updates;
+ if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
+ log_info!(logger, "caught up with gossip!");
+ needs_to_notify_persister = true;
+ } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
+ log_info!(logger, "Received new messages since catching up with gossip!");
}
- if needs_to_notify_persister {
- needs_to_notify_persister = false;
- let sender = local_persistence_sender.clone();
- tokio::spawn(async move {
- let _ = sender.send(DetectedGossipMessage {
- timestamp_seen: current_timestamp as u32,
- message: GossipMessage::InitialSyncComplete,
- }).await;
- });
+ let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
+ if continuous_caught_up_duration.as_secs() > 600 {
+ log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
}
+
+ previous_announcement_count = counter.channel_announcements;
+ previous_update_count = counter.channel_updates;
}
- });
+
+ if needs_to_notify_persister {
+ needs_to_notify_persister = false;
+ completion_sender.send(()).await.unwrap();
+ }
+ }
}
-async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
- eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
- let connection = lightning_net_tokio::connect_outbound(
- Arc::clone(&peer_manager),
- current_peer.0,
- current_peer.1,
- ).await;
- if let Some(disconnection_future) = connection {
- eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
- tokio::spawn(async move {
- disconnection_future.await;
- loop {
- eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
- if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
- Arc::clone(&peer_manager),
- current_peer.0,
- current_peer.1,
- ).await {
- disconnection_future.await;
- } else {
- tokio::time::sleep(Duration::from_secs(10)).await;
+async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
+ // we seek to find out if the first connection attempt was successful
+ let (sender, mut receiver) = mpsc::channel::<bool>(1);
+ tokio::spawn(async move {
+ let current_peer_pubkey_hex = current_peer.0.serialize().to_lower_hex_string();
+ log_info!(logger, "Connecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1);
+ let mut is_first_iteration = true;
+ loop {
+ if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
+ Arc::clone(&peer_manager),
+ current_peer.0,
+ current_peer.1,
+ ).await {
+ log_info!(logger, "Connected to peer {}@{}!", current_peer_pubkey_hex, current_peer.1);
+ if is_first_iteration {
+ sender.send(true).await.unwrap();
+ }
+ disconnection_future.await;
+ log_warn!(logger, "Disconnected from peer {}@{}", current_peer_pubkey_hex, current_peer.1);
+ } else {
+ log_warn!(logger, "Failed to connect to peer {}@{}!", current_peer_pubkey_hex, current_peer.1);
+ if is_first_iteration {
+ sender.send(false).await.unwrap();
}
}
- });
- true
- } else {
- eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
- false
- }
+ is_first_iteration = false;
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1);
+ }
+ });
+
+ let success = receiver.recv().await.unwrap();
+ success
}