1 use std::net::SocketAddr;
2 use std::sync::{Arc, RwLock};
3 use std::time::{Duration, Instant};
5 use bitcoin::hashes::hex::ToHex;
6 use bitcoin::secp256k1::{PublicKey, SecretKey};
9 use lightning::ln::peer_handler::{
10 ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
12 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
13 use rand::{Rng, thread_rng};
14 use tokio::sync::mpsc;
16 use crate::{config, TestLogger};
17 use crate::downloader::{GossipCounter, GossipRouter};
18 use crate::types::{GossipChainAccess, GossipMessage, GossipPeerManager};
19 use crate::verifier::ChainVerifier;
/// Builds a gossip-sync peer stack, connects to the configured Lightning peers, and spawns a
/// background task that periodically samples the gossip counters.
///
/// * `persistence_sender` - channel that is cloned into the `GossipRouter` and into the
///   monitoring task; the task sends `GossipMessage::InitialSyncComplete` on it the first time
///   gossip is judged to be caught up.
/// * `network_graph` - shared network graph handed to `P2PGossipSync::new`.
///
/// # Panics
///
/// Panics if `SecretKey::from_slice` rejects the randomly generated key, or if not a single
/// peer connection succeeds. Inside the spawned task, the counter lock read and the channel
/// send are both unwrapped.
///
/// NOTE(review): this excerpt does not show every line of the function (closing delimiters,
/// some call arguments, the loop header and the declaration of `i` are not visible), so the
/// comments below describe only what the visible lines establish.
21 pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
// Fresh random bytes: `key` becomes the node secret key below. The consumer of `random_data`
// is not visible in this excerpt (presumably `PeerManager::new` — confirm).
22 let mut key = [0; 32];
23 let mut random_data = [0; 32];
24 thread_rng().fill_bytes(&mut key);
25 thread_rng().fill_bytes(&mut random_data);
26 let our_node_secret = SecretKey::from_slice(&key).unwrap();
// Underscore-prefixed `None` alternative that is never read in the visible code; looks like a
// manual switch for running without chain verification — confirm before removing.
28 let _arc_chain_access = None::<GossipChainAccess>;
// Chain access backed by a `ChainVerifier`.
29 let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
// Handler that ignores messages; its consumer is not visible in this excerpt.
30 let ignorer = IgnoringMessageHandler {};
31 let arc_ignorer = Arc::new(ignorer);
// Handler used below as `chan_handler`.
33 let errorer = ErroringMessageHandler::new();
34 let arc_errorer = Arc::new(errorer);
// One logger instance shared by the gossip sync and the peer manager.
36 let logger = TestLogger::new();
37 let arc_logger = Arc::new(logger);
// Native gossip sync operating on the shared network graph.
39 let router = P2PGossipSync::new(
40 network_graph.clone(),
42 Arc::clone(&arc_logger),
44 let arc_router = Arc::new(router);
// Wrap the native router together with a lock-protected message counter and a clone of the
// persistence channel.
45 let wrapped_router = GossipRouter {
46 native_router: arc_router,
47 counter: RwLock::new(GossipCounter::new()),
48 sender: persistence_sender.clone(),
50 let arc_wrapped_router = Arc::new(wrapped_router);
// Channel messages go to the erroring handler; routing messages go to the wrapped router.
52 let message_handler = MessageHandler {
53 chan_handler: arc_errorer,
54 route_handler: arc_wrapped_router.clone(),
56 let peer_handler = PeerManager::new(
60 Arc::clone(&arc_logger),
63 let arc_peer_handler = Arc::new(peer_handler);
65 println!("Connecting to Lightning peers…");
66 let peers = config::ln_peers();
67 let mut connected_peer_count = 0;
// Try every configured peer and count the ones whose initial connection succeeded.
69 for current_peer in peers {
70 let initial_connection_succeeded = monitor_peer_connection(current_peer, Arc::clone(&arc_peer_handler));
71 if initial_connection_succeeded {
72 connected_peer_count += 1;
// Without at least one peer there is nothing to download, so abort.
76 if connected_peer_count < 1 {
77 panic!("Failed to connect to any peer.");
80 println!("Connected to {} Lightning peers!", connected_peer_count);
// Clones moved into the background monitoring task.
82 let local_router = arc_wrapped_router.clone();
83 let local_persistence_sender = persistence_sender.clone();
84 tokio::spawn(async move {
// Counter values seen on the previous sample, used to compute the per-sample delta.
85 let mut previous_announcement_count = 0u64;
86 let mut previous_update_count = 0u64;
87 let mut is_caught_up_with_gossip = false;
// Time of the most recent sample that observed at least one new message.
90 let mut latest_new_gossip_time = Instant::now();
// Set when the state flips to "caught up"; cleared once the persister has been notified.
91 let mut needs_to_notify_persister = false;
// NOTE(review): the declaration of `i` and the enclosing loop header are not visible here.
94 i += 1; // count the background activity
// Five-second timer; the line awaiting it is not visible in this excerpt.
95 let sleep = tokio::time::sleep(Duration::from_secs(5));
98 let router_clone = Arc::clone(&local_router);
// Take a read lock on the counters; `unwrap` panics if the lock is poisoned.
101 let counter = router_clone.counter.read().unwrap();
102 let total_message_count = counter.channel_announcements + counter.channel_updates;
// Messages received since the previous sample.
103 let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
105 let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
106 // TODO: make new message threshold (20) adjust based on connected peer count
// Caught up means: fewer than 20 new messages in this sample, and the previous sample had
// already seen at least one announcement and at least one update.
107 is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
108 if new_message_count > 0 {
109 latest_new_gossip_time = Instant::now();
112 // if we either aren't caught up, or just stopped/started being caught up
113 if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
// Format string and counter arguments of a status report; the macro invocation line and the
// first arguments are not visible in this excerpt.
115 "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
119 counter.channel_announcements,
120 counter.channel_announcements_with_mismatched_scripts,
121 counter.channel_updates,
122 counter.channel_updates_without_htlc_max_msats
125 println!("Monitoring for gossip…")
// Transition into the caught-up state: schedule a one-time notification to the persister.
128 if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
129 println!("caught up with gossip!");
130 needs_to_notify_persister = true;
// Transition out of the caught-up state: only logged.
131 } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
132 println!("Received new messages since catching up with gossip!");
// Warn when more than 600 seconds have passed since a sample last saw new gossip.
135 let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
136 if continuous_caught_up_duration.as_secs() > 600 {
137 eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
// Remember the current counts for the next sample's delta.
140 previous_announcement_count = counter.channel_announcements;
141 previous_update_count = counter.channel_updates;
// Clear the flag before sending so the notification is emitted once per transition; `unwrap`
// panics if the send fails.
144 if needs_to_notify_persister {
145 needs_to_notify_persister = false;
146 local_persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
/// Attempts an outbound connection to `current_peer` and, when it succeeds, spawns a task that
/// waits for the disconnection future and then calls this function again for the same peer.
///
/// * `current_peer` - the peer's public key and socket address.
/// * `peer_manager` - shared peer manager; cloned once for the connection attempt and once for
///   the reconnection task.
///
/// Returns `true` when the connection attempt yielded a disconnection future, `false`
/// otherwise.
///
/// NOTE(review): the arguments passed to `connect_outbound` and several closing delimiters are
/// not visible in this excerpt.
152 fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
// Clone for the connection attempt; the async block below is `move`.
153 let peer_manager_clone = Arc::clone(&peer_manager);
154 eprintln!("Connecting to peer {}@{}…", current_peer.0.to_hex(), current_peer.1.to_string());
// NOTE(review): `executor::block_on` drives the connection future synchronously, yet this
// function is reached from async code (`download_gossip` and the spawned task below).
// Confirm that blocking here is intended.
155 let connection = executor::block_on(async move {
156 lightning_net_tokio::connect_outbound(
162 let mut initial_connection_succeeded = false;
// `Some` carries a future that is awaited below to detect disconnection.
163 if let Some(disconnection_future) = connection {
164 eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
165 initial_connection_succeeded = true;
// Second clone, moved into the reconnection task.
166 let peer_manager_clone = Arc::clone(&peer_manager);
167 tokio::spawn(async move {
168 disconnection_future.await;
169 eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
// Reconnect by calling this function again; its return value is discarded, and no delay
// between attempts is visible in this excerpt.
170 monitor_peer_connection(current_peer.clone(), peer_manager_clone);
173 eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string())
175 initial_connection_succeeded