Allow arbitrary logger types: make the server generic over `L: Deref` with `L::Target: Logger`, replacing the hard-coded `TestLogger` with a public `RGSSLogger`.
[dependencies]
bitcoin = "0.29"
-lightning = { version = "0.0.116-alpha1" }
-lightning-block-sync = { version = "0.0.116-alpha1", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.116-alpha1" }
+lightning = { version = "0.0.116" }
+lightning-block-sync = { version = "0.0.116", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.116" }
tokio = { version = "1.25", features = ["full"] }
tokio-postgres = { version="=0.7.5" }
futures = "0.3"
}
}
+/// Reads the desired log threshold from the `RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL`
+/// environment variable (case-insensitive), defaulting to `Info` when unset.
+///
+/// # Panics
+/// Panics if the variable is set to an unrecognized level name.
+pub(crate) fn log_level() -> lightning::util::logger::Level {
+ // unwrap_or_else avoids allocating the default string when the variable is set
+ let level = env::var("RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL").unwrap_or_else(|_| "info".to_string()).to_lowercase();
+ match level.as_str() {
+ "gossip" => lightning::util::logger::Level::Gossip,
+ "trace" => lightning::util::logger::Level::Trace,
+ "debug" => lightning::util::logger::Level::Debug,
+ "info" => lightning::util::logger::Level::Info,
+ "warn" => lightning::util::logger::Level::Warn,
+ "error" => lightning::util::logger::Level::Error,
+ // Fail fast on misconfiguration instead of silently falling back to a default
+ _ => panic!("Invalid log level: {}", level),
+ }
+}
+
pub(crate) fn network_graph_cache_path() -> String {
format!("{}/network_graph.bin", cache_path())
}
+use std::ops::Deref;
use std::sync::{Arc, RwLock};
use bitcoin::secp256k1::PublicKey;
use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
+use lightning::util::logger::Logger;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
-use crate::TestLogger;
use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
use crate::verifier::ChainVerifier;
}
}
-pub(crate) struct GossipRouter {
- native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
+/// Routes inbound P2P gossip through LDK's `P2PGossipSync`, tallying message counts and
+/// forwarding accepted messages over `sender` for persistence.
+/// Generic over any cloneable, shareable logger handle `L` whose target implements `Logger`.
+pub(crate) struct GossipRouter<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
+ // LDK's gossip handler that validates messages (with UTXO checks via the verifier)
+ native_router: P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>,
pub(crate) counter: RwLock<GossipCounter>,
sender: mpsc::Sender<GossipMessage>,
- verifier: Arc<ChainVerifier>,
- outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
+ verifier: Arc<ChainVerifier<L>>,
+ // Secondary gossip sync without a UTXO verifier, used for outbound message events
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>>,
}
-impl GossipRouter {
- pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
- let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
- let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
+impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
+ pub(crate) fn new(network_graph: Arc<NetworkGraph<L>>, sender: mpsc::Sender<GossipMessage>, logger: L) -> Self {
+ let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
+ let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone()));
Self {
- native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+ native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
outbound_gossiper,
counter: RwLock::new(GossipCounter::new()),
sender,
- verifier,
+ verifier
}
}
- pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+ pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
self.verifier.set_ph(peer_handler);
}
}
}
-impl MessageSendEventsProvider for GossipRouter {
+impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<L> where L::Target: Logger {
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
for ev in gossip_evs {
}
}
-impl RoutingMessageHandler for GossipRouter {
+impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
self.native_router.handle_node_announcement(msg)
}
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::BufReader;
+use std::ops::Deref;
use std::sync::Arc;
+use lightning::log_info;
use lightning::routing::gossip::{NetworkGraph, NodeId};
+use lightning::util::logger::Logger;
use lightning::util::ser::{ReadableArgs, Writeable};
use tokio::sync::mpsc;
use crate::lookup::DeltaSet;
use crate::persistence::GossipPersister;
use crate::serialization::UpdateSerialization;
use crate::snapshot::Snapshotter;
-use crate::types::TestLogger;
+use crate::types::RGSSLogger;
mod downloader;
-mod types;
mod tracking;
mod lookup;
mod persistence;
mod hex_utils;
mod verifier;
+pub mod types;
+
/// The purpose of this prefix is to identify the serialization format, should other rapid gossip
/// sync formats arise in the future.
///
/// The fourth byte is the protocol version in case our format gets updated.
const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
-pub struct RapidSyncProcessor {
- network_graph: Arc<NetworkGraph<TestLogger>>,
+/// Top-level orchestrator: loads (or initializes) the cached network graph and drives
+/// gossip download, database persistence, and periodic snapshotting.
+pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
+ network_graph: Arc<NetworkGraph<L>>,
+ logger: L
}
pub struct SerializedResponse {
pub update_count_incremental: u32,
}
-impl RapidSyncProcessor {
- pub fn new() -> Self {
+impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Target: Logger {
+ pub fn new(logger: L) -> Self {
let network = config::network();
- let logger = TestLogger::new();
let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
- println!("Initializing from cached network graph…");
+ log_info!(logger, "Initializing from cached network graph…");
let mut buffered_reader = BufReader::new(file);
- let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
+ let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone());
if let Ok(network_graph) = network_graph_result {
- println!("Initialized from cached network graph!");
+ log_info!(logger, "Initialized from cached network graph!");
network_graph
} else {
- println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
- NetworkGraph::new(network, logger)
+ log_info!(logger, "Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
+ NetworkGraph::new(network, logger.clone())
}
} else {
- NetworkGraph::new(network, logger)
+ NetworkGraph::new(network, logger.clone())
};
let arc_network_graph = Arc::new(network_graph);
Self {
network_graph: arc_network_graph,
+ logger
}
}
let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
if config::DOWNLOAD_NEW_GOSSIP {
- let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph));
+ let (mut persister, persistence_sender) = GossipPersister::new(self.network_graph.clone(), self.logger.clone());
- println!("Starting gossip download");
+ log_info!(self.logger, "Starting gossip download");
tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
- Arc::clone(&self.network_graph)));
- println!("Starting gossip db persistence listener");
+ Arc::clone(&self.network_graph), self.logger.clone()));
+ log_info!(self.logger, "Starting gossip db persistence listener");
tokio::spawn(async move { persister.persist_gossip().await; });
} else {
sync_completion_sender.send(()).await.unwrap();
if sync_completion.is_none() {
panic!("Sync failed!");
}
- println!("Initial sync complete!");
+ log_info!(self.logger, "Initial sync complete!");
// start the gossip snapshotting service
- Snapshotter::new(Arc::clone(&self.network_graph)).snapshot_gossip().await;
+ Snapshotter::new(Arc::clone(&self.network_graph), self.logger.clone()).snapshot_gossip().await;
}
}
let chain_hash = genesis_block.block_hash();
chain_hash.write(&mut blob).unwrap();
- let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
+ let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
blob_timestamp.write(&mut blob).unwrap();
0u32.write(&mut blob).unwrap(); // node count
blob
}
-async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
+async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger {
let (client, connection) = lookup::connect_to_db().await;
network_graph.remove_stale_channels_and_tracking();
};
let mut delta_set = DeltaSet::new();
- lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
- println!("announcement channel count: {}", delta_set.len());
- lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await;
- println!("update-fetched channel count: {}", delta_set.len());
- lookup::filter_delta_set(&mut delta_set);
- println!("update-filtered channel count: {}", delta_set.len());
+ lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, logger.clone()).await;
+ log_info!(logger, "announcement channel count: {}", delta_set.len());
+ lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
+ log_info!(logger, "update-fetched channel count: {}", delta_set.len());
+ lookup::filter_delta_set(&mut delta_set, logger.clone());
+ log_info!(logger, "update-filtered channel count: {}", delta_set.len());
let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);
// process announcements
prefixed_output.append(&mut output);
- println!("duplicated node ids: {}", duplicate_node_ids);
- println!("latest seen timestamp: {:?}", serialization_details.latest_seen);
+ log_info!(logger, "duplicated node ids: {}", duplicate_node_ids);
+ log_info!(logger, "latest seen timestamp: {:?}", serialization_details.latest_seen);
SerializedResponse {
data: prefixed_output,
use std::collections::{BTreeMap, HashSet};
use std::io::Cursor;
-use std::ops::Add;
+use std::ops::{Add, Deref};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio_postgres::tls::NoTlsStream;
use futures::StreamExt;
+use lightning::log_info;
+use lightning::util::logger::Logger;
-use crate::{config, TestLogger};
+use crate::config;
use crate::serialization::MutatedProperties;
/// The delta set needs to be a BTreeMap so the keys are sorted.
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_sync_timestamp`
-pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
- println!("Obtaining channel ids from network graph");
+pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
+ log_info!(logger, "Obtaining channel ids from network graph");
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
let channel_ids = {
let read_only_graph = network_graph.read_only();
- println!("Retrieved read-only network graph copy");
+ log_info!(logger, "Retrieved read-only network graph copy");
let channel_iterator = read_only_graph.channels().unordered_iter();
channel_iterator
.filter(|c| c.1.announcement_message.is_some())
.collect::<Vec<_>>()
};
- println!("Obtaining corresponding database entries");
+ log_info!(logger, "Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
let mut pinned_rows = Box::pin(announcement_rows);
{
// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
- println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
+ log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
// — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
{
// THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
- println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
+ log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
}
}
-pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) {
+pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
let start = Instant::now();
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
", [last_sync_timestamp_object]).await.unwrap();
let mut pinned_rows = Box::pin(reference_rows);
- println!("Fetched reference rows in {:?}", start.elapsed());
+ log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
let mut last_seen_update_ids: Vec<i32> = Vec::new();
let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
reference_row_count += 1;
}
- println!("Processed {} reference rows (delta size: {}) in {:?}",
+ log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
reference_row_count, delta_set.len(), start.elapsed());
// get all the intermediate channel updates
WHERE seen >= $1
", [last_sync_timestamp_object]).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
- println!("Fetched intermediate rows in {:?}", start.elapsed());
+ log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
let mut previous_scid = u64::MAX;
let mut previously_seen_directions = (false, false);
}
}
}
- println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
+ log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}
-pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {
+pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
let original_length = delta_set.len();
let keys: Vec<u64> = delta_set.keys().cloned().collect();
for k in keys {
let new_length = delta_set.len();
if original_length != new_length {
- println!("length modified!");
+ log_info!(logger, "length modified!");
}
}
+use std::sync::Arc;
use rapid_gossip_sync_server::RapidSyncProcessor;
+use rapid_gossip_sync_server::types::RGSSLogger;
#[tokio::main]
async fn main() {
- RapidSyncProcessor::new().start_sync().await;
+ // Wrap the logger in an Arc so a single shared instance satisfies the
+ // `L: Deref<Target: Logger> + Clone + Send + Sync` bounds used throughout the server.
+ let logger = Arc::new(RGSSLogger::new());
+ RapidSyncProcessor::new(logger).start_sync().await;
}
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
+use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
+use lightning::log_info;
use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use tokio::sync::mpsc;
use tokio_postgres::NoTls;
-use crate::{config, TestLogger};
+use crate::config;
use crate::types::GossipMessage;
const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
-pub(crate) struct GossipPersister {
+/// Receives gossip messages over an mpsc channel and persists them; also writes the
+/// serialized network graph to the on-disk cache.
+pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
- network_graph: Arc<NetworkGraph<TestLogger>>,
+ network_graph: Arc<NetworkGraph<L>>,
+ logger: L
}
-impl GossipPersister {
- pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+impl<L: Deref> GossipPersister<L> where L::Target: Logger {
+ pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
(GossipPersister {
gossip_persistence_receiver,
- network_graph
+ network_graph,
+ logger
}, gossip_persistence_sender)
}
i += 1; // count the persisted gossip messages
if latest_persistence_log.elapsed().as_secs() >= 60 {
- println!("Persisting gossip message #{}", i);
+ log_info!(self.logger, "Persisting gossip message #{}", i);
latest_persistence_log = Instant::now();
}
}
fn persist_network_graph(&self) {
- println!("Caching network graph…");
+ log_info!(self.logger, "Caching network graph…");
let cache_path = config::network_graph_cache_path();
let file = OpenOptions::new()
.create(true)
let mut writer = BufWriter::new(file);
self.network_graph.write(&mut writer).unwrap();
writer.flush().unwrap();
- println!("Cached network graph!");
+ log_info!(self.logger, "Cached network graph!");
}
}
use std::collections::HashMap;
use std::fs;
+use std::ops::Deref;
use std::os::unix::fs::symlink;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use lightning::log_info;
use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
-use crate::{config, TestLogger};
+use crate::config;
use crate::config::cache_path;
-pub(crate) struct Snapshotter {
- network_graph: Arc<NetworkGraph<TestLogger>>,
+pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
+ network_graph: Arc<NetworkGraph<L>>,
+ logger: L
}
-impl Snapshotter {
- pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> Self {
- Self { network_graph }
+impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
+ pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
+ Self { network_graph, logger }
}
pub(crate) async fn snapshot_gossip(&self) {
- println!("Initiating snapshotting service");
+ log_info!(self.logger, "Initiating snapshotting service");
let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64;
// 1. get the current timestamp
let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, round_day_seconds);
- println!("Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
+ log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
// 2. sleep until the next round 24 hours
// 3. refresh all snapshots
for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
let network_graph_clone = self.network_graph.clone();
{
- println!("Calculating {}-day snapshot", day_range);
+ log_info!(self.logger, "Calculating {}-day snapshot", day_range);
// calculate the snapshot
- let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32).await;
+ let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
// persist the snapshot and update the symlink
let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
- println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+ log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
fs::write(&snapshot_path, snapshot.data).unwrap();
snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
}
let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
- println!("Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
+ log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
}
};
let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
- println!("Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
+ log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
symlink(&relative_snapshot_path, &symlink_path).unwrap();
}
let remainder = current_time % round_day_seconds;
let time_until_next_day = round_day_seconds - remainder;
- println!("Sleeping until next snapshot capture: {}s", time_until_next_day);
+ log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
// add in an extra five seconds to assure the rounding down works correctly
let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
sleep.await;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::net::SocketAddr;
+use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
+use lightning::{log_error, log_info, log_warn};
use lightning::routing::gossip::NetworkGraph;
use lightning::sign::KeysManager;
+use lightning::util::logger::Logger;
use tokio::sync::mpsc;
-use crate::{config, TestLogger};
+use crate::config;
use crate::downloader::GossipRouter;
use crate::types::{GossipMessage, GossipPeerManager};
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
+pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
completion_sender: mpsc::Sender<()>,
- network_graph: Arc<NetworkGraph<TestLogger>>) {
+ network_graph: Arc<NetworkGraph<L>>,
+ logger: L
+) where L::Target: Logger {
let mut key = [42; 32];
let mut random_data = [43; 32];
	// Get something pseudo-random from std.
let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
- let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
+ let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
let message_handler = MessageHandler {
chan_handler: ErroringMessageHandler::new(),
message_handler,
0xdeadbeef,
&random_data,
- TestLogger::new(),
+ logger.clone(),
keys_manager,
));
router.set_pm(Arc::clone(&peer_handler));
}
});
- println!("Connecting to Lightning peers...");
+ log_info!(logger, "Connecting to Lightning peers...");
let peers = config::ln_peers();
let mut connected_peer_count = 0;
for current_peer in peers {
- let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
+ let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await;
if initial_connection_succeeded {
connected_peer_count += 1;
}
panic!("Failed to connect to any peer.");
}
- println!("Connected to {} Lightning peers!", connected_peer_count);
+ log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);
tokio::spawn(async move {
let mut previous_announcement_count = 0u64;
// if we either aren't caught up, or just stopped/started being caught up
if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
- println!(
+ log_info!(
+ logger,
"gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
i,
total_message_count,
counter.channel_updates_without_htlc_max_msats
);
} else {
- println!("Monitoring for gossip…")
+ log_info!(logger, "Monitoring for gossip…")
}
if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
- println!("caught up with gossip!");
+ log_info!(logger, "caught up with gossip!");
needs_to_notify_persister = true;
} else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
- println!("Received new messages since catching up with gossip!");
+ log_info!(logger, "Received new messages since catching up with gossip!");
}
let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
if continuous_caught_up_duration.as_secs() > 600 {
- eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
+ log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
}
previous_announcement_count = counter.channel_announcements;
});
}
-async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
- eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
+ log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
let connection = lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
current_peer.0,
current_peer.1,
).await;
if let Some(disconnection_future) = connection {
- eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
+ log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
tokio::spawn(async move {
disconnection_future.await;
loop {
- eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+ log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
current_peer.0,
});
true
} else {
- eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
+ log_error!(logger, "Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
false
}
}
use std::sync::Arc;
-use std::ops::Deref;
use lightning::sign::KeysManager;
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
use lightning::util::logger::{Logger, Record};
+use crate::config;
use crate::downloader::GossipRouter;
use crate::verifier::ChainVerifier;
-pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
+pub(crate) type GossipChainAccess<L> = Arc<ChainVerifier<L>>;
+pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter<L>>, IgnoringMessageHandler, L, IgnoringMessageHandler, Arc<KeysManager>>>;
#[derive(Debug)]
pub(crate) enum GossipMessage {
}
#[derive(Clone, Copy)]
-pub(crate) struct TestLogger {}
-impl Deref for TestLogger {
- type Target = Self;
- fn deref(&self) -> &Self { self }
-}
+/// Stdout logger for the Rapid Gossip Sync server; records below the threshold
+/// returned by `config::log_level()` are dropped.
+pub struct RGSSLogger {}
-impl TestLogger {
- pub(crate) fn new() -> TestLogger {
+impl RGSSLogger {
+ /// Creates a new stateless logger instance.
+ pub fn new() -> RGSSLogger {
		Self {}
	}
}
-impl Logger for TestLogger {
+impl Logger for RGSSLogger {
	fn log(&self, record: &Record) {
-		// TODO: allow log level threshold to be set
+ // NOTE(review): the threshold is re-derived via config::log_level() on every log
+ // call (which reads an env var) — consider caching it if logging becomes hot.
+ let threshold = config::log_level();
+ // Drop records whose level sorts below the configured threshold
+ if record.level < threshold {
+ return;
+ }
		println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
	}
}
use std::convert::TryInto;
+use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use bitcoin::{BlockHash, TxOut};
use bitcoin::blockdata::block::Block;
use bitcoin::hashes::Hash;
+use lightning::log_error;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
+use lightning::util::logger::Logger;
use lightning_block_sync::{BlockData, BlockSource};
use lightning_block_sync::http::BinaryResponse;
use lightning_block_sync::rest::RestClient;
use crate::config;
-use crate::TestLogger;
use crate::types::GossipPeerManager;
-pub(crate) struct ChainVerifier {
+/// Validates channel funding UTXOs against a bitcoind REST endpoint; used as the
+/// `UtxoLookup` implementation for gossip verification.
+pub(crate) struct ChainVerifier<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
	rest_client: Arc<RestClient>,
- graph: Arc<NetworkGraph<TestLogger>>,
- outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
- peer_handler: Mutex<Option<GossipPeerManager>>,
+ graph: Arc<NetworkGraph<L>>,
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
+ // Set after startup via set_ph(); used to trigger event processing once a lookup resolves
+ peer_handler: Mutex<Option<GossipPeerManager<L>>>,
+ logger: L
}
struct RestBinaryResponse(Vec<u8>);
-impl ChainVerifier {
- pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
+impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target: Logger {
+ pub(crate) fn new(graph: Arc<NetworkGraph<L>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>, logger: L) -> Self {
ChainVerifier {
rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
outbound_gossiper,
graph,
peer_handler: Mutex::new(None),
+ logger
}
}
- pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+ pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
*self.peer_handler.lock().unwrap() = Some(peer_handler);
}
-	async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
+ /// Resolves a short channel id (block height | tx index | output index) to its
+ /// funding `TxOut` by fetching the containing block over REST.
+ async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
		let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
		let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
		let output_index = (short_channel_id & 0xffff) as u16;
-		let mut block = Self::retrieve_block(client, block_height).await?;
-		if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); }
+ let mut block = Self::retrieve_block(client, block_height, logger.clone()).await?;
+ if transaction_index as usize >= block.txdata.len() {
+ log_error!(logger, "Couldn't find transaction {} in block {}", transaction_index, block_height);
+ return Err(UtxoLookupError::UnknownTx);
+ }
		let mut transaction = block.txdata.swap_remove(transaction_index as usize);
-		if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); }
+ if output_index as usize >= transaction.output.len() {
+ log_error!(logger, "Couldn't find output {} in transaction {}", output_index, transaction.txid());
+ return Err(UtxoLookupError::UnknownTx);
+ }
		Ok(transaction.output.swap_remove(output_index as usize))
	}
- async fn retrieve_block(client: Arc<RestClient>, block_height: u32) -> Result<Block, UtxoLookupError> {
+ async fn retrieve_block(client: Arc<RestClient>, block_height: u32, logger: L) -> Result<Block, UtxoLookupError> {
let uri = format!("blockhashbyheight/{}.bin", block_height);
let block_hash_result =
client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
- eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
+ log_error!(logger, "Could't find block hash at height {}: {}", block_height, error.to_string());
UtxoLookupError::UnknownChain
})?.0;
let block_hash = BlockHash::from_slice(&block_hash).unwrap();
},
Ok(_) => unreachable!(),
Err(error) => {
- eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+ log_error!(logger, "Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
Err(UtxoLookupError::UnknownChain)
}
}
}
}
-impl UtxoLookup for ChainVerifier {
+impl<L: Deref + Clone + Send + Sync + 'static> UtxoLookup for ChainVerifier<L> where L::Target: Logger {
fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
let res = UtxoFuture::new();
let fut = res.clone();
let client_ref = Arc::clone(&self.rest_client);
let gossip_ref = Arc::clone(&self.outbound_gossiper);
let pm_ref = self.peer_handler.lock().unwrap().clone();
+ let logger_ref = self.logger.clone();
tokio::spawn(async move {
- let res = Self::retrieve_utxo(client_ref, short_channel_id).await;
+ let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await;
fut.resolve(&*graph_ref, &*gossip_ref, res);
if let Some(pm) = pm_ref { pm.process_events(); }
});