]> git.bitcoin.ninja Git - rapid-gossip-sync-server/blob - src/tracking.rs
Fix multiplication overflow bug.
[rapid-gossip-sync-server] / src / tracking.rs
1 use std::collections::hash_map::RandomState;
2 use std::hash::{BuildHasher, Hasher};
3 use std::net::SocketAddr;
4 use std::ops::Deref;
5 use std::sync::Arc;
6 use std::time::{Duration, Instant};
7
8 use bitcoin::hashes::hex::ToHex;
9 use bitcoin::secp256k1::PublicKey;
10 use lightning;
11 use lightning::ln::peer_handler::{
12         ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
13 };
14 use lightning::{log_error, log_info, log_warn};
15 use lightning::routing::gossip::NetworkGraph;
16 use lightning::sign::KeysManager;
17 use lightning::util::logger::Logger;
18 use tokio::sync::mpsc;
19
20 use crate::config;
21 use crate::downloader::GossipRouter;
22 use crate::types::{GossipMessage, GossipPeerManager};
23
24 pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
25                 completion_sender: mpsc::Sender<()>,
26                 network_graph: Arc<NetworkGraph<L>>,
27                 logger: L
28 ) where L::Target: Logger {
29         let mut key = [42; 32];
30         let mut random_data = [43; 32];
31         // Get something psuedo-random from std.
32         let mut key_hasher = RandomState::new().build_hasher();
33         key_hasher.write_u8(1);
34         key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
35         let mut rand_hasher = RandomState::new().build_hasher();
36         rand_hasher.write_u8(2);
37         random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
38
39         let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
40
41         let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
42
43         let message_handler = MessageHandler {
44                 chan_handler: ErroringMessageHandler::new(),
45                 route_handler: Arc::clone(&router),
46                 onion_message_handler: IgnoringMessageHandler {},
47                 custom_message_handler: IgnoringMessageHandler {},
48         };
49         let peer_handler = Arc::new(PeerManager::new(
50                 message_handler,
51                 0xdeadbeef,
52                 &random_data,
53                 logger.clone(),
54                 keys_manager,
55         ));
56         router.set_pm(Arc::clone(&peer_handler));
57
58         let ph_timer = Arc::clone(&peer_handler);
59         tokio::spawn(async move {
60                 let mut intvl = tokio::time::interval(Duration::from_secs(10));
61                 loop {
62                         intvl.tick().await;
63                         ph_timer.timer_tick_occurred();
64                 }
65         });
66
67         log_info!(logger, "Connecting to Lightning peers...");
68         let peers = config::ln_peers();
69         let mut connected_peer_count = 0;
70
71         for current_peer in peers {
72                 let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await;
73                 if initial_connection_succeeded {
74                         connected_peer_count += 1;
75                 }
76         }
77
78         if connected_peer_count < 1 {
79                 panic!("Failed to connect to any peer.");
80         }
81
82         log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
83
84         tokio::spawn(async move {
85                 let mut previous_announcement_count = 0u64;
86                 let mut previous_update_count = 0u64;
87                 let mut is_caught_up_with_gossip = false;
88
89                 let mut i = 0u32;
90                 let mut latest_new_gossip_time = Instant::now();
91                 let mut needs_to_notify_persister = false;
92
93                 loop {
94                         i += 1; // count the background activity
95                         let sleep = tokio::time::sleep(Duration::from_secs(5));
96                         sleep.await;
97
98                         {
99                                 let counter = router.counter.read().unwrap();
100                                 let total_message_count = counter.channel_announcements + counter.channel_updates;
101                                 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
102
103                                 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
104                                 // TODO: make new message threshold (20) adjust based on connected peer count
105                                 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
106                                 if new_message_count > 0 {
107                                         latest_new_gossip_time = Instant::now();
108                                 }
109
110                                 // if we either aren't caught up, or just stopped/started being caught up
111                                 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
112                                         log_info!(
113                                                 logger,
114                                                 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
115                                                 i,
116                                                 total_message_count,
117                                                 new_message_count,
118                                                 counter.channel_announcements,
119                                                 counter.channel_announcements_with_mismatched_scripts,
120                                                 counter.channel_updates,
121                                                 counter.channel_updates_without_htlc_max_msats
122                                         );
123                                 } else {
124                                         log_info!(logger, "Monitoring for gossip…")
125                                 }
126
127                                 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
128                                         log_info!(logger, "caught up with gossip!");
129                                         needs_to_notify_persister = true;
130                                 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
131                                         log_info!(logger, "Received new messages since catching up with gossip!");
132                                 }
133
134                                 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
135                                 if continuous_caught_up_duration.as_secs() > 600 {
136                                         log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
137                                 }
138
139                                 previous_announcement_count = counter.channel_announcements;
140                                 previous_update_count = counter.channel_updates;
141                         }
142
143                         if needs_to_notify_persister {
144                                 needs_to_notify_persister = false;
145                                 completion_sender.send(()).await.unwrap();
146                         }
147                 }
148         });
149 }
150
151 async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
152         log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
153         let connection = lightning_net_tokio::connect_outbound(
154                 Arc::clone(&peer_manager),
155                 current_peer.0,
156                 current_peer.1,
157         ).await;
158         if let Some(disconnection_future) = connection {
159                 log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
160                 tokio::spawn(async move {
161                         disconnection_future.await;
162                         loop {
163                                 log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
164                                 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
165                                         Arc::clone(&peer_manager),
166                                         current_peer.0,
167                                         current_peer.1,
168                                 ).await {
169                                         disconnection_future.await;
170                                 }
171                                 tokio::time::sleep(Duration::from_secs(10)).await;
172                         }
173                 });
174                 true
175         } else {
176                 log_error!(logger, "Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
177                 false
178         }
179 }