Merge pull request #7 from TheBlueMatt/2022-08-fix-deadlock
[rapid-gossip-sync-server] / src / tracking.rs
1 use std::net::SocketAddr;
2 use std::sync::Arc;
3 use std::time::{Duration, Instant};
4
5 use bitcoin::hashes::hex::ToHex;
6 use bitcoin::secp256k1::{PublicKey, SecretKey};
7 use lightning;
8 use lightning::ln::peer_handler::{
9         ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
10 };
11 use lightning::routing::gossip::NetworkGraph;
12 use rand::{Rng, thread_rng};
13 use tokio::sync::mpsc;
14
15 use crate::{config, TestLogger};
16 use crate::downloader::GossipRouter;
17 use crate::types::{GossipMessage, GossipPeerManager};
18
19 pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
20                 completion_sender: mpsc::Sender<()>,
21                 network_graph: Arc<NetworkGraph<TestLogger>>) {
22         let mut key = [0; 32];
23         let mut random_data = [0; 32];
24         thread_rng().fill_bytes(&mut key);
25         thread_rng().fill_bytes(&mut random_data);
26         let our_node_secret = SecretKey::from_slice(&key).unwrap();
27
28         let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
29
30         let message_handler = MessageHandler {
31                 chan_handler: ErroringMessageHandler::new(),
32                 route_handler: Arc::clone(&router),
33         };
34         let peer_handler = Arc::new(PeerManager::new(
35                 message_handler,
36                 our_node_secret,
37                 &random_data,
38                 TestLogger::new(),
39                 IgnoringMessageHandler {},
40         ));
41
42         println!("Connecting to Lightning peers...");
43         let peers = config::ln_peers();
44         let mut connected_peer_count = 0;
45
46         for current_peer in peers {
47                 let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
48                 if initial_connection_succeeded {
49                         connected_peer_count += 1;
50                 }
51         }
52
53         if connected_peer_count < 1 {
54                 panic!("Failed to connect to any peer.");
55         }
56
57         println!("Connected to {} Lightning peers!", connected_peer_count);
58
59         tokio::spawn(async move {
60                 let mut previous_announcement_count = 0u64;
61                 let mut previous_update_count = 0u64;
62                 let mut is_caught_up_with_gossip = false;
63
64                 let mut i = 0u32;
65                 let mut latest_new_gossip_time = Instant::now();
66                 let mut needs_to_notify_persister = false;
67
68                 loop {
69                         i += 1; // count the background activity
70                         let sleep = tokio::time::sleep(Duration::from_secs(5));
71                         sleep.await;
72
73                         {
74                                 let counter = router.counter.read().unwrap();
75                                 let total_message_count = counter.channel_announcements + counter.channel_updates;
76                                 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
77
78                                 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
79                                 // TODO: make new message threshold (20) adjust based on connected peer count
80                                 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
81                                 if new_message_count > 0 {
82                                         latest_new_gossip_time = Instant::now();
83                                 }
84
85                                 // if we either aren't caught up, or just stopped/started being caught up
86                                 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
87                                         println!(
88                                                 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
89                                                 i,
90                                                 total_message_count,
91                                                 new_message_count,
92                                                 counter.channel_announcements,
93                                                 counter.channel_announcements_with_mismatched_scripts,
94                                                 counter.channel_updates,
95                                                 counter.channel_updates_without_htlc_max_msats
96                                         );
97                                 } else {
98                                         println!("Monitoring for gossip…")
99                                 }
100
101                                 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
102                                         println!("caught up with gossip!");
103                                         needs_to_notify_persister = true;
104                                 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
105                                         println!("Received new messages since catching up with gossip!");
106                                 }
107
108                                 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
109                                 if continuous_caught_up_duration.as_secs() > 600 {
110                                         eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
111                                 }
112
113                                 previous_announcement_count = counter.channel_announcements;
114                                 previous_update_count = counter.channel_updates;
115                         }
116
117                         if needs_to_notify_persister {
118                                 needs_to_notify_persister = false;
119                                 completion_sender.send(()).await.unwrap();
120                         }
121                 }
122         });
123 }
124
125 async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
126         eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
127         let connection = lightning_net_tokio::connect_outbound(
128                 Arc::clone(&peer_manager),
129                 current_peer.0,
130                 current_peer.1,
131         ).await;
132         if let Some(disconnection_future) = connection {
133                 eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
134                 tokio::spawn(async move {
135                         disconnection_future.await;
136                         loop {
137                                 eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
138                                 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
139                                         Arc::clone(&peer_manager),
140                                         current_peer.0,
141                                         current_peer.1,
142                                 ).await {
143                                         disconnection_future.await;
144                                 } else {
145                                         tokio::time::sleep(Duration::from_secs(10)).await;
146                                 }
147                         }
148                 });
149                 true
150         } else {
151                 eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
152                 false
153         }
154 }