1 use std::net::SocketAddr;
3 use std::time::{Duration, Instant};
5 use bitcoin::hashes::hex::ToHex;
6 use bitcoin::secp256k1::{PublicKey, SecretKey};
8 use lightning::ln::peer_handler::{
9 ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
11 use lightning::routing::gossip::NetworkGraph;
12 use rand::{Rng, thread_rng};
13 use tokio::sync::mpsc;
15 use crate::{config, TestLogger};
16 use crate::downloader::GossipRouter;
17 use crate::types::{GossipMessage, GossipPeerManager};
/// Connects to the Lightning peers returned by `config::ln_peers()` and spawns a
/// background task that reports gossip-download progress.
///
/// A `GossipRouter` is built from `network_graph` and a clone of
/// `persistence_sender`, and is used as the `route_handler` of the
/// `MessageHandler`. The spawned task reads `router.counter`, prints progress,
/// and sends `GossipMessage::InitialSyncComplete` on `persistence_sender` when
/// the "caught up" state is first reached (and again on any later re-entry).
///
/// # Panics
///
/// Panics with "Failed to connect to any peer." when no initial connection
/// succeeds. The `unwrap()` calls on the secret key, on the counter lock, and on
/// the channel send (the latter two inside the spawned task) panic if those
/// operations fail.
///
/// NOTE(review): this listing does not show every line of the function (e.g. the
/// declaration of `i`, the enclosing loop header, some call arguments and the
/// closing braces are not visible) — confirm control flow against the full file.
19 pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
// Two 32-byte buffers filled with random bytes. `key` becomes the node secret
// below; the consumer of `random_data` is not visible in this listing.
20 let mut key = [0; 32];
21 let mut random_data = [0; 32];
22 thread_rng().fill_bytes(&mut key);
23 thread_rng().fill_bytes(&mut random_data);
// Panics if the random bytes are rejected as a secret key.
24 let our_node_secret = SecretKey::from_slice(&key).unwrap();
// The router receives the network graph and its own clone of the sender; the
// original `persistence_sender` is used again further down for the completion
// message.
26 let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
28 let message_handler = MessageHandler {
29 chan_handler: ErroringMessageHandler::new(),
30 route_handler: Arc::clone(&router),
// NOTE(review): the remaining `PeerManager::new` arguments are not visible here;
// presumably `message_handler` and `our_node_secret` are passed — confirm.
32 let peer_handler = Arc::new(PeerManager::new(
36 Arc::new(TestLogger::new()),
37 IgnoringMessageHandler {},
40 println!("Connecting to Lightning peers...");
41 let peers = config::ln_peers();
42 let mut connected_peer_count = 0;
// Try each configured peer in turn, counting the initial attempts that succeed.
44 for current_peer in peers {
45 let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
46 if initial_connection_succeeded {
47 connected_peer_count += 1;
// Without at least one peer there is nothing to download from, so abort.
51 if connected_peer_count < 1 {
52 panic!("Failed to connect to any peer.");
55 println!("Connected to {} Lightning peers!", connected_peer_count);
// Progress monitoring runs in its own task; `async move` transfers ownership of
// the captured values (such as `router` and `persistence_sender`) into it.
57 tokio::spawn(async move {
// Counter values seen on the previous sample; zero until the first sample.
58 let mut previous_announcement_count = 0u64;
59 let mut previous_update_count = 0u64;
60 let mut is_caught_up_with_gossip = false;
// Time of the most recent sample that saw at least one new message.
63 let mut latest_new_gossip_time = Instant::now();
// Set on the transition to "caught up"; cleared once the message is sent.
64 let mut needs_to_notify_persister = false;
// NOTE(review): the statements below presumably sit inside a loop whose header,
// like the declaration of `i`, is not visible in this listing — confirm.
67 i += 1; // count the background activity
// Five-second sleep future; the line awaiting it is not visible here.
68 let sleep = tokio::time::sleep(Duration::from_secs(5));
// Take a read guard on the shared counters; panics if `read()` returns an error.
72 let counter = router.counter.read().unwrap();
73 let total_message_count = counter.channel_announcements + counter.channel_updates;
// Number of announcements + updates received since the previous sample.
74 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
76 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
77 // TODO: make new message threshold (20) adjust based on connected peer count
// Caught up means: fewer than 20 new messages since the last sample AND both
// previous counts are non-zero. The previous counts start at zero, so the very
// first sample can never be classified as caught up.
78 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
79 if new_message_count > 0 {
80 latest_new_gossip_time = Instant::now();
83 // if we either aren't caught up, or just stopped/started being caught up
84 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
86 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
90 counter.channel_announcements,
91 counter.channel_announcements_with_mismatched_scripts,
92 counter.channel_updates,
93 counter.channel_updates_without_htlc_max_msats
96 println!("Monitoring for gossip…")
// Edge detection: request a persister notification only when the state flips
// from "not caught up" to "caught up"; log when it flips back.
99 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
100 println!("caught up with gossip!");
101 needs_to_notify_persister = true;
102 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
103 println!("Received new messages since catching up with gossip!");
// Warn on stderr once more than 600 seconds have passed since the last sample
// that saw new gossip.
106 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
107 if continuous_caught_up_duration.as_secs() > 600 {
108 eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
// Remember the current counts so the next sample can compute its delta.
111 previous_announcement_count = counter.channel_announcements;
112 previous_update_count = counter.channel_updates;
// Clear the flag before sending so the notification is sent once per transition.
// The `unwrap()` panics if the send returns an error.
// NOTE(review): whether the `counter` guard is dropped before this `.await` depends
// on block scoping that is not visible in this listing — confirm.
115 if needs_to_notify_persister {
116 needs_to_notify_persister = false;
117 persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
/// Attempts an outbound connection to `current_peer` (public key and socket
/// address) through `peer_manager`, logging each step to stderr.
///
/// When the initial attempt yields a disconnection future, a background task is
/// spawned that awaits it and then tries to connect to the same peer again.
///
/// Returns a `bool` that the caller in this file treats as "the initial
/// connection succeeded".
///
/// NOTE(review): the return expressions, the remaining `connect_outbound`
/// arguments and the reconnect loop header are not visible in this listing —
/// confirm against the full file.
123 async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
124 eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
125 let connection = lightning_net_tokio::connect_outbound(
126 Arc::clone(&peer_manager),
// `Some` carries a future that is awaited below to detect the disconnect.
130 if let Some(disconnection_future) = connection {
131 eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
// Supervise the connection in a separate task so this function need not wait
// for the disconnect.
132 tokio::spawn(async move {
133 disconnection_future.await;
// NOTE(review): the reconnect attempt presumably repeats inside a loop (header
// not visible here) — confirm.
135 eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
136 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
137 Arc::clone(&peer_manager),
// Reconnected: wait here until this new connection also ends.
141 disconnection_future.await;
// Ten-second pause; presumably the delay between reconnect attempts — confirm.
143 tokio::time::sleep(Duration::from_secs(10)).await;
// Reached when the initial attempt did not yield a connection.
149 eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());