Allow custom logger types.
authorArik Sosman <git@arik.io>
Wed, 2 Aug 2023 22:41:14 +0000 (15:41 -0700)
committerArik Sosman <git@arik.io>
Wed, 2 Aug 2023 22:41:14 +0000 (15:41 -0700)
src/downloader.rs
src/lib.rs
src/lookup.rs
src/main.rs
src/persistence.rs
src/snapshot.rs
src/tracking.rs
src/types.rs
src/verifier.rs

index ac11ec7091739f62290274c9637af2910b3484e4..0c672e36446c25d05d6dcf48905f67a47feb7d35 100644 (file)
@@ -5,10 +5,10 @@ use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
 use lightning::ln::features::{InitFeatures, NodeFeatures};
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
+use lightning::util::logger::Logger;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 
-use crate::TestLogger;
 use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
 use crate::verifier::ChainVerifier;
 
@@ -30,28 +30,28 @@ impl GossipCounter {
        }
 }
 
-pub(crate) struct GossipRouter {
-       native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
+pub(crate) struct GossipRouter<L: Logger + Send + Sync + 'static> {
+       native_router: P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, GossipChainAccess<L>, Arc<L>>,
        pub(crate) counter: RwLock<GossipCounter>,
        sender: mpsc::Sender<GossipMessage>,
-       verifier: Arc<ChainVerifier>,
-       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
+       verifier: Arc<ChainVerifier<L>>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, GossipChainAccess<L>, Arc<L>>>,
 }
 
-impl GossipRouter {
-       pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
-               let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
+impl<L: Logger + Send + Sync> GossipRouter<L> {
+       pub(crate) fn new(network_graph: Arc<NetworkGraph<Arc<L>>>, sender: mpsc::Sender<GossipMessage>, logger: Arc<L>) -> Self {
+               let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
                let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
                Self {
-                       native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+                       native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
                        outbound_gossiper,
                        counter: RwLock::new(GossipCounter::new()),
                        sender,
-                       verifier,
+                       verifier
                }
        }
 
-       pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+       pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
                self.verifier.set_ph(peer_handler);
        }
 
@@ -83,7 +83,7 @@ impl GossipRouter {
        }
 }
 
-impl MessageSendEventsProvider for GossipRouter {
+impl<L: Logger + Send + Sync> MessageSendEventsProvider for GossipRouter<L> {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
                for ev in gossip_evs {
@@ -102,7 +102,7 @@ impl MessageSendEventsProvider for GossipRouter {
        }
 }
 
-impl RoutingMessageHandler for GossipRouter {
+impl<L: Logger + Send + Sync> RoutingMessageHandler for GossipRouter<L> {
        fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
                self.native_router.handle_node_announcement(msg)
        }
index 37bbe4cb0338170998acef7ae0032c7056f7ea86..363d4aeb9e94ddb2c8a1baddad45027eb2ac7129 100644 (file)
@@ -15,6 +15,7 @@ use std::io::BufReader;
 use std::sync::Arc;
 
 use lightning::routing::gossip::{NetworkGraph, NodeId};
+use lightning::util::logger::Logger;
 use lightning::util::ser::{ReadableArgs, Writeable};
 use tokio::sync::mpsc;
 use crate::lookup::DeltaSet;
@@ -22,10 +23,9 @@ use crate::lookup::DeltaSet;
 use crate::persistence::GossipPersister;
 use crate::serialization::UpdateSerialization;
 use crate::snapshot::Snapshotter;
-use crate::types::TestLogger;
+use crate::types::RGSSLogger;
 
 mod downloader;
-mod types;
 mod tracking;
 mod lookup;
 mod persistence;
@@ -35,14 +35,17 @@ mod config;
 mod hex_utils;
 mod verifier;
 
+pub mod types;
+
 /// The purpose of this prefix is to identify the serialization format, should other rapid gossip
 /// sync formats arise in the future.
 ///
 /// The fourth byte is the protocol version in case our format gets updated.
 const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
 
-pub struct RapidSyncProcessor {
-       network_graph: Arc<NetworkGraph<TestLogger>>,
+pub struct RapidSyncProcessor<L: Logger> {
+       network_graph: Arc<NetworkGraph<Arc<L>>>,
+       logger: Arc<L>
 }
 
 pub struct SerializedResponse {
@@ -54,27 +57,27 @@ pub struct SerializedResponse {
        pub update_count_incremental: u32,
 }
 
-impl RapidSyncProcessor {
-       pub fn new() -> Self {
+impl<L: Logger + Send + Sync + 'static> RapidSyncProcessor<L> {
+       pub fn new(logger: Arc<L>) -> Self {
                let network = config::network();
-               let logger = TestLogger::new();
                let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
                        println!("Initializing from cached network graph…");
                        let mut buffered_reader = BufReader::new(file);
-                       let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
+                       let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone());
                        if let Ok(network_graph) = network_graph_result {
                                println!("Initialized from cached network graph!");
                                network_graph
                        } else {
                                println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
-                               NetworkGraph::new(network, logger)
+                               NetworkGraph::new(network, logger.clone())
                        }
                } else {
-                       NetworkGraph::new(network, logger)
+                       NetworkGraph::new(network, logger.clone())
                };
                let arc_network_graph = Arc::new(network_graph);
                Self {
                        network_graph: arc_network_graph,
+                       logger
                }
        }
 
@@ -87,7 +90,7 @@ impl RapidSyncProcessor {
 
                        println!("Starting gossip download");
                        tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
-                               Arc::clone(&self.network_graph)));
+                               Arc::clone(&self.network_graph), Arc::clone(&self.logger)));
                        println!("Starting gossip db persistence listener");
                        tokio::spawn(async move { persister.persist_gossip().await; });
                } else {
@@ -126,7 +129,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
        let chain_hash = genesis_block.block_hash();
        chain_hash.write(&mut blob).unwrap();
 
-       let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
+       let blob_timestamp = Snapshotter::<RGSSLogger>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
        blob_timestamp.write(&mut blob).unwrap();
 
        0u32.write(&mut blob).unwrap(); // node count
@@ -136,7 +139,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
        blob
 }
 
-async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
+async fn serialize_delta<L: Logger>(network_graph: Arc<NetworkGraph<Arc<L>>>, last_sync_timestamp: u32) -> SerializedResponse {
        let (client, connection) = lookup::connect_to_db().await;
 
        network_graph.remove_stale_channels_and_tracking();
index c554f9f57508bb251d7f664a26cd2ec0eee3ae2d..79fb84e8d599ca4d993fc8ab4bb955cb3cf2648b 100644 (file)
@@ -11,8 +11,9 @@ use tokio_postgres::{Client, Connection, NoTls, Socket};
 use tokio_postgres::tls::NoTlsStream;
 
 use futures::StreamExt;
+use lightning::util::logger::Logger;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::serialization::MutatedProperties;
 
 /// The delta set needs to be a BTreeMap so the keys are sorted.
@@ -75,7 +76,7 @@ pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>)
 /// whether they had been seen before.
 /// Also include all announcements for which the first update was announced
 /// after `last_sync_timestamp`
-pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
+pub(super) async fn fetch_channel_announcements<L: Logger>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<Arc<L>>>, client: &Client, last_sync_timestamp: u32) {
        println!("Obtaining channel ids from network graph");
        let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
        let channel_ids = {
index e3468be840d9265884aead3ac521d9ce95e52513..dae4d3efaaaf551c732fa8a6dfad6704dfb08a15 100644 (file)
@@ -1,6 +1,9 @@
+use std::sync::Arc;
 use rapid_gossip_sync_server::RapidSyncProcessor;
+use rapid_gossip_sync_server::types::RGSSLogger;
 
 #[tokio::main]
 async fn main() {
-       RapidSyncProcessor::new().start_sync().await;
+       let logger = Arc::new(RGSSLogger::new());
+       RapidSyncProcessor::new(logger).start_sync().await;
 }
index ac667330c6e0c445d0e7e82f81e2649ba12111db..22abf0216d8575692a1e3e7f9b7d4ce66844df66 100644 (file)
@@ -3,22 +3,23 @@ use std::io::{BufWriter, Write};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
 use lightning::util::ser::Writeable;
 use tokio::sync::mpsc;
 use tokio_postgres::NoTls;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::types::GossipMessage;
 
 const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
 
-pub(crate) struct GossipPersister {
+pub(crate) struct GossipPersister<L: Logger> {
        gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
-       network_graph: Arc<NetworkGraph<TestLogger>>,
+       network_graph: Arc<NetworkGraph<Arc<L>>>,
 }
 
-impl GossipPersister {
-       pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+impl<L: Logger> GossipPersister<L> {
+       pub fn new(network_graph: Arc<NetworkGraph<Arc<L>>>) -> (Self, mpsc::Sender<GossipMessage>) {
                let (gossip_persistence_sender, gossip_persistence_receiver) =
                        mpsc::channel::<GossipMessage>(100);
                (GossipPersister {
index ac800795bf6de11125818ed49da813320ff3b938..c81cc35327715ea865cd62c24c1723611315c26f 100644 (file)
@@ -5,16 +5,17 @@ use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::config::cache_path;
 
-pub(crate) struct Snapshotter {
-       network_graph: Arc<NetworkGraph<TestLogger>>,
+pub(crate) struct Snapshotter<L: Logger> {
+       network_graph: Arc<NetworkGraph<Arc<L>>>,
 }
 
-impl Snapshotter {
-       pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> Self {
+impl<L: Logger> Snapshotter<L> {
+       pub fn new(network_graph: Arc<NetworkGraph<Arc<L>>>) -> Self {
                Self { network_graph }
        }
 
index 8d2668fcaf08ffde4128fcb6a68bfb8987e26c76..2935eb286780b8228d1d4f6123de370e71b3cbb1 100644 (file)
@@ -12,15 +12,18 @@ use lightning::ln::peer_handler::{
 };
 use lightning::routing::gossip::NetworkGraph;
 use lightning::sign::KeysManager;
+use lightning::util::logger::Logger;
 use tokio::sync::mpsc;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::downloader::GossipRouter;
 use crate::types::{GossipMessage, GossipPeerManager};
 
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
+pub(crate) async fn download_gossip<L: Logger + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
                completion_sender: mpsc::Sender<()>,
-               network_graph: Arc<NetworkGraph<TestLogger>>) {
+               network_graph: Arc<NetworkGraph<Arc<L>>>,
+               logger: Arc<L>
+) {
        let mut key = [42; 32];
        let mut random_data = [43; 32];
        // Get something psuedo-random from std.
@@ -33,7 +36,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
 
        let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
 
-       let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
+       let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
 
        let message_handler = MessageHandler {
                chan_handler: ErroringMessageHandler::new(),
@@ -45,7 +48,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
                message_handler,
                0xdeadbeef,
                &random_data,
-               TestLogger::new(),
+               logger,
                keys_manager,
        ));
        router.set_pm(Arc::clone(&peer_handler));
@@ -142,7 +145,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
        });
 }
 
-async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
+async fn connect_peer<L: Logger + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>) -> bool {
        eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
        let connection = lightning_net_tokio::connect_outbound(
                Arc::clone(&peer_manager),
index 77a53c477c907fe6c90c48d8bdd1de31b721fd15..b18cbe739180a8389f60add6b9d79b027775fa31 100644 (file)
@@ -1,5 +1,4 @@
 use std::sync::Arc;
-use std::ops::Deref;
 
 use lightning::sign::KeysManager;
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
@@ -9,8 +8,8 @@ use lightning::util::logger::{Logger, Record};
 use crate::downloader::GossipRouter;
 use crate::verifier::ChainVerifier;
 
-pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
+pub(crate) type GossipChainAccess<L> = Arc<ChainVerifier<L>>;
+pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter<L>>, IgnoringMessageHandler, Arc<L>, IgnoringMessageHandler, Arc<KeysManager>>>;
 
 #[derive(Debug)]
 pub(crate) enum GossipMessage {
@@ -19,19 +18,15 @@ pub(crate) enum GossipMessage {
 }
 
 #[derive(Clone, Copy)]
-pub(crate) struct TestLogger {}
-impl Deref for TestLogger {
-       type Target = Self;
-       fn deref(&self) -> &Self { self }
-}
+pub struct RGSSLogger {}
 
-impl TestLogger {
-       pub(crate) fn new() -> TestLogger {
+impl RGSSLogger {
+       pub fn new() -> RGSSLogger {
                Self {}
        }
 }
 
-impl Logger for TestLogger {
+impl Logger for RGSSLogger {
        fn log(&self, record: &Record) {
                // TODO: allow log level threshold to be set
                println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
index 4bda871ad124d85336576f0b2e9b08e3b9c5a92b..a88f2b701257ff998ffdfc09da983421d2f157c1 100644 (file)
@@ -7,25 +7,25 @@ use bitcoin::blockdata::block::Block;
 use bitcoin::hashes::Hash;
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
+use lightning::util::logger::Logger;
 use lightning_block_sync::{BlockData, BlockSource};
 use lightning_block_sync::http::BinaryResponse;
 use lightning_block_sync::rest::RestClient;
 
 use crate::config;
-use crate::TestLogger;
 use crate::types::GossipPeerManager;
 
-pub(crate) struct ChainVerifier {
+pub(crate) struct ChainVerifier<L: Logger + Send + Sync + 'static> {
        rest_client: Arc<RestClient>,
-       graph: Arc<NetworkGraph<TestLogger>>,
-       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
-       peer_handler: Mutex<Option<GossipPeerManager>>,
+       graph: Arc<NetworkGraph<Arc<L>>>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<Self>, Arc<L>>>,
+       peer_handler: Mutex<Option<GossipPeerManager<L>>>,
 }
 
 struct RestBinaryResponse(Vec<u8>);
 
-impl ChainVerifier {
-       pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
+impl<L: Logger + Send + Sync + 'static> ChainVerifier<L> {
+       pub(crate) fn new(graph: Arc<NetworkGraph<Arc<L>>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<Self>, Arc<L>>>) -> Self {
                ChainVerifier {
                        rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
                        outbound_gossiper,
@@ -33,7 +33,7 @@ impl ChainVerifier {
                        peer_handler: Mutex::new(None),
                }
        }
-       pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+       pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
                *self.peer_handler.lock().unwrap() = Some(peer_handler);
        }
 
@@ -73,7 +73,7 @@ impl ChainVerifier {
        }
 }
 
-impl UtxoLookup for ChainVerifier {
+impl<L: Logger + Send + Sync + 'static> UtxoLookup for ChainVerifier<L> {
        fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
                let res = UtxoFuture::new();
                let fut = res.clone();