1 use std::collections::hash_map::RandomState;
2 use std::hash::{BuildHasher, Hasher};
3 use std::net::SocketAddr;
6 use std::time::{Duration, Instant};
8 use bitcoin::hashes::hex::ToHex;
9 use bitcoin::secp256k1::PublicKey;
11 use lightning::ln::peer_handler::{
12 ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
14 use lightning::{log_info, log_warn};
15 use lightning::routing::gossip::NetworkGraph;
16 use lightning::sign::KeysManager;
17 use lightning::util::logger::Logger;
18 use tokio::sync::mpsc;
19 use tokio::task::JoinSet;
22 use crate::downloader::GossipRouter;
23 use crate::types::{GossipMessage, GossipPeerManager};
/// Connects to the Lightning peers returned by `config::ln_peers()`, then tracks the gossip
/// counters exposed by the `GossipRouter` and sends `()` on `completion_sender` each time
/// the caught-up state is newly entered.
///
/// * `persistence_sender` - a clone of this sender is handed to the `GossipRouter`.
/// * `completion_sender` - receives `()` after a transition into the caught-up state.
/// * `network_graph` - handed to the `GossipRouter`.
///
/// # Panics
///
/// Panics when the connected peer count is below 1 after the connection phase, and on the
/// `unwrap()` calls applied to the counter read and to the completion send.
///
/// NOTE(review): several lines of this function are not visible in this view (part of the
/// parameter list, some loop and branch headers, closing braces). Comments below describe
/// only the statements that are visible.
25 pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
26 completion_sender: mpsc::Sender<()>,
27 network_graph: Arc<NetworkGraph<L>>,
29 ) where L::Target: Logger {
// Both buffers start as fixed filler bytes; only bytes 0..8 of each are overwritten below.
30 let mut key = [42; 32];
31 let mut random_data = [43; 32];
32 // Get something pseudo-random from std.
// Two separate RandomState hashers are used, each fed a different single byte (1 and 2),
// and the 64-bit finish() value of each is copied into the first 8 bytes of its buffer.
33 let mut key_hasher = RandomState::new().build_hasher();
34 key_hasher.write_u8(1);
35 key[0..8].copy_from_slice(&key_hasher.finish().to_ne_bytes());
36 let mut rand_hasher = RandomState::new().build_hasher();
37 rand_hasher.write_u8(2);
38 random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
// The two trailing arguments are fixed constants rather than live values.
// NOTE(review): presumably these are the starting-time parameters of KeysManager::new - confirm.
40 let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
// The router takes the network graph plus clones of the persistence sender and the logger.
42 let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
// Only the route handler is backed by the router; channel messages go to an
// ErroringMessageHandler and onion/custom messages to IgnoringMessageHandler.
44 let message_handler = MessageHandler {
45 chan_handler: ErroringMessageHandler::new(),
46 route_handler: Arc::clone(&router),
47 onion_message_handler: IgnoringMessageHandler {},
48 custom_message_handler: IgnoringMessageHandler {},
// NOTE(review): the PeerManager::new arguments are not visible in this view.
50 let peer_handler = Arc::new(PeerManager::new(
// Give the router a handle back to the peer manager it is registered with.
57 router.set_pm(Arc::clone(&peer_handler));
// Spawned task that calls timer_tick_occurred() on the peer manager; a 10-second interval
// is created for it. NOTE(review): the loop and the tick await are not visible here.
59 let ph_timer = Arc::clone(&peer_handler);
60 tokio::spawn(async move {
61 let mut intvl = tokio::time::interval(Duration::from_secs(10));
64 ph_timer.timer_tick_occurred();
68 log_info!(logger, "Connecting to Lightning peers...");
69 let peers = config::ln_peers();
70 let mut handles = JoinSet::new();
71 let mut connected_peer_count = 0;
// Warn when the number of configured peers does not exceed the assertion limit.
73 if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT {
74 log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len());
// One connect_peer task per configured peer, each with its own peer manager and logger clone.
77 for current_peer in peers {
78 let peer_handler_clone = peer_handler.clone();
79 let logger_clone = logger.clone();
80 handles.spawn(async move {
81 connect_peer(current_peer, peer_handler_clone, logger_clone).await
// Drain the JoinSet as tasks finish and tally connections against the assertion limit.
// NOTE(review): the line between the `if let` and the increment is not visible here, nor is
// the body of the limit check - confirm what gates the increment and what the check does.
85 while let Some(connection_result) = handles.join_next().await {
86 if let Ok(connection) = connection_result {
88 connected_peer_count += 1;
89 if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT {
// Without at least one connected peer there is nothing to download, so abort.
96 if connected_peer_count < 1 {
97 panic!("Failed to connect to any peer.");
100 log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
// State carried between monitoring iterations. The previous counts start at zero, which
// keeps the caught-up test below false until counts have been recorded at least once.
102 let mut previous_announcement_count = 0u64;
103 let mut previous_update_count = 0u64;
104 let mut is_caught_up_with_gossip = false;
107 let mut latest_new_gossip_time = Instant::now();
108 let mut needs_to_notify_persister = false;
// NOTE(review): the declaration of `i` and the enclosing loop header are not visible here.
111 i += 1; // count the background activity
// 5-second sleep future; NOTE(review): the point where it is awaited is not visible here.
112 let sleep = tokio::time::sleep(Duration::from_secs(5));
// Read the router counters; unwrap() panics if read() returns an error.
116 let counter = router.counter.read().unwrap();
// Delta is the combined announcement + update total minus the totals stored last iteration.
117 let total_message_count = counter.channel_announcements + counter.channel_updates;
118 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
120 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
121 // TODO: make new message threshold (20) adjust based on connected peer count
// Caught up means: fewer than 20 new messages and both previous counts are non-zero.
122 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
// Any new message resets the reference time used by the stall warning further down.
123 if new_message_count > 0 {
124 latest_new_gossip_time = Instant::now();
127 // if we either aren't caught up, or just stopped/started being caught up
128 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
131 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
135 counter.channel_announcements,
136 counter.channel_announcements_with_mismatched_scripts,
137 counter.channel_updates,
138 counter.channel_updates_without_htlc_max_msats
141 log_info!(logger, "Monitoring for gossip…")
// Edge detection: entering the caught-up state schedules a persister notification,
// leaving it only produces a log line.
144 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
145 log_info!(logger, "caught up with gossip!");
146 needs_to_notify_persister = true;
147 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
148 log_info!(logger, "Received new messages since catching up with gossip!");
// Warn when more than 600 seconds have elapsed since the last iteration that saw new messages.
151 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
152 if continuous_caught_up_duration.as_secs() > 600 {
153 log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
// Remember the current counts so the next iteration can compute its delta.
156 previous_announcement_count = counter.channel_announcements;
157 previous_update_count = counter.channel_updates;
// Clear the flag before sending so each transition is signalled once; unwrap() panics
// if the send returns an error.
160 if needs_to_notify_persister {
161 needs_to_notify_persister = false;
162 completion_sender.send(()).await.unwrap();
167 async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
168 // we seek to find out if the first connection attempt was successful
169 let (sender, mut receiver) = mpsc::channel::<bool>(1);
170 tokio::spawn(async move {
171 log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
172 let mut is_first_iteration = true;
174 if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
175 Arc::clone(&peer_manager),
179 log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
180 if is_first_iteration {
181 sender.send(true).await.unwrap();
183 disconnection_future.await;
184 log_warn!(logger, "Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
186 log_warn!(logger, "Failed to connect to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
187 if is_first_iteration {
188 sender.send(false).await.unwrap();
191 is_first_iteration = false;
192 tokio::time::sleep(Duration::from_secs(10)).await;
193 log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
197 let success = receiver.recv().await.unwrap();