Bump LDK to 0.0.121/rust-bitcoin 0.30, bumping MSRV to 1.63
[rapid-gossip-sync-server] / src / tracking.rs
1 use std::collections::hash_map::RandomState;
2 use std::hash::{BuildHasher, Hasher};
3 use std::net::SocketAddr;
4 use std::ops::Deref;
5 use std::sync::Arc;
6 use std::time::{Duration, Instant};
7
8 use bitcoin::secp256k1::PublicKey;
9 use hex_conservative::display::DisplayHex;
10 use lightning::ln::peer_handler::{
11         ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
12 };
13 use lightning::{log_info, log_warn};
14 use lightning::routing::gossip::NetworkGraph;
15 use lightning::sign::KeysManager;
16 use lightning::util::logger::Logger;
17 use tokio::sync::mpsc;
18 use tokio::task::JoinSet;
19
20 use crate::config;
21 use crate::downloader::GossipRouter;
22 use crate::types::{GossipMessage, GossipPeerManager};
23
24 pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
25         completion_sender: mpsc::Sender<()>,
26         network_graph: Arc<NetworkGraph<L>>,
27         logger: L,
28 ) where L::Target: Logger {
29         let mut key = [42; 32];
30         let mut random_data = [43; 32];
31         // Get something psuedo-random from std.
32         let mut key_hasher = RandomState::new().build_hasher();
33         key_hasher.write_u8(1);
34         key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
35         let mut rand_hasher = RandomState::new().build_hasher();
36         rand_hasher.write_u8(2);
37         random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
38
39         let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
40
41         let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
42
43         let message_handler = MessageHandler {
44                 chan_handler: ErroringMessageHandler::new(),
45                 route_handler: Arc::clone(&router),
46                 onion_message_handler: IgnoringMessageHandler {},
47                 custom_message_handler: IgnoringMessageHandler {},
48         };
49         let peer_handler = Arc::new(PeerManager::new(
50                 message_handler,
51                 0xdeadbeef,
52                 &random_data,
53                 logger.clone(),
54                 keys_manager,
55         ));
56         router.set_pm(Arc::clone(&peer_handler));
57
58         let ph_timer = Arc::clone(&peer_handler);
59         tokio::spawn(async move {
60                 let mut intvl = tokio::time::interval(Duration::from_secs(10));
61                 loop {
62                         intvl.tick().await;
63                         ph_timer.timer_tick_occurred();
64                 }
65         });
66
67         log_info!(logger, "Connecting to Lightning peers...");
68         let peers = config::ln_peers();
69         let mut handles = JoinSet::new();
70         let mut connected_peer_count = 0;
71
72         if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT {
73                 log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len());
74         }
75
76         for current_peer in peers {
77                 let peer_handler_clone = peer_handler.clone();
78                 let logger_clone = logger.clone();
79                 handles.spawn(async move {
80                         connect_peer(current_peer, peer_handler_clone, logger_clone).await
81                 });
82         }
83
84         while let Some(connection_result) = handles.join_next().await {
85                 if let Ok(connection) = connection_result {
86                         if connection {
87                                 connected_peer_count += 1;
88                                 if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT {
89                                         break;
90                                 }
91                         }
92                 }
93         }
94
95         if connected_peer_count < 1 {
96                 panic!("Failed to connect to any peer.");
97         }
98
99         log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
100
101         let mut previous_announcement_count = 0u64;
102         let mut previous_update_count = 0u64;
103         let mut is_caught_up_with_gossip = false;
104
105         let mut i = 0u32;
106         let mut latest_new_gossip_time = Instant::now();
107         let mut needs_to_notify_persister = false;
108
109         loop {
110                 i += 1; // count the background activity
111                 let sleep = tokio::time::sleep(Duration::from_secs(5));
112                 sleep.await;
113
114                 {
115                         let counter = router.counter.read().unwrap();
116                         let total_message_count = counter.channel_announcements + counter.channel_updates;
117                         let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
118
119                         let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
120                         // TODO: make new message threshold (20) adjust based on connected peer count
121                         is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
122                         if new_message_count > 0 {
123                                 latest_new_gossip_time = Instant::now();
124                         }
125
126                         // if we either aren't caught up, or just stopped/started being caught up
127                         if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
128                                 log_info!(
129                                         logger,
130                                         "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
131                                         i,
132                                         total_message_count,
133                                         new_message_count,
134                                         counter.channel_announcements,
135                                         counter.channel_announcements_with_mismatched_scripts,
136                                         counter.channel_updates,
137                                         counter.channel_updates_without_htlc_max_msats
138                                 );
139                         } else {
140                                 log_info!(logger, "Monitoring for gossip…")
141                         }
142
143                         if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
144                                 log_info!(logger, "caught up with gossip!");
145                                 needs_to_notify_persister = true;
146                         } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
147                                 log_info!(logger, "Received new messages since catching up with gossip!");
148                         }
149
150                         let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
151                         if continuous_caught_up_duration.as_secs() > 600 {
152                                 log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
153                         }
154
155                         previous_announcement_count = counter.channel_announcements;
156                         previous_update_count = counter.channel_updates;
157                 }
158
159                 if needs_to_notify_persister {
160                         needs_to_notify_persister = false;
161                         completion_sender.send(()).await.unwrap();
162                 }
163         }
164 }
165
166 async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
167         // we seek to find out if the first connection attempt was successful
168         let (sender, mut receiver) = mpsc::channel::<bool>(1);
169         tokio::spawn(async move {
170                 let current_peer_pubkey_hex = current_peer.0.serialize().to_lower_hex_string();
171                 log_info!(logger, "Connecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1);
172                 let mut is_first_iteration = true;
173                 loop {
174                         if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
175                                 Arc::clone(&peer_manager),
176                                 current_peer.0,
177                                 current_peer.1,
178                         ).await {
179                                 log_info!(logger, "Connected to peer {}@{}!", current_peer_pubkey_hex, current_peer.1);
180                                 if is_first_iteration {
181                                         sender.send(true).await.unwrap();
182                                 }
183                                 disconnection_future.await;
184                                 log_warn!(logger, "Disconnected from peer {}@{}", current_peer_pubkey_hex, current_peer.1);
185                         } else {
186                                 log_warn!(logger, "Failed to connect to peer {}@{}!", current_peer_pubkey_hex, current_peer.1);
187                                 if is_first_iteration {
188                                         sender.send(false).await.unwrap();
189                                 }
190                         }
191                         is_first_iteration = false;
192                         tokio::time::sleep(Duration::from_secs(10)).await;
193                         log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer_pubkey_hex, current_peer.1);
194                 }
195         });
196
197         let success = receiver.recv().await.unwrap();
198         success
199 }