use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
+use lightning::util::logger::Logger;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
-use crate::TestLogger;
use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
use crate::verifier::ChainVerifier;
}
}
-pub(crate) struct GossipRouter {
- native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
+pub(crate) struct GossipRouter<L: Logger + Send + Sync + 'static> {
+ native_router: P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, GossipChainAccess<L>, Arc<L>>,
pub(crate) counter: RwLock<GossipCounter>,
sender: mpsc::Sender<GossipMessage>,
- verifier: Arc<ChainVerifier>,
- outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
+ verifier: Arc<ChainVerifier<L>>,
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, GossipChainAccess<L>, Arc<L>>>,
}
-impl GossipRouter {
- pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
- let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
+impl<L: Logger + Send + Sync> GossipRouter<L> {
+ pub(crate) fn new(network_graph: Arc<NetworkGraph<Arc<L>>>, sender: mpsc::Sender<GossipMessage>, logger: Arc<L>) -> Self {
+ let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
Self {
- native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+ native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
outbound_gossiper,
counter: RwLock::new(GossipCounter::new()),
sender,
- verifier,
+ verifier
}
}
- pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+ pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
self.verifier.set_ph(peer_handler);
}
}
}
-impl MessageSendEventsProvider for GossipRouter {
+impl<L: Logger + Send + Sync> MessageSendEventsProvider for GossipRouter<L> {
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
for ev in gossip_evs {
}
}
-impl RoutingMessageHandler for GossipRouter {
+impl<L: Logger + Send + Sync> RoutingMessageHandler for GossipRouter<L> {
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
self.native_router.handle_node_announcement(msg)
}
use std::sync::Arc;
use lightning::routing::gossip::{NetworkGraph, NodeId};
+use lightning::util::logger::Logger;
use lightning::util::ser::{ReadableArgs, Writeable};
use tokio::sync::mpsc;
use crate::lookup::DeltaSet;
use crate::persistence::GossipPersister;
use crate::serialization::UpdateSerialization;
use crate::snapshot::Snapshotter;
-use crate::types::TestLogger;
+use crate::types::RGSSLogger;
mod downloader;
-mod types;
mod tracking;
mod lookup;
mod persistence;
mod hex_utils;
mod verifier;
+pub mod types;
+
/// The purpose of this prefix is to identify the serialization format, should other rapid gossip
/// sync formats arise in the future.
///
/// The fourth byte is the protocol version in case our format gets updated.
const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
-pub struct RapidSyncProcessor {
- network_graph: Arc<NetworkGraph<TestLogger>>,
+pub struct RapidSyncProcessor<L: Logger> {
+ network_graph: Arc<NetworkGraph<Arc<L>>>,
+ logger: Arc<L>
}
pub struct SerializedResponse {
pub update_count_incremental: u32,
}
-impl RapidSyncProcessor {
- pub fn new() -> Self {
+impl<L: Logger + Send + Sync + 'static> RapidSyncProcessor<L> {
+ pub fn new(logger: Arc<L>) -> Self {
let network = config::network();
- let logger = TestLogger::new();
let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
println!("Initializing from cached network graph…");
let mut buffered_reader = BufReader::new(file);
- let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
+ let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone());
if let Ok(network_graph) = network_graph_result {
println!("Initialized from cached network graph!");
network_graph
} else {
println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
- NetworkGraph::new(network, logger)
+ NetworkGraph::new(network, logger.clone())
}
} else {
- NetworkGraph::new(network, logger)
+ NetworkGraph::new(network, logger.clone())
};
let arc_network_graph = Arc::new(network_graph);
Self {
network_graph: arc_network_graph,
+ logger
}
}
println!("Starting gossip download");
tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
- Arc::clone(&self.network_graph)));
+ Arc::clone(&self.network_graph), Arc::clone(&self.logger)));
println!("Starting gossip db persistence listener");
tokio::spawn(async move { persister.persist_gossip().await; });
} else {
let chain_hash = genesis_block.block_hash();
chain_hash.write(&mut blob).unwrap();
- let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
+ let blob_timestamp = Snapshotter::<RGSSLogger>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
blob_timestamp.write(&mut blob).unwrap();
0u32.write(&mut blob).unwrap(); // node count
blob
}
-async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
+async fn serialize_delta<L: Logger>(network_graph: Arc<NetworkGraph<Arc<L>>>, last_sync_timestamp: u32) -> SerializedResponse {
let (client, connection) = lookup::connect_to_db().await;
network_graph.remove_stale_channels_and_tracking();
use tokio_postgres::tls::NoTlsStream;
use futures::StreamExt;
+use lightning::util::logger::Logger;
-use crate::{config, TestLogger};
+use crate::config;
use crate::serialization::MutatedProperties;
/// The delta set needs to be a BTreeMap so the keys are sorted.
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_sync_timestamp`
-pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
+pub(super) async fn fetch_channel_announcements<L: Logger>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<Arc<L>>>, client: &Client, last_sync_timestamp: u32) {
println!("Obtaining channel ids from network graph");
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
let channel_ids = {
+use std::sync::Arc;
use rapid_gossip_sync_server::RapidSyncProcessor;
+use rapid_gossip_sync_server::types::RGSSLogger;
#[tokio::main]
async fn main() {
- RapidSyncProcessor::new().start_sync().await;
+ let logger = Arc::new(RGSSLogger::new());
+ RapidSyncProcessor::new(logger).start_sync().await;
}
use std::sync::Arc;
use std::time::{Duration, Instant};
use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use tokio::sync::mpsc;
use tokio_postgres::NoTls;
-use crate::{config, TestLogger};
+use crate::config;
use crate::types::GossipMessage;
const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
-pub(crate) struct GossipPersister {
+pub(crate) struct GossipPersister<L: Logger> {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
- network_graph: Arc<NetworkGraph<TestLogger>>,
+ network_graph: Arc<NetworkGraph<Arc<L>>>,
}
-impl GossipPersister {
- pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+impl<L: Logger> GossipPersister<L> {
+ pub fn new(network_graph: Arc<NetworkGraph<Arc<L>>>) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
(GossipPersister {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
-use crate::{config, TestLogger};
+use crate::config;
use crate::config::cache_path;
-pub(crate) struct Snapshotter {
- network_graph: Arc<NetworkGraph<TestLogger>>,
+pub(crate) struct Snapshotter<L: Logger> {
+ network_graph: Arc<NetworkGraph<Arc<L>>>,
}
-impl Snapshotter {
- pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> Self {
+impl<L: Logger> Snapshotter<L> {
+ pub fn new(network_graph: Arc<NetworkGraph<Arc<L>>>) -> Self {
Self { network_graph }
}
};
use lightning::routing::gossip::NetworkGraph;
use lightning::sign::KeysManager;
+use lightning::util::logger::Logger;
use tokio::sync::mpsc;
-use crate::{config, TestLogger};
+use crate::config;
use crate::downloader::GossipRouter;
use crate::types::{GossipMessage, GossipPeerManager};
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
+pub(crate) async fn download_gossip<L: Logger + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
completion_sender: mpsc::Sender<()>,
- network_graph: Arc<NetworkGraph<TestLogger>>) {
+ network_graph: Arc<NetworkGraph<Arc<L>>>,
+ logger: Arc<L>
+) {
let mut key = [42; 32];
let mut random_data = [43; 32];
// Get something psuedo-random from std.
let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
- let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
+ let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone(), logger.clone()));
let message_handler = MessageHandler {
chan_handler: ErroringMessageHandler::new(),
message_handler,
0xdeadbeef,
&random_data,
- TestLogger::new(),
+ logger,
keys_manager,
));
router.set_pm(Arc::clone(&peer_handler));
});
}
-async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
+async fn connect_peer<L: Logger + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>) -> bool {
eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
let connection = lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
use std::sync::Arc;
-use std::ops::Deref;
use lightning::sign::KeysManager;
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
use crate::downloader::GossipRouter;
use crate::verifier::ChainVerifier;
-pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
+pub(crate) type GossipChainAccess<L> = Arc<ChainVerifier<L>>;
+pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter<L>>, IgnoringMessageHandler, Arc<L>, IgnoringMessageHandler, Arc<KeysManager>>>;
#[derive(Debug)]
pub(crate) enum GossipMessage {
}
#[derive(Clone, Copy)]
-pub(crate) struct TestLogger {}
-impl Deref for TestLogger {
- type Target = Self;
- fn deref(&self) -> &Self { self }
-}
+pub struct RGSSLogger {}
-impl TestLogger {
- pub(crate) fn new() -> TestLogger {
+impl RGSSLogger {
+ pub fn new() -> RGSSLogger {
Self {}
}
}
-impl Logger for TestLogger {
+impl Logger for RGSSLogger {
fn log(&self, record: &Record) {
// TODO: allow log level threshold to be set
println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
use bitcoin::hashes::Hash;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
+use lightning::util::logger::Logger;
use lightning_block_sync::{BlockData, BlockSource};
use lightning_block_sync::http::BinaryResponse;
use lightning_block_sync::rest::RestClient;
use crate::config;
-use crate::TestLogger;
use crate::types::GossipPeerManager;
-pub(crate) struct ChainVerifier {
+pub(crate) struct ChainVerifier<L: Logger + Send + Sync + 'static> {
rest_client: Arc<RestClient>,
- graph: Arc<NetworkGraph<TestLogger>>,
- outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
- peer_handler: Mutex<Option<GossipPeerManager>>,
+ graph: Arc<NetworkGraph<Arc<L>>>,
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<Self>, Arc<L>>>,
+ peer_handler: Mutex<Option<GossipPeerManager<L>>>,
}
struct RestBinaryResponse(Vec<u8>);
-impl ChainVerifier {
- pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
+impl<L: Logger + Send + Sync + 'static> ChainVerifier<L> {
+ pub(crate) fn new(graph: Arc<NetworkGraph<Arc<L>>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<Self>, Arc<L>>>) -> Self {
ChainVerifier {
rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
outbound_gossiper,
peer_handler: Mutex::new(None),
}
}
- pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+ pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
*self.peer_handler.lock().unwrap() = Some(peer_handler);
}
}
}
-impl UtxoLookup for ChainVerifier {
+impl<L: Logger + Send + Sync + 'static> UtxoLookup for ChainVerifier<L> {
fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
let res = UtxoFuture::new();
let fut = res.clone();