Update to LDK 0.0.111, rust-bitcoin 0.29
[rapid-gossip-sync-server] / src / tracking.rs
1 use std::collections::hash_map::RandomState;
2 use std::hash::{BuildHasher, Hasher};
3 use std::net::SocketAddr;
4 use std::sync::Arc;
5 use std::time::{Duration, Instant};
6
7 use bitcoin::hashes::hex::ToHex;
8 use bitcoin::secp256k1::{PublicKey, SecretKey};
9 use lightning;
10 use lightning::ln::peer_handler::{
11         ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
12 };
13 use lightning::routing::gossip::NetworkGraph;
14 use tokio::sync::mpsc;
15
16 use crate::{config, TestLogger};
17 use crate::downloader::GossipRouter;
18 use crate::types::{GossipMessage, GossipPeerManager};
19
20 pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
21                 completion_sender: mpsc::Sender<()>,
22                 network_graph: Arc<NetworkGraph<TestLogger>>) {
23         let mut key = [42; 32];
24         let mut random_data = [43; 32];
25         // Get something psuedo-random from std.
26         let mut key_hasher = RandomState::new().build_hasher();
27         key_hasher.write_u8(1);
28         key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
29         let mut rand_hasher = RandomState::new().build_hasher();
30         rand_hasher.write_u8(2);
31         random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
32
33         let our_node_secret = SecretKey::from_slice(&key).unwrap();
34         let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
35
36         let message_handler = MessageHandler {
37                 chan_handler: ErroringMessageHandler::new(),
38                 route_handler: Arc::clone(&router),
39                 onion_message_handler: IgnoringMessageHandler {},
40         };
41         let peer_handler = Arc::new(PeerManager::new(
42                 message_handler,
43                 our_node_secret,
44                 0xdeadbeef,
45                 &random_data,
46                 TestLogger::new(),
47                 IgnoringMessageHandler {},
48         ));
49
50         println!("Connecting to Lightning peers...");
51         let peers = config::ln_peers();
52         let mut connected_peer_count = 0;
53
54         for current_peer in peers {
55                 let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
56                 if initial_connection_succeeded {
57                         connected_peer_count += 1;
58                 }
59         }
60
61         if connected_peer_count < 1 {
62                 panic!("Failed to connect to any peer.");
63         }
64
65         println!("Connected to {} Lightning peers!", connected_peer_count);
66
67         tokio::spawn(async move {
68                 let mut previous_announcement_count = 0u64;
69                 let mut previous_update_count = 0u64;
70                 let mut is_caught_up_with_gossip = false;
71
72                 let mut i = 0u32;
73                 let mut latest_new_gossip_time = Instant::now();
74                 let mut needs_to_notify_persister = false;
75
76                 loop {
77                         i += 1; // count the background activity
78                         let sleep = tokio::time::sleep(Duration::from_secs(5));
79                         sleep.await;
80
81                         {
82                                 let counter = router.counter.read().unwrap();
83                                 let total_message_count = counter.channel_announcements + counter.channel_updates;
84                                 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
85
86                                 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
87                                 // TODO: make new message threshold (20) adjust based on connected peer count
88                                 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
89                                 if new_message_count > 0 {
90                                         latest_new_gossip_time = Instant::now();
91                                 }
92
93                                 // if we either aren't caught up, or just stopped/started being caught up
94                                 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
95                                         println!(
96                                                 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
97                                                 i,
98                                                 total_message_count,
99                                                 new_message_count,
100                                                 counter.channel_announcements,
101                                                 counter.channel_announcements_with_mismatched_scripts,
102                                                 counter.channel_updates,
103                                                 counter.channel_updates_without_htlc_max_msats
104                                         );
105                                 } else {
106                                         println!("Monitoring for gossip…")
107                                 }
108
109                                 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
110                                         println!("caught up with gossip!");
111                                         needs_to_notify_persister = true;
112                                 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
113                                         println!("Received new messages since catching up with gossip!");
114                                 }
115
116                                 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
117                                 if continuous_caught_up_duration.as_secs() > 600 {
118                                         eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
119                                 }
120
121                                 previous_announcement_count = counter.channel_announcements;
122                                 previous_update_count = counter.channel_updates;
123                         }
124
125                         if needs_to_notify_persister {
126                                 needs_to_notify_persister = false;
127                                 completion_sender.send(()).await.unwrap();
128                         }
129                 }
130         });
131 }
132
133 async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
134         eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
135         let connection = lightning_net_tokio::connect_outbound(
136                 Arc::clone(&peer_manager),
137                 current_peer.0,
138                 current_peer.1,
139         ).await;
140         if let Some(disconnection_future) = connection {
141                 eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
142                 tokio::spawn(async move {
143                         disconnection_future.await;
144                         loop {
145                                 eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
146                                 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
147                                         Arc::clone(&peer_manager),
148                                         current_peer.0,
149                                         current_peer.1,
150                                 ).await {
151                                         disconnection_future.await;
152                                 } else {
153                                         tokio::time::sleep(Duration::from_secs(10)).await;
154                                 }
155                         }
156                 });
157                 true
158         } else {
159                 eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
160                 false
161         }
162 }