From 2cf9129a187a66cfed10f9583c14fc8ee7339a18 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 15:41:14 -0700 Subject: [PATCH] Allow custom logger types. --- src/downloader.rs | 26 +++++++++++++------------- src/lib.rs | 29 ++++++++++++++++------------- src/lookup.rs | 5 +++-- src/main.rs | 5 ++++- src/persistence.rs | 11 ++++++----- src/snapshot.rs | 11 ++++++----- src/tracking.rs | 15 +++++++++------ src/types.rs | 17 ++++++----------- src/verifier.rs | 18 +++++++++--------- 9 files changed, 72 insertions(+), 65 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index ac11ec7..0c672e3 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -5,10 +5,10 @@ use lightning::events::{MessageSendEvent, MessageSendEventsProvider}; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use lightning::util::logger::Logger; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use crate::TestLogger; use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; @@ -30,28 +30,28 @@ impl GossipCounter { } } -pub(crate) struct GossipRouter { - native_router: P2PGossipSync>, GossipChainAccess, TestLogger>, +pub(crate) struct GossipRouter { + native_router: P2PGossipSync>>, GossipChainAccess, Arc>, pub(crate) counter: RwLock, sender: mpsc::Sender, - verifier: Arc, - outbound_gossiper: Arc>, GossipChainAccess, TestLogger>>, + verifier: Arc>, + outbound_gossiper: Arc>>, GossipChainAccess, Arc>>, } -impl GossipRouter { - pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender) -> Self { - let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new())); +impl GossipRouter { + pub(crate) fn new(network_graph: Arc>>, sender: mpsc::Sender, logger: Arc) -> Self { + let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone())); let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper))); Self { - native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()), + native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()), outbound_gossiper, counter: RwLock::new(GossipCounter::new()), sender, - verifier, + verifier } } - pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { + pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { self.verifier.set_ph(peer_handler); } @@ -83,7 +83,7 @@ impl GossipRouter { } } -impl MessageSendEventsProvider for GossipRouter { +impl MessageSendEventsProvider for GossipRouter { fn get_and_clear_pending_msg_events(&self) -> Vec { let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events(); for ev in gossip_evs { @@ -102,7 +102,7 @@ impl MessageSendEventsProvider for GossipRouter { } } -impl RoutingMessageHandler for GossipRouter { +impl RoutingMessageHandler for GossipRouter { fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { self.native_router.handle_node_announcement(msg) } diff --git a/src/lib.rs b/src/lib.rs index 37bbe4c..363d4ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ use std::io::BufReader; use std::sync::Arc; use lightning::routing::gossip::{NetworkGraph, NodeId}; +use lightning::util::logger::Logger; use lightning::util::ser::{ReadableArgs, Writeable}; use tokio::sync::mpsc; use crate::lookup::DeltaSet; @@ -22,10 +23,9 @@ use crate::lookup::DeltaSet; use crate::persistence::GossipPersister; use crate::serialization::UpdateSerialization; use crate::snapshot::Snapshotter; -use crate::types::TestLogger; +use crate::types::RGSSLogger; mod downloader; -mod types; mod tracking; mod lookup; mod persistence; @@ -35,14 +35,17 @@ mod config; mod hex_utils; mod verifier; +pub mod types; + /// The purpose of this prefix is to identify the serialization format, should other rapid gossip /// sync formats arise in the future. /// /// The fourth byte is the protocol version in case our format gets updated. const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1]; -pub struct RapidSyncProcessor { - network_graph: Arc>, +pub struct RapidSyncProcessor { + network_graph: Arc>>, + logger: Arc } pub struct SerializedResponse { @@ -54,27 +57,27 @@ pub struct SerializedResponse { pub update_count_incremental: u32, } -impl RapidSyncProcessor { - pub fn new() -> Self { +impl RapidSyncProcessor { + pub fn new(logger: Arc) -> Self { let network = config::network(); - let logger = TestLogger::new(); let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) { println!("Initializing from cached network graph…"); let mut buffered_reader = BufReader::new(file); - let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger); + let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone()); if let Ok(network_graph) = network_graph_result { println!("Initialized from cached network graph!"); network_graph } else { println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); - NetworkGraph::new(network, logger) + NetworkGraph::new(network, logger.clone()) } } else { - NetworkGraph::new(network, logger) + NetworkGraph::new(network, logger.clone()) }; let arc_network_graph = Arc::new(network_graph); Self { network_graph: arc_network_graph, + logger } } @@ -87,7 +90,7 @@ impl RapidSyncProcessor { println!("Starting gossip download"); tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender, - Arc::clone(&self.network_graph))); + Arc::clone(&self.network_graph), Arc::clone(&self.logger))); println!("Starting gossip db persistence listener"); tokio::spawn(async move { persister.persist_gossip().await; }); } else { @@ -126,7 +129,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { let chain_hash = genesis_block.block_hash(); chain_hash.write(&mut blob).unwrap(); - let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; + let blob_timestamp = Snapshotter::::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; blob_timestamp.write(&mut blob).unwrap(); 0u32.write(&mut blob).unwrap(); // node count @@ -136,7 +139,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { blob } -async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32) -> SerializedResponse { +async fn serialize_delta(network_graph: Arc>>, last_sync_timestamp: u32) -> SerializedResponse { let (client, connection) = lookup::connect_to_db().await; network_graph.remove_stale_channels_and_tracking(); diff --git a/src/lookup.rs b/src/lookup.rs index c554f9f..79fb84e 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -11,8 +11,9 @@ use tokio_postgres::{Client, Connection, NoTls, Socket}; use tokio_postgres::tls::NoTlsStream; use futures::StreamExt; +use lightning::util::logger::Logger; -use crate::{config, TestLogger}; +use crate::config; use crate::serialization::MutatedProperties; /// The delta set needs to be a BTreeMap so the keys are sorted. @@ -75,7 +76,7 @@ pub(super) async fn connect_to_db() -> (Client, Connection) /// whether they had been seen before. /// Also include all announcements for which the first update was announced /// after `last_sync_timestamp` -pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32) { +pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>>, client: &Client, last_sync_timestamp: u32) { println!("Obtaining channel ids from network graph"); let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); let channel_ids = { diff --git a/src/main.rs b/src/main.rs index e3468be..dae4d3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; use rapid_gossip_sync_server::RapidSyncProcessor; +use rapid_gossip_sync_server::types::RGSSLogger; #[tokio::main] async fn main() { - RapidSyncProcessor::new().start_sync().await; + let logger = Arc::new(RGSSLogger::new()); + RapidSyncProcessor::new(logger).start_sync().await; } diff --git a/src/persistence.rs b/src/persistence.rs index ac66733..22abf02 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -3,22 +3,23 @@ use std::io::{BufWriter, Write}; use std::sync::Arc; use std::time::{Duration, Instant}; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use tokio::sync::mpsc; use tokio_postgres::NoTls; -use crate::{config, TestLogger}; +use crate::config; use crate::types::GossipMessage; const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); -pub(crate) struct GossipPersister { +pub(crate) struct GossipPersister { gossip_persistence_receiver: mpsc::Receiver, - network_graph: Arc>, + network_graph: Arc>>, } -impl GossipPersister { - pub fn new(network_graph: Arc>) -> (Self, mpsc::Sender) { +impl GossipPersister { + pub fn new(network_graph: Arc>>) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { diff --git a/src/snapshot.rs b/src/snapshot.rs index ac80079..c81cc35 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -5,16 +5,17 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; -use crate::{config, TestLogger}; +use crate::config; use crate::config::cache_path; -pub(crate) struct Snapshotter { - network_graph: Arc>, +pub(crate) struct Snapshotter { + network_graph: Arc>>, } -impl Snapshotter { - pub fn new(network_graph: Arc>) -> Self { +impl Snapshotter { + pub fn new(network_graph: Arc>>) -> Self { Self { network_graph } } diff --git a/src/tracking.rs b/src/tracking.rs index 8d2668f..2935eb2 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -12,15 +12,18 @@ use lightning::ln::peer_handler::{ }; use lightning::routing::gossip::NetworkGraph; use lightning::sign::KeysManager; +use lightning::util::logger::Logger; use tokio::sync::mpsc; -use crate::{config, TestLogger}; +use crate::config; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, completion_sender: mpsc::Sender<()>, - network_graph: Arc>) { + network_graph: Arc>>, + logger: Arc +) { let mut key = [42; 32]; let mut random_data = [43; 32]; // Get something psuedo-random from std. @@ -33,7 +36,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender bool { +async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool { eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); let connection = lightning_net_tokio::connect_outbound( Arc::clone(&peer_manager), diff --git a/src/types.rs b/src/types.rs index 77a53c4..b18cbe7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::ops::Deref; use lightning::sign::KeysManager; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate}; @@ -9,8 +8,8 @@ use lightning::util::logger::{Logger, Record}; use crate::downloader::GossipRouter; use crate::verifier::ChainVerifier; -pub(crate) type GossipChainAccess = Arc; -pub(crate) type GossipPeerManager = Arc, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc>>; +pub(crate) type GossipChainAccess = Arc>; +pub(crate) type GossipPeerManager = Arc>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>>; #[derive(Debug)] pub(crate) enum GossipMessage { @@ -19,19 +18,15 @@ pub(crate) enum GossipMessage { } #[derive(Clone, Copy)] -pub(crate) struct TestLogger {} -impl Deref for TestLogger { - type Target = Self; - fn deref(&self) -> &Self { self } -} +pub struct RGSSLogger {} -impl TestLogger { - pub(crate) fn new() -> TestLogger { +impl RGSSLogger { + pub fn new() -> RGSSLogger { Self {} } } -impl Logger for TestLogger { +impl Logger for RGSSLogger { fn log(&self, record: &Record) { // TODO: allow log level threshold to be set println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args); diff --git a/src/verifier.rs b/src/verifier.rs index 4bda871..a88f2b7 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -7,25 +7,25 @@ use bitcoin::blockdata::block::Block; use bitcoin::hashes::Hash; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError}; +use lightning::util::logger::Logger; use lightning_block_sync::{BlockData, BlockSource}; use lightning_block_sync::http::BinaryResponse; use lightning_block_sync::rest::RestClient; use crate::config; -use crate::TestLogger; use crate::types::GossipPeerManager; -pub(crate) struct ChainVerifier { +pub(crate) struct ChainVerifier { rest_client: Arc, - graph: Arc>, - outbound_gossiper: Arc>, Arc, TestLogger>>, - peer_handler: Mutex>, + graph: Arc>>, + outbound_gossiper: Arc>>, Arc, Arc>>, + peer_handler: Mutex>>, } struct RestBinaryResponse(Vec); -impl ChainVerifier { - pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, TestLogger>>) -> Self { +impl ChainVerifier { + pub(crate) fn new(graph: Arc>>, outbound_gossiper: Arc>>, Arc, Arc>>) -> Self { ChainVerifier { rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()), outbound_gossiper, @@ -33,7 +33,7 @@ impl ChainVerifier { peer_handler: Mutex::new(None), } } - pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { + pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { *self.peer_handler.lock().unwrap() = Some(peer_handler); } @@ -73,7 +73,7 @@ impl ChainVerifier { } } -impl UtxoLookup for ChainVerifier { +impl UtxoLookup for ChainVerifier { fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult { let res = UtxoFuture::new(); let fut = res.clone(); -- 2.39.5