use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
-use crate::{GossipChainAccess, TestLogger};
-use crate::types::GossipMessage;
+use crate::TestLogger;
+use crate::types::{GossipMessage, GossipChainAccess};
+use crate::verifier::ChainVerifier;
pub(crate) struct GossipCounter {
pub(crate) channel_announcements: u64,
}
pub(crate) struct GossipRouter {
- pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
+ native_router: P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>,
pub(crate) counter: RwLock<GossipCounter>,
- pub(crate) sender: mpsc::Sender<GossipMessage>,
+ sender: mpsc::Sender<GossipMessage>,
+}
+
+impl GossipRouter {
+ pub(crate) fn new(network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, sender: mpsc::Sender<GossipMessage>) -> Self {
+ Self {
+ native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())),
+ Arc::new(TestLogger::new())),
+ counter: RwLock::new(GossipCounter::new()),
+ sender
+ }
+ }
}
impl MessageSendEventsProvider for GossipRouter {
use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
use std::time::{Duration, Instant};
use bitcoin::hashes::hex::ToHex;
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::gossip::NetworkGraph;
use rand::{Rng, thread_rng};
use tokio::sync::mpsc;
use crate::{config, TestLogger};
-use crate::downloader::{GossipCounter, GossipRouter};
-use crate::types::{GossipChainAccess, GossipMessage, GossipPeerManager};
-use crate::verifier::ChainVerifier;
+use crate::downloader::GossipRouter;
+use crate::types::{GossipMessage, GossipPeerManager};
pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
let mut key = [0; 32];
thread_rng().fill_bytes(&mut random_data);
let our_node_secret = SecretKey::from_slice(&key).unwrap();
- let _arc_chain_access = None::<GossipChainAccess>;
- let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
- let ignorer = IgnoringMessageHandler {};
- let arc_ignorer = Arc::new(ignorer);
-
- let errorer = ErroringMessageHandler::new();
- let arc_errorer = Arc::new(errorer);
-
- let logger = TestLogger::new();
- let arc_logger = Arc::new(logger);
-
- let router = P2PGossipSync::new(
- network_graph.clone(),
- arc_chain_access,
- Arc::clone(&arc_logger),
- );
- let arc_router = Arc::new(router);
- let wrapped_router = GossipRouter {
- native_router: arc_router,
- counter: RwLock::new(GossipCounter::new()),
- sender: persistence_sender.clone(),
- };
- let arc_wrapped_router = Arc::new(wrapped_router);
+ let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
let message_handler = MessageHandler {
- chan_handler: arc_errorer,
- route_handler: arc_wrapped_router.clone(),
+ chan_handler: ErroringMessageHandler::new(),
+ route_handler: Arc::clone(&router),
};
- let peer_handler = PeerManager::new(
+ let peer_handler = Arc::new(PeerManager::new(
message_handler,
our_node_secret,
&random_data,
- Arc::clone(&arc_logger),
- arc_ignorer,
- );
- let arc_peer_handler = Arc::new(peer_handler);
+ Arc::new(TestLogger::new()),
+ IgnoringMessageHandler {},
+ ));
- println!("Connecting to Lightning peers…");
+ println!("Connecting to Lightning peers...");
let peers = config::ln_peers();
let mut connected_peer_count = 0;
for current_peer in peers {
- let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&arc_peer_handler)).await;
+ let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
if initial_connection_succeeded {
connected_peer_count += 1;
}
println!("Connected to {} Lightning peers!", connected_peer_count);
- let local_router = arc_wrapped_router.clone();
- let local_persistence_sender = persistence_sender.clone();
tokio::spawn(async move {
let mut previous_announcement_count = 0u64;
let mut previous_update_count = 0u64;
let sleep = tokio::time::sleep(Duration::from_secs(5));
sleep.await;
- let router_clone = Arc::clone(&local_router);
-
{
- let counter = router_clone.counter.read().unwrap();
+ let counter = router.counter.read().unwrap();
let total_message_count = counter.channel_announcements + counter.channel_updates;
let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
if needs_to_notify_persister {
needs_to_notify_persister = false;
- local_persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
+ persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
}
}
});