From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Sun, 13 Aug 2023 05:30:42 +0000 (+0000) Subject: Merge pull request #51 from arik-so/2023/08/pre_test_refactors X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=55e1b5def33838c2b690cff9823ed4b7bc0afbca;hp=5e6a00a0509afe388b0dad721928cddd75dbcee2;p=rapid-gossip-sync-server Merge pull request #51 from arik-so/2023/08/pre_test_refactors Allow arbitrary logger types. --- diff --git a/Cargo.toml b/Cargo.toml index ec3f3a6..99aeed7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,9 @@ edition = "2021" [dependencies] bitcoin = "0.29" -lightning = { version = "0.0.116-alpha1" } -lightning-block-sync = { version = "0.0.116-alpha1", features=["rest-client"] } -lightning-net-tokio = { version = "0.0.116-alpha1" } +lightning = { version = "0.0.116" } +lightning-block-sync = { version = "0.0.116", features=["rest-client"] } +lightning-net-tokio = { version = "0.0.116" } tokio = { version = "1.25", features = ["full"] } tokio-postgres = { version="=0.7.5" } futures = "0.3" diff --git a/src/config.rs b/src/config.rs index b0e3a7c..804b0f8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -35,6 +35,19 @@ pub(crate) fn network() -> Network { } } +pub(crate) fn log_level() -> lightning::util::logger::Level { + let level = env::var("RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL").unwrap_or("info".to_string()).to_lowercase(); + match level.as_str() { + "gossip" => lightning::util::logger::Level::Gossip, + "trace" => lightning::util::logger::Level::Trace, + "debug" => lightning::util::logger::Level::Debug, + "info" => lightning::util::logger::Level::Info, + "warn" => lightning::util::logger::Level::Warn, + "error" => lightning::util::logger::Level::Error, + _ => panic!("Invalid log level"), + } +} + pub(crate) fn network_graph_cache_path() -> String { format!("{}/network_graph.bin", cache_path()) } diff --git a/src/downloader.rs b/src/downloader.rs index ac11ec7..8d5cdb0 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -1,3 +1,4 @@ +use std::ops::Deref; use std::sync::{Arc, RwLock}; use bitcoin::secp256k1::PublicKey; @@ -5,10 +6,10 @@ use lightning::events::{MessageSendEvent, MessageSendEventsProvider}; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use lightning::util::logger::Logger; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use crate::TestLogger; use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; @@ -30,28 +31,28 @@ impl GossipCounter { } } -pub(crate) struct GossipRouter { - native_router: P2PGossipSync>, GossipChainAccess, TestLogger>, +pub(crate) struct GossipRouter where L::Target: Logger { + native_router: P2PGossipSync>, GossipChainAccess, L>, pub(crate) counter: RwLock, sender: mpsc::Sender, - verifier: Arc, - outbound_gossiper: Arc>, GossipChainAccess, TestLogger>>, + verifier: Arc>, + outbound_gossiper: Arc>, GossipChainAccess, L>>, } -impl GossipRouter { - pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender) -> Self { - let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new())); - let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper))); +impl GossipRouter where L::Target: Logger { + pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender, logger: L) -> Self { + let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone())); + let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone())); Self { - native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()), + native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()), outbound_gossiper, counter: RwLock::new(GossipCounter::new()), sender, - verifier, + verifier } } - pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { + pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { self.verifier.set_ph(peer_handler); } @@ -83,7 +84,7 @@ impl GossipRouter { } } -impl MessageSendEventsProvider for GossipRouter { +impl MessageSendEventsProvider for GossipRouter where L::Target: Logger { fn get_and_clear_pending_msg_events(&self) -> Vec { let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events(); for ev in gossip_evs { @@ -102,7 +103,7 @@ impl MessageSendEventsProvider for GossipRouter { } } -impl RoutingMessageHandler for GossipRouter { +impl RoutingMessageHandler for GossipRouter where L::Target: Logger { fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { self.native_router.handle_node_announcement(msg) } diff --git a/src/lib.rs b/src/lib.rs index 37bbe4c..b6281bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,9 +12,12 @@ extern crate core; use std::collections::{HashMap, HashSet}; use std::fs::File; use std::io::BufReader; +use std::ops::Deref; use std::sync::Arc; +use lightning::log_info; use lightning::routing::gossip::{NetworkGraph, NodeId}; +use lightning::util::logger::Logger; use lightning::util::ser::{ReadableArgs, Writeable}; use tokio::sync::mpsc; use crate::lookup::DeltaSet; @@ -22,10 +25,9 @@ use crate::lookup::DeltaSet; use crate::persistence::GossipPersister; use crate::serialization::UpdateSerialization; use crate::snapshot::Snapshotter; -use crate::types::TestLogger; +use crate::types::RGSSLogger; mod downloader; -mod types; mod tracking; mod lookup; mod persistence; @@ -35,14 +37,17 @@ mod config; mod hex_utils; mod verifier; +pub mod types; + /// The purpose of this prefix is to identify the serialization format, should other rapid gossip /// sync formats arise in the future. /// /// The fourth byte is the protocol version in case our format gets updated. const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1]; -pub struct RapidSyncProcessor { - network_graph: Arc>, +pub struct RapidSyncProcessor where L::Target: Logger { + network_graph: Arc>, + logger: L } pub struct SerializedResponse { @@ -54,27 +59,27 @@ pub struct SerializedResponse { pub update_count_incremental: u32, } -impl RapidSyncProcessor { - pub fn new() -> Self { +impl RapidSyncProcessor where L::Target: Logger { + pub fn new(logger: L) -> Self { let network = config::network(); - let logger = TestLogger::new(); let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) { - println!("Initializing from cached network graph…"); + log_info!(logger, "Initializing from cached network graph…"); let mut buffered_reader = BufReader::new(file); - let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger); + let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone()); if let Ok(network_graph) = network_graph_result { - println!("Initialized from cached network graph!"); + log_info!(logger, "Initialized from cached network graph!"); network_graph } else { - println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); - NetworkGraph::new(network, logger) + log_info!(logger, "Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); + NetworkGraph::new(network, logger.clone()) } } else { - NetworkGraph::new(network, logger) + NetworkGraph::new(network, logger.clone()) }; let arc_network_graph = Arc::new(network_graph); Self { network_graph: arc_network_graph, + logger } } @@ -83,12 +88,12 @@ impl RapidSyncProcessor { let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1); if config::DOWNLOAD_NEW_GOSSIP { - let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph)); + let (mut persister, persistence_sender) = GossipPersister::new(self.network_graph.clone(), self.logger.clone()); - println!("Starting gossip download"); + log_info!(self.logger, "Starting gossip download"); tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender, - Arc::clone(&self.network_graph))); - println!("Starting gossip db persistence listener"); + Arc::clone(&self.network_graph), self.logger.clone())); + log_info!(self.logger, "Starting gossip db persistence listener"); tokio::spawn(async move { persister.persist_gossip().await; }); } else { sync_completion_sender.send(()).await.unwrap(); @@ -98,10 +103,10 @@ impl RapidSyncProcessor { if sync_completion.is_none() { panic!("Sync failed!"); } - println!("Initial sync complete!"); + log_info!(self.logger, "Initial sync complete!"); // start the gossip snapshotting service - Snapshotter::new(Arc::clone(&self.network_graph)).snapshot_gossip().await; + Snapshotter::new(Arc::clone(&self.network_graph), self.logger.clone()).snapshot_gossip().await; } } @@ -126,7 +131,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { let chain_hash = genesis_block.block_hash(); chain_hash.write(&mut blob).unwrap(); - let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; + let blob_timestamp = Snapshotter::>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; blob_timestamp.write(&mut blob).unwrap(); 0u32.write(&mut blob).unwrap(); // node count @@ -136,7 +141,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { blob } -async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32) -> SerializedResponse { +async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger { let (client, connection) = lookup::connect_to_db().await; network_graph.remove_stale_channels_and_tracking(); @@ -170,12 +175,12 @@ async fn serialize_delta(network_graph: Arc>, last_sync }; let mut delta_set = DeltaSet::new(); - lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await; - println!("announcement channel count: {}", delta_set.len()); - lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await; - println!("update-fetched channel count: {}", delta_set.len()); - lookup::filter_delta_set(&mut delta_set); - println!("update-filtered channel count: {}", delta_set.len()); + lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, logger.clone()).await; + log_info!(logger, "announcement channel count: {}", delta_set.len()); + lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; + log_info!(logger, "update-fetched channel count: {}", delta_set.len()); + lookup::filter_delta_set(&mut delta_set, logger.clone()); + log_info!(logger, "update-filtered channel count: {}", delta_set.len()); let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp); // process announcements @@ -246,8 +251,8 @@ async fn serialize_delta(network_graph: Arc>, last_sync prefixed_output.append(&mut output); - println!("duplicated node ids: {}", duplicate_node_ids); - println!("latest seen timestamp: {:?}", serialization_details.latest_seen); + log_info!(logger, "duplicated node ids: {}", duplicate_node_ids); + log_info!(logger, "latest seen timestamp: {:?}", serialization_details.latest_seen); SerializedResponse { data: prefixed_output, diff --git a/src/lookup.rs b/src/lookup.rs index c554f9f..1c6b418 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashSet}; use std::io::Cursor; -use std::ops::Add; +use std::ops::{Add, Deref}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; @@ -11,8 +11,10 @@ use tokio_postgres::{Client, Connection, NoTls, Socket}; use tokio_postgres::tls::NoTlsStream; use futures::StreamExt; +use lightning::log_info; +use lightning::util::logger::Logger; -use crate::{config, TestLogger}; +use crate::config; use crate::serialization::MutatedProperties; /// The delta set needs to be a BTreeMap so the keys are sorted. @@ -75,12 +77,12 @@ pub(super) async fn connect_to_db() -> (Client, Connection) /// whether they had been seen before. /// Also include all announcements for which the first update was announced /// after `last_sync_timestamp` -pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32) { - println!("Obtaining channel ids from network graph"); +pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger { + log_info!(logger, "Obtaining channel ids from network graph"); let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); let channel_ids = { let read_only_graph = network_graph.read_only(); - println!("Retrieved read-only network graph copy"); + log_info!(logger, "Retrieved read-only network graph copy"); let channel_iterator = read_only_graph.channels().unordered_iter(); channel_iterator .filter(|c| c.1.announcement_message.is_some()) @@ -88,7 +90,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ .collect::>() }; - println!("Obtaining corresponding database entries"); + log_info!(logger, "Obtaining corresponding database entries"); // get all the channel announcements that are currently in the network graph let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap(); let mut pinned_rows = Box::pin(announcement_rows); @@ -113,7 +115,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ { // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA - println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync"); + log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync"); // Steps: // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction) @@ -155,7 +157,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ { // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT - println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago"); + log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago"); // Steps: // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction) @@ -213,7 +215,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ } } -pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) { +pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger { let start = Instant::now(); let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); @@ -235,7 +237,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli ", [last_sync_timestamp_object]).await.unwrap(); let mut pinned_rows = Box::pin(reference_rows); - println!("Fetched reference rows in {:?}", start.elapsed()); + log_info!(logger, "Fetched reference rows in {:?}", start.elapsed()); let mut last_seen_update_ids: Vec = Vec::new(); let mut non_intermediate_ids: HashSet = HashSet::new(); @@ -263,7 +265,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli reference_row_count += 1; } - println!("Processed {} reference rows (delta size: {}) in {:?}", + log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}", reference_row_count, delta_set.len(), start.elapsed()); // get all the intermediate channel updates @@ -276,7 +278,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli WHERE seen >= $1 ", [last_sync_timestamp_object]).await.unwrap(); let mut pinned_updates = Box::pin(intermediate_updates); - println!("Fetched intermediate rows in {:?}", start.elapsed()); + log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed()); let mut previous_scid = u64::MAX; let mut previously_seen_directions = (false, false); @@ -351,10 +353,10 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli } } } - println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); + log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); } -pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) { +pub(super) fn filter_delta_set(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger { let original_length = delta_set.len(); let keys: Vec = delta_set.keys().cloned().collect(); for k in keys { @@ -386,6 +388,6 @@ pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) { let new_length = delta_set.len(); if original_length != new_length { - println!("length modified!"); + log_info!(logger, "length modified!"); } } diff --git a/src/main.rs b/src/main.rs index e3468be..dae4d3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; use rapid_gossip_sync_server::RapidSyncProcessor; +use rapid_gossip_sync_server::types::RGSSLogger; #[tokio::main] async fn main() { - RapidSyncProcessor::new().start_sync().await; + let logger = Arc::new(RGSSLogger::new()); + RapidSyncProcessor::new(logger).start_sync().await; } diff --git a/src/persistence.rs b/src/persistence.rs index ac66733..f638894 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,29 +1,34 @@ use std::fs::OpenOptions; use std::io::{BufWriter, Write}; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; +use lightning::log_info; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use tokio::sync::mpsc; use tokio_postgres::NoTls; -use crate::{config, TestLogger}; +use crate::config; use crate::types::GossipMessage; const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); -pub(crate) struct GossipPersister { +pub(crate) struct GossipPersister where L::Target: Logger { gossip_persistence_receiver: mpsc::Receiver, - network_graph: Arc>, + network_graph: Arc>, + logger: L } -impl GossipPersister { - pub fn new(network_graph: Arc>) -> (Self, mpsc::Sender) { +impl GossipPersister where L::Target: Logger { + pub fn new(network_graph: Arc>, logger: L) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { gossip_persistence_receiver, - network_graph + network_graph, + logger }, gossip_persistence_sender) } @@ -99,7 +104,7 @@ impl GossipPersister { i += 1; // count the persisted gossip messages if latest_persistence_log.elapsed().as_secs() >= 60 { - println!("Persisting gossip message #{}", i); + log_info!(self.logger, "Persisting gossip message #{}", i); latest_persistence_log = Instant::now(); } @@ -177,7 +182,7 @@ impl GossipPersister { } fn persist_network_graph(&self) { - println!("Caching network graph…"); + log_info!(self.logger, "Caching network graph…"); let cache_path = config::network_graph_cache_path(); let file = OpenOptions::new() .create(true) @@ -189,6 +194,6 @@ impl GossipPersister { let mut writer = BufWriter::new(file); self.network_graph.write(&mut writer).unwrap(); writer.flush().unwrap(); - println!("Cached network graph!"); + log_info!(self.logger, "Cached network graph!"); } } diff --git a/src/snapshot.rs b/src/snapshot.rs index ac80079..96c1e4d 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -1,25 +1,29 @@ use std::collections::HashMap; use std::fs; +use std::ops::Deref; use std::os::unix::fs::symlink; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use lightning::log_info; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; -use crate::{config, TestLogger}; +use crate::config; use crate::config::cache_path; -pub(crate) struct Snapshotter { - network_graph: Arc>, +pub(crate) struct Snapshotter where L::Target: Logger { + network_graph: Arc>, + logger: L } -impl Snapshotter { - pub fn new(network_graph: Arc>) -> Self { - Self { network_graph } +impl Snapshotter where L::Target: Logger { + pub fn new(network_graph: Arc>, logger: L) -> Self { + Self { network_graph, logger } } pub(crate) async fn snapshot_gossip(&self) { - println!("Initiating snapshotting service"); + log_info!(self.logger, "Initiating snapshotting service"); let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX]; let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64; @@ -35,7 +39,7 @@ impl Snapshotter { // 1. get the current timestamp let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, round_day_seconds); - println!("Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); + log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); // 2. sleep until the next round 24 hours // 3. refresh all snapshots @@ -75,14 +79,14 @@ impl Snapshotter { for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps { let network_graph_clone = self.network_graph.clone(); { - println!("Calculating {}-day snapshot", day_range); + log_info!(self.logger, "Calculating {}-day snapshot", day_range); // calculate the snapshot - let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32).await; + let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await; // persist the snapshot and update the symlink let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp); let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename); - println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental); + log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental); fs::write(&snapshot_path, snapshot.data).unwrap(); snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename); } @@ -97,7 +101,7 @@ impl Snapshotter { let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp); let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename); - println!("Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path); + log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path); symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap(); } @@ -126,7 +130,7 @@ impl Snapshotter { }; let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp); - println!("Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path); + log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path); symlink(&relative_snapshot_path, &symlink_path).unwrap(); } @@ -148,7 +152,7 @@ impl Snapshotter { let remainder = current_time % round_day_seconds; let time_until_next_day = round_day_seconds - remainder; - println!("Sleeping until next snapshot capture: {}s", time_until_next_day); + log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day); // add in an extra five seconds to assure the rounding down works correctly let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5)); sleep.await; diff --git a/src/tracking.rs b/src/tracking.rs index 8d2668f..c53fc5b 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -1,6 +1,7 @@ use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hasher}; use std::net::SocketAddr; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -10,17 +11,21 @@ use lightning; use lightning::ln::peer_handler::{ ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager, }; +use lightning::{log_error, log_info, log_warn}; use lightning::routing::gossip::NetworkGraph; use lightning::sign::KeysManager; +use lightning::util::logger::Logger; use tokio::sync::mpsc; -use crate::{config, TestLogger}; +use crate::config; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, completion_sender: mpsc::Sender<()>, - network_graph: Arc>) { + network_graph: Arc>, + logger: L +) where L::Target: Logger { let mut key = [42; 32]; let mut random_data = [43; 32]; // Get something psuedo-random from std. @@ -33,7 +38,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender 600 { - eprintln!("No new gossip messages in 10 minutes! Something's amiss!"); + log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); } previous_announcement_count = counter.channel_announcements; @@ -142,19 +148,19 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender bool { - eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); +async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager, logger: L) -> bool where L::Target: Logger { + log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); let connection = lightning_net_tokio::connect_outbound( Arc::clone(&peer_manager), current_peer.0, current_peer.1, ).await; if let Some(disconnection_future) = connection { - eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string()); + log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string()); tokio::spawn(async move { disconnection_future.await; loop { - eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); + log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); if let Some(disconnection_future) = lightning_net_tokio::connect_outbound( Arc::clone(&peer_manager), current_peer.0, @@ -167,7 +173,7 @@ async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: Gossi }); true } else { - eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()); + log_error!(logger, "Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()); false } } diff --git a/src/types.rs b/src/types.rs index 77a53c4..0b03081 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,16 +1,16 @@ use std::sync::Arc; -use std::ops::Deref; use lightning::sign::KeysManager; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate}; use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager}; use lightning::util::logger::{Logger, Record}; +use crate::config; use crate::downloader::GossipRouter; use crate::verifier::ChainVerifier; -pub(crate) type GossipChainAccess = Arc; -pub(crate) type GossipPeerManager = Arc, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc>>; +pub(crate) type GossipChainAccess = Arc>; +pub(crate) type GossipPeerManager = Arc>, IgnoringMessageHandler, L, IgnoringMessageHandler, Arc>>; #[derive(Debug)] pub(crate) enum GossipMessage { @@ -19,21 +19,20 @@ pub(crate) enum GossipMessage { } #[derive(Clone, Copy)] -pub(crate) struct TestLogger {} -impl Deref for TestLogger { - type Target = Self; - fn deref(&self) -> &Self { self } -} +pub struct RGSSLogger {} -impl TestLogger { - pub(crate) fn new() -> TestLogger { +impl RGSSLogger { + pub fn new() -> RGSSLogger { Self {} } } -impl Logger for TestLogger { +impl Logger for RGSSLogger { fn log(&self, record: &Record) { - // TODO: allow log level threshold to be set + let threshold = config::log_level(); + if record.level < threshold { + return; + } println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args); } } diff --git a/src/verifier.rs b/src/verifier.rs index 4bda871..6813ff7 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -1,60 +1,70 @@ use std::convert::TryInto; +use std::ops::Deref; use std::sync::Arc; use std::sync::Mutex; use bitcoin::{BlockHash, TxOut}; use bitcoin::blockdata::block::Block; use bitcoin::hashes::Hash; +use lightning::log_error; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError}; +use lightning::util::logger::Logger; use lightning_block_sync::{BlockData, BlockSource}; use lightning_block_sync::http::BinaryResponse; use lightning_block_sync::rest::RestClient; use crate::config; -use crate::TestLogger; use crate::types::GossipPeerManager; -pub(crate) struct ChainVerifier { +pub(crate) struct ChainVerifier where L::Target: Logger { rest_client: Arc, - graph: Arc>, - outbound_gossiper: Arc>, Arc, TestLogger>>, - peer_handler: Mutex>, + graph: Arc>, + outbound_gossiper: Arc>, Arc, L>>, + peer_handler: Mutex>>, + logger: L } struct RestBinaryResponse(Vec); -impl ChainVerifier { - pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, TestLogger>>) -> Self { +impl ChainVerifier where L::Target: Logger { + pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, L>>, logger: L) -> Self { ChainVerifier { rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()), outbound_gossiper, graph, peer_handler: Mutex::new(None), + logger } } - pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { + pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { *self.peer_handler.lock().unwrap() = Some(peer_handler); } - async fn retrieve_utxo(client: Arc, short_channel_id: u64) -> Result { + async fn retrieve_utxo(client: Arc, short_channel_id: u64, logger: L) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; let output_index = (short_channel_id & 0xffff) as u16; - let mut block = Self::retrieve_block(client, block_height).await?; - if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); } + let mut block = Self::retrieve_block(client, block_height, logger.clone()).await?; + if transaction_index as usize >= block.txdata.len() { + log_error!(logger, "Could't find transaction {} in block {}", transaction_index, block_height); + return Err(UtxoLookupError::UnknownTx); + } let mut transaction = block.txdata.swap_remove(transaction_index as usize); - if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); } + if output_index as usize >= transaction.output.len() { + log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.txid()); + return Err(UtxoLookupError::UnknownTx); + } Ok(transaction.output.swap_remove(output_index as usize)) } - async fn retrieve_block(client: Arc, block_height: u32) -> Result { + async fn retrieve_block(client: Arc, block_height: u32, logger: L) -> Result { let uri = format!("blockhashbyheight/{}.bin", block_height); let block_hash_result = client.request_resource::(&uri).await; let block_hash: Vec = block_hash_result.map_err(|error| { - eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string()); + log_error!(logger, "Could't find block hash at height {}: {}", block_height, error.to_string()); UtxoLookupError::UnknownChain })?.0; let block_hash = BlockHash::from_slice(&block_hash).unwrap(); @@ -66,14 +76,14 @@ impl ChainVerifier { }, Ok(_) => unreachable!(), Err(error) => { - eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash); + log_error!(logger, "Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash); Err(UtxoLookupError::UnknownChain) } } } } -impl UtxoLookup for ChainVerifier { +impl UtxoLookup for ChainVerifier where L::Target: Logger { fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult { let res = UtxoFuture::new(); let fut = res.clone(); @@ -81,8 +91,9 @@ impl UtxoLookup for ChainVerifier { let client_ref = Arc::clone(&self.rest_client); let gossip_ref = Arc::clone(&self.outbound_gossiper); let pm_ref = self.peer_handler.lock().unwrap().clone(); + let logger_ref = self.logger.clone(); tokio::spawn(async move { - let res = Self::retrieve_utxo(client_ref, short_channel_id).await; + let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await; fut.resolve(&*graph_ref, &*gossip_ref, res); if let Some(pm) = pm_ref { pm.process_events(); } });