Merge pull request #51 from arik-so/2023/08/pre_test_refactors
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Sun, 13 Aug 2023 05:30:42 +0000 (05:30 +0000)
committerGitHub <noreply@github.com>
Sun, 13 Aug 2023 05:30:42 +0000 (05:30 +0000)
Allow arbitrary logger types.

Cargo.toml
src/config.rs
src/downloader.rs
src/lib.rs
src/lookup.rs
src/main.rs
src/persistence.rs
src/snapshot.rs
src/tracking.rs
src/types.rs
src/verifier.rs

index ec3f3a673b454c976d964127a3912f47c1190bc9..99aeed72ffc662cf01630edb870ae5fef79c530b 100644 (file)
@@ -5,9 +5,9 @@ edition = "2021"
 
 [dependencies]
 bitcoin = "0.29"
-lightning = { version = "0.0.116-alpha1" }
-lightning-block-sync = { version = "0.0.116-alpha1", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.116-alpha1" }
+lightning = { version = "0.0.116" }
+lightning-block-sync = { version = "0.0.116", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.116" }
 tokio = { version = "1.25", features = ["full"] }
 tokio-postgres = { version="=0.7.5" }
 futures = "0.3"
index b0e3a7ce7722dafb236001df3e6bb46a1f6a60f7..804b0f8ff8a66262b7273702239b3ac645d24fae 100644 (file)
@@ -35,6 +35,19 @@ pub(crate) fn network() -> Network {
        }
 }
 
+pub(crate) fn log_level() -> lightning::util::logger::Level {
+       let level = env::var("RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL").unwrap_or("info".to_string()).to_lowercase();
+       match level.as_str() {
+               "gossip" => lightning::util::logger::Level::Gossip,
+               "trace" => lightning::util::logger::Level::Trace,
+               "debug" => lightning::util::logger::Level::Debug,
+               "info" => lightning::util::logger::Level::Info,
+               "warn" => lightning::util::logger::Level::Warn,
+               "error" => lightning::util::logger::Level::Error,
+               _ => panic!("Invalid log level"),
+       }
+}
+
 pub(crate) fn network_graph_cache_path() -> String {
        format!("{}/network_graph.bin", cache_path())
 }
index ac11ec7091739f62290274c9637af2910b3484e4..8d5cdb0005e7d8e3ea74f5036ee6c20220a9729e 100644 (file)
@@ -1,3 +1,4 @@
+use std::ops::Deref;
 use std::sync::{Arc, RwLock};
 
 use bitcoin::secp256k1::PublicKey;
@@ -5,10 +6,10 @@ use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
 use lightning::ln::features::{InitFeatures, NodeFeatures};
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
+use lightning::util::logger::Logger;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 
-use crate::TestLogger;
 use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
 use crate::verifier::ChainVerifier;
 
@@ -30,28 +31,28 @@ impl GossipCounter {
        }
 }
 
-pub(crate) struct GossipRouter {
-       native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
+pub(crate) struct GossipRouter<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
+       native_router: P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>,
        pub(crate) counter: RwLock<GossipCounter>,
        sender: mpsc::Sender<GossipMessage>,
-       verifier: Arc<ChainVerifier>,
-       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
+       verifier: Arc<ChainVerifier<L>>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>>,
 }
 
-impl GossipRouter {
-       pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
-               let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
-               let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
+impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
+       pub(crate) fn new(network_graph: Arc<NetworkGraph<L>>, sender: mpsc::Sender<GossipMessage>, logger: L) -> Self {
+               let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
+               let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone()));
                Self {
-                       native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+                       native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
                        outbound_gossiper,
                        counter: RwLock::new(GossipCounter::new()),
                        sender,
-                       verifier,
+                       verifier
                }
        }
 
-       pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+       pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
                self.verifier.set_ph(peer_handler);
        }
 
@@ -83,7 +84,7 @@ impl GossipRouter {
        }
 }
 
-impl MessageSendEventsProvider for GossipRouter {
+impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<L> where L::Target: Logger {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
                for ev in gossip_evs {
@@ -102,7 +103,7 @@ impl MessageSendEventsProvider for GossipRouter {
        }
 }
 
-impl RoutingMessageHandler for GossipRouter {
+impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
        fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
                self.native_router.handle_node_announcement(msg)
        }
index 37bbe4cb0338170998acef7ae0032c7056f7ea86..b6281bcb6836b58f7ecff75664ba6075bde8df13 100644 (file)
@@ -12,9 +12,12 @@ extern crate core;
 use std::collections::{HashMap, HashSet};
 use std::fs::File;
 use std::io::BufReader;
+use std::ops::Deref;
 use std::sync::Arc;
+use lightning::log_info;
 
 use lightning::routing::gossip::{NetworkGraph, NodeId};
+use lightning::util::logger::Logger;
 use lightning::util::ser::{ReadableArgs, Writeable};
 use tokio::sync::mpsc;
 use crate::lookup::DeltaSet;
@@ -22,10 +25,9 @@ use crate::lookup::DeltaSet;
 use crate::persistence::GossipPersister;
 use crate::serialization::UpdateSerialization;
 use crate::snapshot::Snapshotter;
-use crate::types::TestLogger;
+use crate::types::RGSSLogger;
 
 mod downloader;
-mod types;
 mod tracking;
 mod lookup;
 mod persistence;
@@ -35,14 +37,17 @@ mod config;
 mod hex_utils;
 mod verifier;
 
+pub mod types;
+
 /// The purpose of this prefix is to identify the serialization format, should other rapid gossip
 /// sync formats arise in the future.
 ///
 /// The fourth byte is the protocol version in case our format gets updated.
 const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
 
-pub struct RapidSyncProcessor {
-       network_graph: Arc<NetworkGraph<TestLogger>>,
+pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
+       network_graph: Arc<NetworkGraph<L>>,
+       logger: L
 }
 
 pub struct SerializedResponse {
@@ -54,27 +59,27 @@ pub struct SerializedResponse {
        pub update_count_incremental: u32,
 }
 
-impl RapidSyncProcessor {
-       pub fn new() -> Self {
+impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Target: Logger {
+       pub fn new(logger: L) -> Self {
                let network = config::network();
-               let logger = TestLogger::new();
                let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
-                       println!("Initializing from cached network graph…");
+                       log_info!(logger, "Initializing from cached network graph…");
                        let mut buffered_reader = BufReader::new(file);
-                       let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
+                       let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone());
                        if let Ok(network_graph) = network_graph_result {
-                               println!("Initialized from cached network graph!");
+                               log_info!(logger, "Initialized from cached network graph!");
                                network_graph
                        } else {
-                               println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
-                               NetworkGraph::new(network, logger)
+                               log_info!(logger, "Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
+                               NetworkGraph::new(network, logger.clone())
                        }
                } else {
-                       NetworkGraph::new(network, logger)
+                       NetworkGraph::new(network, logger.clone())
                };
                let arc_network_graph = Arc::new(network_graph);
                Self {
                        network_graph: arc_network_graph,
+                       logger
                }
        }
 
@@ -83,12 +88,12 @@ impl RapidSyncProcessor {
                let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
 
                if config::DOWNLOAD_NEW_GOSSIP {
-                       let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph));
+                       let (mut persister, persistence_sender) = GossipPersister::new(self.network_graph.clone(), self.logger.clone());
 
-                       println!("Starting gossip download");
+                       log_info!(self.logger, "Starting gossip download");
                        tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
-                               Arc::clone(&self.network_graph)));
-                       println!("Starting gossip db persistence listener");
+                               Arc::clone(&self.network_graph), self.logger.clone()));
+                       log_info!(self.logger, "Starting gossip db persistence listener");
                        tokio::spawn(async move { persister.persist_gossip().await; });
                } else {
                        sync_completion_sender.send(()).await.unwrap();
@@ -98,10 +103,10 @@ impl RapidSyncProcessor {
                if sync_completion.is_none() {
                        panic!("Sync failed!");
                }
-               println!("Initial sync complete!");
+               log_info!(self.logger, "Initial sync complete!");
 
                // start the gossip snapshotting service
-               Snapshotter::new(Arc::clone(&self.network_graph)).snapshot_gossip().await;
+               Snapshotter::new(Arc::clone(&self.network_graph), self.logger.clone()).snapshot_gossip().await;
        }
 }
 
@@ -126,7 +131,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
        let chain_hash = genesis_block.block_hash();
        chain_hash.write(&mut blob).unwrap();
 
-       let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
+       let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
        blob_timestamp.write(&mut blob).unwrap();
 
        0u32.write(&mut blob).unwrap(); // node count
@@ -136,7 +141,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
        blob
 }
 
-async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
+async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger {
        let (client, connection) = lookup::connect_to_db().await;
 
        network_graph.remove_stale_channels_and_tracking();
@@ -170,12 +175,12 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
        };
 
        let mut delta_set = DeltaSet::new();
-       lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
-       println!("announcement channel count: {}", delta_set.len());
-       lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await;
-       println!("update-fetched channel count: {}", delta_set.len());
-       lookup::filter_delta_set(&mut delta_set);
-       println!("update-filtered channel count: {}", delta_set.len());
+       lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, logger.clone()).await;
+       log_info!(logger, "announcement channel count: {}", delta_set.len());
+       lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
+       log_info!(logger, "update-fetched channel count: {}", delta_set.len());
+       lookup::filter_delta_set(&mut delta_set, logger.clone());
+       log_info!(logger, "update-filtered channel count: {}", delta_set.len());
        let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);
 
        // process announcements
@@ -246,8 +251,8 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
 
        prefixed_output.append(&mut output);
 
-       println!("duplicated node ids: {}", duplicate_node_ids);
-       println!("latest seen timestamp: {:?}", serialization_details.latest_seen);
+       log_info!(logger, "duplicated node ids: {}", duplicate_node_ids);
+       log_info!(logger, "latest seen timestamp: {:?}", serialization_details.latest_seen);
 
        SerializedResponse {
                data: prefixed_output,
index c554f9f57508bb251d7f664a26cd2ec0eee3ae2d..1c6b4186cae203e8394becc4541af83814d4eafb 100644 (file)
@@ -1,6 +1,6 @@
 use std::collections::{BTreeMap, HashSet};
 use std::io::Cursor;
-use std::ops::Add;
+use std::ops::{Add, Deref};
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime};
 
@@ -11,8 +11,10 @@ use tokio_postgres::{Client, Connection, NoTls, Socket};
 use tokio_postgres::tls::NoTlsStream;
 
 use futures::StreamExt;
+use lightning::log_info;
+use lightning::util::logger::Logger;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::serialization::MutatedProperties;
 
 /// The delta set needs to be a BTreeMap so the keys are sorted.
@@ -75,12 +77,12 @@ pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>)
 /// whether they had been seen before.
 /// Also include all announcements for which the first update was announced
 /// after `last_sync_timestamp`
-pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
-       println!("Obtaining channel ids from network graph");
+pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
+       log_info!(logger, "Obtaining channel ids from network graph");
        let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
        let channel_ids = {
                let read_only_graph = network_graph.read_only();
-               println!("Retrieved read-only network graph copy");
+               log_info!(logger, "Retrieved read-only network graph copy");
                let channel_iterator = read_only_graph.channels().unordered_iter();
                channel_iterator
                        .filter(|c| c.1.announcement_message.is_some())
@@ -88,7 +90,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
                        .collect::<Vec<_>>()
        };
 
-       println!("Obtaining corresponding database entries");
+       log_info!(logger, "Obtaining corresponding database entries");
        // get all the channel announcements that are currently in the network graph
        let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
        let mut pinned_rows = Box::pin(announcement_rows);
@@ -113,7 +115,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
        {
                // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
 
-               println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
+               log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
                // Steps:
                // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
                // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
@@ -155,7 +157,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
        {
                // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
 
-               println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
+               log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
                // Steps:
                // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
                // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
@@ -213,7 +215,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
        }
 }
 
-pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) {
+pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
        let start = Instant::now();
        let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
 
@@ -235,7 +237,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
                ", [last_sync_timestamp_object]).await.unwrap();
        let mut pinned_rows = Box::pin(reference_rows);
 
-       println!("Fetched reference rows in {:?}", start.elapsed());
+       log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
 
        let mut last_seen_update_ids: Vec<i32> = Vec::new();
        let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
@@ -263,7 +265,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
                reference_row_count += 1;
        }
 
-       println!("Processed {} reference rows (delta size: {}) in {:?}",
+       log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
                reference_row_count, delta_set.len(), start.elapsed());
 
        // get all the intermediate channel updates
@@ -276,7 +278,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
                WHERE seen >= $1
                ", [last_sync_timestamp_object]).await.unwrap();
        let mut pinned_updates = Box::pin(intermediate_updates);
-       println!("Fetched intermediate rows in {:?}", start.elapsed());
+       log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
 
        let mut previous_scid = u64::MAX;
        let mut previously_seen_directions = (false, false);
@@ -351,10 +353,10 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
                        }
                }
        }
-       println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
+       log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
 }
 
-pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {
+pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
        let original_length = delta_set.len();
        let keys: Vec<u64> = delta_set.keys().cloned().collect();
        for k in keys {
@@ -386,6 +388,6 @@ pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {
 
        let new_length = delta_set.len();
        if original_length != new_length {
-               println!("length modified!");
+               log_info!(logger, "length modified!");
        }
 }
index e3468be840d9265884aead3ac521d9ce95e52513..dae4d3efaaaf551c732fa8a6dfad6704dfb08a15 100644 (file)
@@ -1,6 +1,9 @@
+use std::sync::Arc;
 use rapid_gossip_sync_server::RapidSyncProcessor;
+use rapid_gossip_sync_server::types::RGSSLogger;
 
 #[tokio::main]
 async fn main() {
-       RapidSyncProcessor::new().start_sync().await;
+       let logger = Arc::new(RGSSLogger::new());
+       RapidSyncProcessor::new(logger).start_sync().await;
 }
index ac667330c6e0c445d0e7e82f81e2649ba12111db..f638894dd3590c9d0c583452c7e03aba8275cb15 100644 (file)
@@ -1,29 +1,34 @@
 use std::fs::OpenOptions;
 use std::io::{BufWriter, Write};
+use std::ops::Deref;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
+use lightning::log_info;
 use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
 use lightning::util::ser::Writeable;
 use tokio::sync::mpsc;
 use tokio_postgres::NoTls;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::types::GossipMessage;
 
 const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
 
-pub(crate) struct GossipPersister {
+pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
        gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
-       network_graph: Arc<NetworkGraph<TestLogger>>,
+       network_graph: Arc<NetworkGraph<L>>,
+       logger: L
 }
 
-impl GossipPersister {
-       pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+impl<L: Deref> GossipPersister<L> where L::Target: Logger {
+       pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
                let (gossip_persistence_sender, gossip_persistence_receiver) =
                        mpsc::channel::<GossipMessage>(100);
                (GossipPersister {
                        gossip_persistence_receiver,
-                       network_graph
+                       network_graph,
+                       logger
                }, gossip_persistence_sender)
        }
 
@@ -99,7 +104,7 @@ impl GossipPersister {
                        i += 1; // count the persisted gossip messages
 
                        if latest_persistence_log.elapsed().as_secs() >= 60 {
-                               println!("Persisting gossip message #{}", i);
+                               log_info!(self.logger, "Persisting gossip message #{}", i);
                                latest_persistence_log = Instant::now();
                        }
 
@@ -177,7 +182,7 @@ impl GossipPersister {
        }
 
        fn persist_network_graph(&self) {
-               println!("Caching network graph…");
+               log_info!(self.logger, "Caching network graph…");
                let cache_path = config::network_graph_cache_path();
                let file = OpenOptions::new()
                        .create(true)
@@ -189,6 +194,6 @@ impl GossipPersister {
                let mut writer = BufWriter::new(file);
                self.network_graph.write(&mut writer).unwrap();
                writer.flush().unwrap();
-               println!("Cached network graph!");
+               log_info!(self.logger, "Cached network graph!");
        }
 }
index ac800795bf6de11125818ed49da813320ff3b938..96c1e4d28a08c7e6b99ec5c2b12a6f3dfd0ba75b 100644 (file)
@@ -1,25 +1,29 @@
 use std::collections::HashMap;
 use std::fs;
+use std::ops::Deref;
 use std::os::unix::fs::symlink;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use lightning::log_info;
 
 use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::config::cache_path;
 
-pub(crate) struct Snapshotter {
-       network_graph: Arc<NetworkGraph<TestLogger>>,
+pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
+       network_graph: Arc<NetworkGraph<L>>,
+       logger: L
 }
 
-impl Snapshotter {
-       pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> Self {
-               Self { network_graph }
+impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
+       pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
+               Self { network_graph, logger }
        }
 
        pub(crate) async fn snapshot_gossip(&self) {
-               println!("Initiating snapshotting service");
+               log_info!(self.logger, "Initiating snapshotting service");
 
                let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
                let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64;
@@ -35,7 +39,7 @@ impl Snapshotter {
                        // 1. get the current timestamp
                        let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
                        let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, round_day_seconds);
-                       println!("Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
+                       log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
 
                        // 2. sleep until the next round 24 hours
                        // 3. refresh all snapshots
@@ -75,14 +79,14 @@ impl Snapshotter {
                        for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
                                let network_graph_clone = self.network_graph.clone();
                                {
-                                       println!("Calculating {}-day snapshot", day_range);
+                                       log_info!(self.logger, "Calculating {}-day snapshot", day_range);
                                        // calculate the snapshot
-                                       let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32).await;
+                                       let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
 
                                        // persist the snapshot and update the symlink
                                        let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
                                        let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
-                                       println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+                                       log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
                                        fs::write(&snapshot_path, snapshot.data).unwrap();
                                        snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
                                }
@@ -97,7 +101,7 @@ impl Snapshotter {
 
                                let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
                                let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
-                               println!("Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
+                               log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
                                symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
                        }
 
@@ -126,7 +130,7 @@ impl Snapshotter {
                                };
                                let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
 
-                               println!("Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
+                               log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
                                symlink(&relative_snapshot_path, &symlink_path).unwrap();
                        }
 
@@ -148,7 +152,7 @@ impl Snapshotter {
                        let remainder = current_time % round_day_seconds;
                        let time_until_next_day = round_day_seconds - remainder;
 
-                       println!("Sleeping until next snapshot capture: {}s", time_until_next_day);
+                       log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
                        // add in an extra five seconds to assure the rounding down works correctly
                        let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
                        sleep.await;
index 8d2668fcaf08ffde4128fcb6a68bfb8987e26c76..c53fc5b5f1f16f8e76eacdf0ac76743e53cfed58 100644 (file)
@@ -1,6 +1,7 @@
 use std::collections::hash_map::RandomState;
 use std::hash::{BuildHasher, Hasher};
 use std::net::SocketAddr;
+use std::ops::Deref;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
@@ -10,17 +11,21 @@ use lightning;
 use lightning::ln::peer_handler::{
        ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
 };
+use lightning::{log_error, log_info, log_warn};
 use lightning::routing::gossip::NetworkGraph;
 use lightning::sign::KeysManager;
+use lightning::util::logger::Logger;
 use tokio::sync::mpsc;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::downloader::GossipRouter;
 use crate::types::{GossipMessage, GossipPeerManager};
 
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
+pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
                completion_sender: mpsc::Sender<()>,
-               network_graph: Arc<NetworkGraph<TestLogger>>) {
+               network_graph: Arc<NetworkGraph<L>>,
+               logger: L
+) where L::Target: Logger {
        let mut key = [42; 32];
        let mut random_data = [43; 32];
        // Get something psuedo-random from std.
@@ -33,7 +38,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
 
        let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
 
-       let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
+       let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
 
        let message_handler = MessageHandler {
                chan_handler: ErroringMessageHandler::new(),
@@ -45,7 +50,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
                message_handler,
                0xdeadbeef,
                &random_data,
-               TestLogger::new(),
+               logger.clone(),
                keys_manager,
        ));
        router.set_pm(Arc::clone(&peer_handler));
@@ -59,12 +64,12 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
                }
        });
 
-       println!("Connecting to Lightning peers...");
+       log_info!(logger, "Connecting to Lightning peers...");
        let peers = config::ln_peers();
        let mut connected_peer_count = 0;
 
        for current_peer in peers {
-               let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
+               let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await;
                if initial_connection_succeeded {
                        connected_peer_count += 1;
                }
@@ -74,7 +79,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
                panic!("Failed to connect to any peer.");
        }
 
-       println!("Connected to {} Lightning peers!", connected_peer_count);
+       log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
 
        tokio::spawn(async move {
                let mut previous_announcement_count = 0u64;
@@ -104,7 +109,8 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
 
                                // if we either aren't caught up, or just stopped/started being caught up
                                if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
-                                       println!(
+                                       log_info!(
+                                               logger,
                                                "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
                                                i,
                                                total_message_count,
@@ -115,19 +121,19 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
                                                counter.channel_updates_without_htlc_max_msats
                                        );
                                } else {
-                                       println!("Monitoring for gossip…")
+                                       log_info!(logger, "Monitoring for gossip…")
                                }
 
                                if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
-                                       println!("caught up with gossip!");
+                                       log_info!(logger, "caught up with gossip!");
                                        needs_to_notify_persister = true;
                                } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
-                                       println!("Received new messages since catching up with gossip!");
+                                       log_info!(logger, "Received new messages since catching up with gossip!");
                                }
 
                                let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
                                if continuous_caught_up_duration.as_secs() > 600 {
-                                       eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
+                                       log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
                                }
 
                                previous_announcement_count = counter.channel_announcements;
@@ -142,19 +148,19 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
        });
 }
 
-async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
-       eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
+       log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
        let connection = lightning_net_tokio::connect_outbound(
                Arc::clone(&peer_manager),
                current_peer.0,
                current_peer.1,
        ).await;
        if let Some(disconnection_future) = connection {
-               eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
+               log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
                tokio::spawn(async move {
                        disconnection_future.await;
                        loop {
-                               eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+                               log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
                                if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
                                        Arc::clone(&peer_manager),
                                        current_peer.0,
@@ -167,7 +173,7 @@ async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: Gossi
                });
                true
        } else {
-               eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
+               log_error!(logger, "Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
                false
        }
 }
index 77a53c477c907fe6c90c48d8bdd1de31b721fd15..0b03081f35814eda15182f110bcf96cbf9cc6993 100644 (file)
@@ -1,16 +1,16 @@
 use std::sync::Arc;
-use std::ops::Deref;
 
 use lightning::sign::KeysManager;
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
 use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
 use lightning::util::logger::{Logger, Record};
+use crate::config;
 
 use crate::downloader::GossipRouter;
 use crate::verifier::ChainVerifier;
 
-pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
+pub(crate) type GossipChainAccess<L> = Arc<ChainVerifier<L>>;
+pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter<L>>, IgnoringMessageHandler, L, IgnoringMessageHandler, Arc<KeysManager>>>;
 
 #[derive(Debug)]
 pub(crate) enum GossipMessage {
@@ -19,21 +19,20 @@ pub(crate) enum GossipMessage {
 }
 
 #[derive(Clone, Copy)]
-pub(crate) struct TestLogger {}
-impl Deref for TestLogger {
-       type Target = Self;
-       fn deref(&self) -> &Self { self }
-}
+pub struct RGSSLogger {}
 
-impl TestLogger {
-       pub(crate) fn new() -> TestLogger {
+impl RGSSLogger {
+       pub fn new() -> RGSSLogger {
                Self {}
        }
 }
 
-impl Logger for TestLogger {
+impl Logger for RGSSLogger {
        fn log(&self, record: &Record) {
-               // TODO: allow log level threshold to be set
+               let threshold = config::log_level();
+               if record.level < threshold {
+                       return;
+               }
                println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
        }
 }
index 4bda871ad124d85336576f0b2e9b08e3b9c5a92b..6813ff7d3dd766da51d0ae23779e27f2ea8c76de 100644 (file)
@@ -1,60 +1,70 @@
 use std::convert::TryInto;
+use std::ops::Deref;
 use std::sync::Arc;
 use std::sync::Mutex;
 
 use bitcoin::{BlockHash, TxOut};
 use bitcoin::blockdata::block::Block;
 use bitcoin::hashes::Hash;
+use lightning::log_error;
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
+use lightning::util::logger::Logger;
 use lightning_block_sync::{BlockData, BlockSource};
 use lightning_block_sync::http::BinaryResponse;
 use lightning_block_sync::rest::RestClient;
 
 use crate::config;
-use crate::TestLogger;
 use crate::types::GossipPeerManager;
 
-pub(crate) struct ChainVerifier {
+pub(crate) struct ChainVerifier<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
        rest_client: Arc<RestClient>,
-       graph: Arc<NetworkGraph<TestLogger>>,
-       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
-       peer_handler: Mutex<Option<GossipPeerManager>>,
+       graph: Arc<NetworkGraph<L>>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
+       peer_handler: Mutex<Option<GossipPeerManager<L>>>,
+       logger: L
 }
 
 struct RestBinaryResponse(Vec<u8>);
 
-impl ChainVerifier {
-       pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
+impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target: Logger {
+       pub(crate) fn new(graph: Arc<NetworkGraph<L>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>, logger: L) -> Self {
                ChainVerifier {
                        rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
                        outbound_gossiper,
                        graph,
                        peer_handler: Mutex::new(None),
+                       logger
                }
        }
-       pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+       pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
                *self.peer_handler.lock().unwrap() = Some(peer_handler);
        }
 
-       async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
+       async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
                let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
                let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
                let output_index = (short_channel_id & 0xffff) as u16;
 
-               let mut block = Self::retrieve_block(client, block_height).await?;
-               if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); }
+               let mut block = Self::retrieve_block(client, block_height, logger.clone()).await?;
+               if transaction_index as usize >= block.txdata.len() {
+                       log_error!(logger, "Could't find transaction {} in block {}", transaction_index, block_height);
+                       return Err(UtxoLookupError::UnknownTx);
+               }
                let mut transaction = block.txdata.swap_remove(transaction_index as usize);
-               if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); }
+               if output_index as usize >= transaction.output.len() {
+                       log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.txid());
+                       return Err(UtxoLookupError::UnknownTx);
+               }
                Ok(transaction.output.swap_remove(output_index as usize))
        }
 
-       async fn retrieve_block(client: Arc<RestClient>, block_height: u32) -> Result<Block, UtxoLookupError> {
+       async fn retrieve_block(client: Arc<RestClient>, block_height: u32, logger: L) -> Result<Block, UtxoLookupError> {
                let uri = format!("blockhashbyheight/{}.bin", block_height);
                let block_hash_result =
                        client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
                let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
-                       eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
+                       log_error!(logger, "Could't find block hash at height {}: {}", block_height, error.to_string());
                        UtxoLookupError::UnknownChain
                })?.0;
                let block_hash = BlockHash::from_slice(&block_hash).unwrap();
@@ -66,14 +76,14 @@ impl ChainVerifier {
                        },
                        Ok(_) => unreachable!(),
                        Err(error) => {
-                               eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+                               log_error!(logger, "Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
                                Err(UtxoLookupError::UnknownChain)
                        }
                }
        }
 }
 
-impl UtxoLookup for ChainVerifier {
+impl<L: Deref + Clone + Send + Sync + 'static> UtxoLookup for ChainVerifier<L> where L::Target: Logger {
        fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
                let res = UtxoFuture::new();
                let fut = res.clone();
@@ -81,8 +91,9 @@ impl UtxoLookup for ChainVerifier {
                let client_ref = Arc::clone(&self.rest_client);
                let gossip_ref = Arc::clone(&self.outbound_gossiper);
                let pm_ref = self.peer_handler.lock().unwrap().clone();
+               let logger_ref = self.logger.clone();
                tokio::spawn(async move {
-                       let res = Self::retrieve_utxo(client_ref, short_channel_id).await;
+                       let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await;
                        fut.resolve(&*graph_ref, &*gossip_ref, res);
                        if let Some(pm) = pm_ref { pm.process_events(); }
                });