99166afdfe5e7d18a88a8b7c837bc56609dbe601
[rapid-gossip-sync-server] / src / tracking.rs
1 use std::collections::hash_map::RandomState;
2 use std::hash::{BuildHasher, Hasher};
3 use std::net::SocketAddr;
4 use std::ops::Deref;
5 use std::sync::Arc;
6 use std::time::{Duration, Instant};
7
8 use bitcoin::hashes::hex::ToHex;
9 use bitcoin::secp256k1::PublicKey;
10 use lightning;
11 use lightning::ln::peer_handler::{
12         ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
13 };
14 use lightning::routing::gossip::NetworkGraph;
15 use lightning::sign::KeysManager;
16 use lightning::util::logger::Logger;
17 use tokio::sync::mpsc;
18
19 use crate::config;
20 use crate::downloader::GossipRouter;
21 use crate::types::{GossipMessage, GossipPeerManager};
22
23 pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
24                 completion_sender: mpsc::Sender<()>,
25                 network_graph: Arc<NetworkGraph<L>>,
26                 logger: L
27 ) where L::Target: Logger {
28         let mut key = [42; 32];
29         let mut random_data = [43; 32];
30         // Get something psuedo-random from std.
31         let mut key_hasher = RandomState::new().build_hasher();
32         key_hasher.write_u8(1);
33         key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
34         let mut rand_hasher = RandomState::new().build_hasher();
35         rand_hasher.write_u8(2);
36         random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
37
38         let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
39
40         let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
41
42         let message_handler = MessageHandler {
43                 chan_handler: ErroringMessageHandler::new(),
44                 route_handler: Arc::clone(&router),
45                 onion_message_handler: IgnoringMessageHandler {},
46                 custom_message_handler: IgnoringMessageHandler {},
47         };
48         let peer_handler = Arc::new(PeerManager::new(
49                 message_handler,
50                 0xdeadbeef,
51                 &random_data,
52                 logger,
53                 keys_manager,
54         ));
55         router.set_pm(Arc::clone(&peer_handler));
56
57         let ph_timer = Arc::clone(&peer_handler);
58         tokio::spawn(async move {
59                 let mut intvl = tokio::time::interval(Duration::from_secs(10));
60                 loop {
61                         intvl.tick().await;
62                         ph_timer.timer_tick_occurred();
63                 }
64         });
65
66         println!("Connecting to Lightning peers...");
67         let peers = config::ln_peers();
68         let mut connected_peer_count = 0;
69
70         for current_peer in peers {
71                 let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
72                 if initial_connection_succeeded {
73                         connected_peer_count += 1;
74                 }
75         }
76
77         if connected_peer_count < 1 {
78                 panic!("Failed to connect to any peer.");
79         }
80
81         println!("Connected to {} Lightning peers!", connected_peer_count);
82
83         tokio::spawn(async move {
84                 let mut previous_announcement_count = 0u64;
85                 let mut previous_update_count = 0u64;
86                 let mut is_caught_up_with_gossip = false;
87
88                 let mut i = 0u32;
89                 let mut latest_new_gossip_time = Instant::now();
90                 let mut needs_to_notify_persister = false;
91
92                 loop {
93                         i += 1; // count the background activity
94                         let sleep = tokio::time::sleep(Duration::from_secs(5));
95                         sleep.await;
96
97                         {
98                                 let counter = router.counter.read().unwrap();
99                                 let total_message_count = counter.channel_announcements + counter.channel_updates;
100                                 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
101
102                                 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
103                                 // TODO: make new message threshold (20) adjust based on connected peer count
104                                 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
105                                 if new_message_count > 0 {
106                                         latest_new_gossip_time = Instant::now();
107                                 }
108
109                                 // if we either aren't caught up, or just stopped/started being caught up
110                                 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
111                                         println!(
112                                                 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
113                                                 i,
114                                                 total_message_count,
115                                                 new_message_count,
116                                                 counter.channel_announcements,
117                                                 counter.channel_announcements_with_mismatched_scripts,
118                                                 counter.channel_updates,
119                                                 counter.channel_updates_without_htlc_max_msats
120                                         );
121                                 } else {
122                                         println!("Monitoring for gossip…")
123                                 }
124
125                                 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
126                                         println!("caught up with gossip!");
127                                         needs_to_notify_persister = true;
128                                 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
129                                         println!("Received new messages since catching up with gossip!");
130                                 }
131
132                                 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
133                                 if continuous_caught_up_duration.as_secs() > 600 {
134                                         eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
135                                 }
136
137                                 previous_announcement_count = counter.channel_announcements;
138                                 previous_update_count = counter.channel_updates;
139                         }
140
141                         if needs_to_notify_persister {
142                                 needs_to_notify_persister = false;
143                                 completion_sender.send(()).await.unwrap();
144                         }
145                 }
146         });
147 }
148
149 async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>) -> bool where L::Target: Logger {
150         eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
151         let connection = lightning_net_tokio::connect_outbound(
152                 Arc::clone(&peer_manager),
153                 current_peer.0,
154                 current_peer.1,
155         ).await;
156         if let Some(disconnection_future) = connection {
157                 eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
158                 tokio::spawn(async move {
159                         disconnection_future.await;
160                         loop {
161                                 eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
162                                 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
163                                         Arc::clone(&peer_manager),
164                                         current_peer.0,
165                                         current_peer.1,
166                                 ).await {
167                                         disconnection_future.await;
168                                 }
169                                 tokio::time::sleep(Duration::from_secs(10)).await;
170                         }
171                 });
172                 true
173         } else {
174                 eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
175                 false
176         }
177 }