Don't spawn new single-threaded executors, stay inside tokio
[rapid-gossip-sync-server] / src / tracking.rs
1 use std::net::SocketAddr;
2 use std::sync::{Arc, RwLock};
3 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5 use bitcoin::hashes::hex::ToHex;
6 use bitcoin::secp256k1::{PublicKey, SecretKey};
7 use lightning;
8 use lightning::ln::peer_handler::{
9         ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
10 };
11 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
12 use rand::{Rng, thread_rng};
13 use tokio::sync::mpsc;
14
15 use crate::{config, TestLogger};
16 use crate::downloader::{GossipCounter, GossipRouter};
17 use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager};
18 use crate::verifier::ChainVerifier;
19
20 pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
21         let mut key = [0; 32];
22         let mut random_data = [0; 32];
23         thread_rng().fill_bytes(&mut key);
24         thread_rng().fill_bytes(&mut random_data);
25         let our_node_secret = SecretKey::from_slice(&key).unwrap();
26
27         let _arc_chain_access = None::<GossipChainAccess>;
28         let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
29         let ignorer = IgnoringMessageHandler {};
30         let arc_ignorer = Arc::new(ignorer);
31
32         let errorer = ErroringMessageHandler::new();
33         let arc_errorer = Arc::new(errorer);
34
35         let logger = TestLogger::new();
36         let arc_logger = Arc::new(logger);
37
38         let router = P2PGossipSync::new(
39                 network_graph.clone(),
40                 arc_chain_access,
41                 Arc::clone(&arc_logger),
42         );
43         let arc_router = Arc::new(router);
44         let wrapped_router = GossipRouter {
45                 native_router: arc_router,
46                 counter: RwLock::new(GossipCounter::new()),
47                 sender: persistence_sender.clone(),
48         };
49         let arc_wrapped_router = Arc::new(wrapped_router);
50
51         let message_handler = MessageHandler {
52                 chan_handler: arc_errorer,
53                 route_handler: arc_wrapped_router.clone(),
54         };
55         let peer_handler = PeerManager::new(
56                 message_handler,
57                 our_node_secret,
58                 &random_data,
59                 Arc::clone(&arc_logger),
60                 arc_ignorer,
61         );
62         let arc_peer_handler = Arc::new(peer_handler);
63
64         println!("Connecting to Lightning peers…");
65         let peers = config::ln_peers();
66         let mut connected_peer_count = 0;
67
68         for current_peer in peers {
69                 let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&arc_peer_handler)).await;
70                 if initial_connection_succeeded {
71                         connected_peer_count += 1;
72                 }
73         }
74
75         if connected_peer_count < 1 {
76                 panic!("Failed to connect to any peer.");
77         }
78
79         println!("Connected to {} Lightning peers!", connected_peer_count);
80
81         let local_router = arc_wrapped_router.clone();
82         let local_persistence_sender = persistence_sender.clone();
83         tokio::spawn(async move {
84                 let mut previous_announcement_count = 0u64;
85                 let mut previous_update_count = 0u64;
86                 let mut is_caught_up_with_gossip = false;
87
88                 let mut i = 0u32;
89                 let mut latest_new_gossip_time = Instant::now();
90                 let mut needs_to_notify_persister = false;
91
92                 loop {
93                         i += 1; // count the background activity
94                         let sleep = tokio::time::sleep(Duration::from_secs(5));
95                         sleep.await;
96
97                         let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
98                         let router_clone = Arc::clone(&local_router);
99
100                         {
101                                 let counter = router_clone.counter.read().unwrap();
102                                 let total_message_count = counter.channel_announcements + counter.channel_updates;
103                                 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
104
105                                 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
106                                 // TODO: make new message threshold (20) adjust based on connected peer count
107                                 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
108                                 if new_message_count > 0 {
109                                         latest_new_gossip_time = Instant::now();
110                                 }
111
112                                 // if we either aren't caught up, or just stopped/started being caught up
113                                 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
114                                         println!(
115                                                 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
116                                                 i,
117                                                 total_message_count,
118                                                 new_message_count,
119                                                 counter.channel_announcements,
120                                                 counter.channel_announcements_with_mismatched_scripts,
121                                                 counter.channel_updates,
122                                                 counter.channel_updates_without_htlc_max_msats
123                                         );
124                                 } else {
125                                         println!("Monitoring for gossip…")
126                                 }
127
128                                 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
129                                         println!("caught up with gossip!");
130                                         needs_to_notify_persister = true;
131                                 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
132                                         println!("Received new messages since catching up with gossip!");
133                                 }
134
135                                 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
136                                 if continuous_caught_up_duration.as_secs() > 600 {
137                                         eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
138                                 }
139
140                                 previous_announcement_count = counter.channel_announcements;
141                                 previous_update_count = counter.channel_updates;
142                         }
143
144                         if needs_to_notify_persister {
145                                 needs_to_notify_persister = false;
146                                 let sender = local_persistence_sender.clone();
147                                 tokio::spawn(async move {
148                                         let _ = sender.send(DetectedGossipMessage {
149                                                 timestamp_seen: current_timestamp as u32,
150                                                 message: GossipMessage::InitialSyncComplete,
151                                         }).await;
152                                 });
153                         }
154                 }
155         });
156 }
157
158 async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
159         eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
160         let connection = lightning_net_tokio::connect_outbound(
161                 Arc::clone(&peer_manager),
162                 current_peer.0,
163                 current_peer.1,
164         ).await;
165         if let Some(disconnection_future) = connection {
166                 eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
167                 tokio::spawn(async move {
168                         disconnection_future.await;
169                         loop {
170                                 eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
171                                 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
172                                         Arc::clone(&peer_manager),
173                                         current_peer.0,
174                                         current_peer.1,
175                                 ).await {
176                                         disconnection_future.await;
177                                 } else {
178                                         tokio::time::sleep(Duration::from_secs(10)).await;
179                                 }
180                         }
181                 });
182                 true
183         } else {
184                 eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
185                 false
186         }
187 }