1 use std::collections::hash_map::RandomState;
2 use std::hash::{BuildHasher, Hasher};
3 use std::net::SocketAddr;
5 use std::time::{Duration, Instant};
7 use bitcoin::hashes::hex::ToHex;
8 use bitcoin::secp256k1::PublicKey;
10 use lightning::ln::peer_handler::{
11 ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
13 use lightning::routing::gossip::NetworkGraph;
14 use lightning::sign::KeysManager;
15 use lightning::util::logger::Logger;
16 use tokio::sync::mpsc;
19 use crate::downloader::GossipRouter;
20 use crate::types::{GossipMessage, GossipPeerManager};
/// Builds a gossip-only peer manager, connects to the configured Lightning peers and
/// spawns background tasks that keep the peer manager ticking and watch gossip progress.
///
/// Visible parameters:
/// * `persistence_sender` - a clone of this sender is passed to the `GossipRouter`.
/// * `completion_sender` - receives `()` each time the monitor task newly decides that
///   gossip has caught up.
/// * `network_graph` - moved into the `GossipRouter`.
///
/// NOTE(review): the body also uses a `logger` value whose parameter declaration is not
/// visible in this listing; several other lines (closing delimiters, loop headers, call
/// arguments) are likewise not shown, so the comments below only describe what is visible.
///
/// # Panics
///
/// Panics with "Failed to connect to any peer." if no initial connection succeeds. Inside
/// the spawned monitor task, the `unwrap` on `router.counter.read()` and the `unwrap` on
/// `completion_sender.send(()).await` panic if those calls return an error.
22 pub(crate) async fn download_gossip<L: Logger + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
23 completion_sender: mpsc::Sender<()>,
24 network_graph: Arc<NetworkGraph<Arc<L>>>,
// Two 32-byte buffers with fixed filler bytes; the first 8 bytes of each are overwritten below.
27 let mut key = [42; 32];
28 let mut random_data = [43; 32];
29 // Get something pseudo-random from std.
// Each buffer gets the output of its own freshly created RandomState hasher, fed a
// different byte (1 for the key, 2 for the random data).
30 let mut key_hasher = RandomState::new().build_hasher();
31 key_hasher.write_u8(1);
32 key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
33 let mut rand_hasher = RandomState::new().build_hasher();
34 rand_hasher.write_u8(2);
35 random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
37 let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
// The router takes ownership of the network graph and gets its own clones of the
// persistence sender and the logger.
39 let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
// The router is installed as the route handler; the channel handler is an
// ErroringMessageHandler and the onion/custom handlers are IgnoringMessageHandlers.
41 let message_handler = MessageHandler {
42 chan_handler: ErroringMessageHandler::new(),
43 route_handler: Arc::clone(&router),
44 onion_message_handler: IgnoringMessageHandler {},
45 custom_message_handler: IgnoringMessageHandler {},
// NOTE(review): the arguments passed to PeerManager::new are not visible in this listing.
47 let peer_handler = Arc::new(PeerManager::new(
// Give the router a handle to the peer manager.
54 router.set_pm(Arc::clone(&peer_handler));
// Background task: creates a 10-second interval and calls timer_tick_occurred on the peer
// manager (the loop header and the tick await are not shown here; presumably it runs
// once per interval tick).
56 let ph_timer = Arc::clone(&peer_handler);
57 tokio::spawn(async move {
58 let mut intvl = tokio::time::interval(Duration::from_secs(10));
61 ph_timer.timer_tick_occurred();
65 println!("Connecting to Lightning peers...");
66 let peers = config::ln_peers();
67 let mut connected_peer_count = 0;
// Try each configured peer in turn and count the initial connections that succeed.
69 for current_peer in peers {
70 let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
71 if initial_connection_succeeded {
72 connected_peer_count += 1;
// Abort if not a single peer could be reached.
76 if connected_peer_count < 1 {
77 panic!("Failed to connect to any peer.");
80 println!("Connected to {} Lightning peers!", connected_peer_count);
// Background task: inspects the router's message counters to decide whether gossip has
// caught up, logs progress, and notifies completion_sender on each transition into the
// caught-up state.
82 tokio::spawn(async move {
83 let mut previous_announcement_count = 0u64;
84 let mut previous_update_count = 0u64;
85 let mut is_caught_up_with_gossip = false;
88 let mut latest_new_gossip_time = Instant::now();
89 let mut needs_to_notify_persister = false;
// NOTE(review): the declaration of `i` and the enclosing loop header are not visible in
// this listing; presumably everything below repeats, paced by the 5-second sleep.
92 i += 1; // count the background activity
93 let sleep = tokio::time::sleep(Duration::from_secs(5));
// Read access to the shared counters; unwrap panics if read() returns an error.
97 let counter = router.counter.read().unwrap();
98 let total_message_count = counter.channel_announcements + counter.channel_updates;
// Messages counted since the previous pass (the previous_* values are refreshed further down).
99 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
101 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
102 // TODO: make new message threshold (20) adjust based on connected peer count
// Caught up means: fewer than 20 new messages in this pass, and both counts recorded on
// the previous pass were non-zero.
103 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
// Any new message resets the "last time gossip arrived" timestamp.
104 if new_message_count > 0 {
105 latest_new_gossip_time = Instant::now();
108 // if we either aren't caught up, or just stopped/started being caught up
109 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
111 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
115 counter.channel_announcements,
116 counter.channel_announcements_with_mismatched_scripts,
117 counter.channel_updates,
118 counter.channel_updates_without_htlc_max_msats
121 println!("Monitoring for gossip…")
// Report transitions in either direction; only the transition into the caught-up state
// schedules a notification.
124 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
125 println!("caught up with gossip!");
126 needs_to_notify_persister = true;
127 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
128 println!("Received new messages since catching up with gossip!");
// Warn on stderr when more than 600 seconds have passed since the last new message.
131 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
132 if continuous_caught_up_duration.as_secs() > 600 {
133 eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
// Remember the current totals so the next pass can compute its delta.
136 previous_announcement_count = counter.channel_announcements;
137 previous_update_count = counter.channel_updates;
// Send the pending notification and clear the flag; unwrap panics if send returns an error.
140 if needs_to_notify_persister {
141 needs_to_notify_persister = false;
142 completion_sender.send(()).await.unwrap();
/// Attempts an outbound connection to `current_peer` (a node public key paired with a socket
/// address) via `lightning_net_tokio::connect_outbound`, logging each step to stderr.
///
/// If the initial attempt yields a disconnection future, a background task is spawned that
/// awaits that future and then tries to reconnect, awaiting the new disconnection future
/// whenever the reconnect succeeds and sleeping for 10 seconds afterwards. Otherwise a
/// failure message is logged.
///
/// Returns a `bool`; the caller in this file treats `true` as "the initial connection
/// succeeded".
///
/// NOTE(review): the loop header around the reconnect logic, the remaining arguments to
/// `connect_outbound` and the `bool` return expressions are not visible in this listing.
148 async fn connect_peer<L: Logger + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>) -> bool {
149 eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
150 let connection = lightning_net_tokio::connect_outbound(
151 Arc::clone(&peer_manager),
// A `Some` result carries a future that is awaited below to detect disconnection.
155 if let Some(disconnection_future) = connection {
156 eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
// Detached task: waits for the peer to disconnect, then attempts to reconnect.
157 tokio::spawn(async move {
158 disconnection_future.await;
160 eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
161 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
162 Arc::clone(&peer_manager),
// Reconnected: wait here until this connection drops as well.
166 disconnection_future.await;
// Pause for 10 seconds (presumably before the next reconnect attempt).
168 tokio::time::sleep(Duration::from_secs(10)).await;
173 eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());