use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
-use crate::{GossipChainAccess, TestLogger};
-use crate::types::GossipMessage;
+use crate::TestLogger;
+use crate::types::{GossipMessage, GossipChainAccess};
+use crate::verifier::ChainVerifier;
pub(crate) struct GossipCounter {
pub(crate) channel_announcements: u64,
}
pub(crate) struct GossipRouter {
- pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
+ native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
pub(crate) counter: RwLock<GossipCounter>,
- pub(crate) sender: mpsc::Sender<GossipMessage>,
+ sender: mpsc::Sender<GossipMessage>,
+}
+
+impl GossipRouter {
+ pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
+ Self {
+ native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()),
+ counter: RwLock::new(GossipCounter::new()),
+ sender
+ }
+ }
}
impl MessageSendEventsProvider for GossipRouter {
let mut counter = self.counter.write().unwrap();
let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
- let error_string = format!("{:?}", error);
- if error_string.contains("announced on an unknown chain"){
- return error;
+ if error.err.contains("didn't match on-chain script") {
+ counter.channel_announcements_with_mismatched_scripts += 1;
}
- counter.channel_announcements_with_mismatched_scripts += 1;
error
})?;
use crate::persistence::GossipPersister;
use crate::serialization::UpdateSerializationMechanism;
use crate::snapshot::Snapshotter;
-use crate::types::{GossipChainAccess, TestLogger};
+use crate::types::TestLogger;
mod downloader;
mod types;
mod verifier;
pub struct RapidSyncProcessor {
- network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+ network_graph: Arc<NetworkGraph<TestLogger>>,
pub initial_sync_complete: Arc<AtomicBool>,
}
pub fn new() -> Self {
let logger = TestLogger::new();
let mut initial_sync_complete = false;
- let arc_logger = Arc::new(logger);
let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
println!("Initializing from cached network graph…");
let mut buffered_reader = BufReader::new(file);
- let network_graph_result = NetworkGraph::read(&mut buffered_reader, Arc::clone(&arc_logger));
+ let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
if let Ok(network_graph) = network_graph_result {
initial_sync_complete = true;
network_graph.remove_stale_channels();
network_graph
} else {
println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
- NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger)
+ NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), logger)
}
} else {
- NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger)
+ NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), logger)
};
let arc_network_graph = Arc::new(network_graph);
- let (_sync_termination_sender, _sync_termination_receiver) = mpsc::channel::<()>(1);
Self {
network_graph: arc_network_graph,
initial_sync_complete: Arc::new(AtomicBool::new(initial_sync_complete)),
let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
let initial_sync_complete = self.initial_sync_complete.clone();
- let network_graph = self.network_graph.clone();
- let snapshotter = Snapshotter::new(network_graph.clone());
-
if config::DOWNLOAD_NEW_GOSSIP {
+ let (mut persister, persistence_sender) =
+ GossipPersister::new(sync_completion_sender, Arc::clone(&self.network_graph));
- let mut persister = GossipPersister::new(sync_completion_sender, self.network_graph.clone());
-
- let persistence_sender = persister.gossip_persistence_sender.clone();
println!("Starting gossip download");
- let download_future = tracking::download_gossip(persistence_sender, network_graph.clone());
- tokio::spawn(async move {
- // initiate the whole download stuff in the background
- download_future.await;
- });
+ tokio::spawn(tracking::download_gossip(persistence_sender, Arc::clone(&self.network_graph)));
println!("Starting gossip db persistence listener");
- tokio::spawn(async move {
- // initiate persistence of the gossip data
- let persistence_future = persister.persist_gossip();
- persistence_future.await;
- });
-
+ tokio::spawn(async move { persister.persist_gossip().await; });
} else {
sync_completion_sender.send(()).await.unwrap();
}
println!("Initial sync complete!");
// start the gossip snapshotting service
- snapshotter.snapshot_gossip().await;
+ Snapshotter::new(Arc::clone(&self.network_graph)).snapshot_gossip().await;
}
}
}
}
-async fn serialize_delta(network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
+async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
let (client, connection) = lookup::connect_to_db().await;
tokio::spawn(async move {
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_syc_timestamp`
-pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, client: &Client, last_sync_timestamp: u32) {
+pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
println!("Obtaining channel ids from network graph");
let channel_ids = {
let channel_iterator = read_only_graph.channels().into_iter();
channel_iterator
.filter(|c| c.1.announcement_message.is_some())
- .map(|c| hex_utils::hex_str(&c.1.announcement_message.clone().unwrap().contents.short_channel_id.to_be_bytes()))
+ .map(|c| hex_utils::hex_str(&c.1.announcement_message.as_ref().unwrap().contents.short_channel_id.to_be_bytes()))
.collect::<Vec<String>>()
};
panic!("Channel direction must be binary!")
};
update_delta.last_update_before_seen = Some(unsigned_channel_update);
-
-
}
println!("Processed reference rows (delta size: {}): {:?}", delta_set.len(), start.elapsed());
let scid = unsigned_channel_update.short_channel_id;
if scid != previous_scid {
- previous_scid = scid.clone();
+ previous_scid = scid;
previously_seen_directions = (false, false);
}
// get the write configuration for this particular channel's directional details
- let current_channel_delta = delta_set.entry(scid.clone()).or_insert(ChannelDelta::default());
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
let update_delta = if direction == 0 {
(*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
} else if direction == 1 {
#[tokio::main]
async fn main() {
- let processor = RapidSyncProcessor::new();
- processor.start_sync().await;
+ RapidSyncProcessor::new().start_sync().await;
}
use std::fs::OpenOptions;
-use std::io::BufWriter;
+use std::io::{BufWriter, Write};
use std::sync::Arc;
use std::time::Instant;
use lightning::routing::gossip::NetworkGraph;
use crate::types::GossipMessage;
pub(crate) struct GossipPersister {
- pub(crate) gossip_persistence_sender: mpsc::Sender<GossipMessage>,
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
server_sync_completion_sender: mpsc::Sender<()>,
- network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+ network_graph: Arc<NetworkGraph<TestLogger>>,
}
impl GossipPersister {
- pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
+ pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
- GossipPersister {
- gossip_persistence_sender,
+ (GossipPersister {
gossip_persistence_receiver,
server_sync_completion_sender,
network_graph
- }
+ }, gossip_persistence_sender)
}
pub(crate) async fn persist_gossip(&mut self) {
let mut persistence_log_threshold = 10000;
let mut i = 0u32;
let mut server_sync_completion_sent = false;
- let mut latest_graph_cache_time: Option<Instant> = None;
+ let mut latest_graph_cache_time = Instant::now();
// TODO: it would be nice to have some sort of timeout here so after 10 seconds of
// inactivity, some sort of message could be broadcast signaling the activation of request
// processing
println!("Persisting gossip message #{}", i);
}
- if let Some(last_cache_time) = latest_graph_cache_time {
- // has it been ten minutes? Just cache it
- if last_cache_time.elapsed().as_secs() >= 600 {
- self.persist_network_graph();
- latest_graph_cache_time = Some(Instant::now());
- }
- } else {
- // initialize graph cache timer
- latest_graph_cache_time = Some(Instant::now());
+ // has it been ten minutes? Just cache it
+ if latest_graph_cache_time.elapsed().as_secs() >= 600 {
+ self.persist_network_graph();
+ latest_graph_cache_time = Instant::now();
}
match &gossip_message {
self.server_sync_completion_sender.send(()).await.unwrap();
println!("Server has been notified of persistence completion.");
}
-
- // now, cache the persisted network graph
- // also persist the network graph here
- let mut too_soon = false;
- if let Some(latest_graph_cache_time) = latest_graph_cache_time {
- let time_since_last_cached = latest_graph_cache_time.elapsed().as_secs();
- // don't cache more frequently than every 2 minutes
- too_soon = time_since_last_cached < 120;
- }
- if too_soon {
- println!("Network graph has been cached too recently.");
- }else {
- latest_graph_cache_time = Some(Instant::now());
- self.persist_network_graph();
- }
}
GossipMessage::ChannelAnnouncement(announcement) => {
self.network_graph.remove_stale_channels();
let mut writer = BufWriter::new(file);
self.network_graph.write(&mut writer).unwrap();
+ writer.flush().unwrap();
println!("Cached network graph!");
}
}
use crate::{config, TestLogger};
pub(crate) struct Snapshotter {
- network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+ network_graph: Arc<NetworkGraph<TestLogger>>,
}
impl Snapshotter {
- pub fn new(network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
+ pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> Self {
Self { network_graph }
}
fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
-
let remainder = timestamp_seen % round_day_seconds;
let time_until_next_day = round_day_seconds - remainder;
use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
use std::time::{Duration, Instant};
use bitcoin::hashes::hex::ToHex;
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::gossip::NetworkGraph;
use rand::{Rng, thread_rng};
use tokio::sync::mpsc;
use crate::{config, TestLogger};
-use crate::downloader::{GossipCounter, GossipRouter};
-use crate::types::{GossipChainAccess, GossipMessage, GossipPeerManager};
-use crate::verifier::ChainVerifier;
+use crate::downloader::GossipRouter;
+use crate::types::{GossipMessage, GossipPeerManager};
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
+pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<TestLogger>>) {
let mut key = [0; 32];
let mut random_data = [0; 32];
thread_rng().fill_bytes(&mut key);
thread_rng().fill_bytes(&mut random_data);
let our_node_secret = SecretKey::from_slice(&key).unwrap();
- let _arc_chain_access = None::<GossipChainAccess>;
- let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
- let ignorer = IgnoringMessageHandler {};
- let arc_ignorer = Arc::new(ignorer);
-
- let errorer = ErroringMessageHandler::new();
- let arc_errorer = Arc::new(errorer);
-
- let logger = TestLogger::new();
- let arc_logger = Arc::new(logger);
-
- let router = P2PGossipSync::new(
- network_graph.clone(),
- arc_chain_access,
- Arc::clone(&arc_logger),
- );
- let arc_router = Arc::new(router);
- let wrapped_router = GossipRouter {
- native_router: arc_router,
- counter: RwLock::new(GossipCounter::new()),
- sender: persistence_sender.clone(),
- };
- let arc_wrapped_router = Arc::new(wrapped_router);
+ let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
let message_handler = MessageHandler {
- chan_handler: arc_errorer,
- route_handler: arc_wrapped_router.clone(),
+ chan_handler: ErroringMessageHandler::new(),
+ route_handler: Arc::clone(&router),
};
- let peer_handler = PeerManager::new(
+ let peer_handler = Arc::new(PeerManager::new(
message_handler,
our_node_secret,
&random_data,
- Arc::clone(&arc_logger),
- arc_ignorer,
- );
- let arc_peer_handler = Arc::new(peer_handler);
+ TestLogger::new(),
+ IgnoringMessageHandler {},
+ ));
- println!("Connecting to Lightning peers…");
+ println!("Connecting to Lightning peers...");
let peers = config::ln_peers();
let mut connected_peer_count = 0;
for current_peer in peers {
- let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&arc_peer_handler)).await;
+ let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await;
if initial_connection_succeeded {
connected_peer_count += 1;
}
println!("Connected to {} Lightning peers!", connected_peer_count);
- let local_router = arc_wrapped_router.clone();
- let local_persistence_sender = persistence_sender.clone();
tokio::spawn(async move {
let mut previous_announcement_count = 0u64;
let mut previous_update_count = 0u64;
let sleep = tokio::time::sleep(Duration::from_secs(5));
sleep.await;
- let router_clone = Arc::clone(&local_router);
-
{
- let counter = router_clone.counter.read().unwrap();
+ let counter = router.counter.read().unwrap();
let total_message_count = counter.channel_announcements + counter.channel_updates;
let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
if needs_to_notify_persister {
needs_to_notify_persister = false;
- local_persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
+ persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
}
}
});
use std::sync::Arc;
+use std::ops::Deref;
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
use crate::verifier::ChainVerifier;
pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, Arc<ErroringMessageHandler>, Arc<GossipRouter>, Arc<TestLogger>, Arc<IgnoringMessageHandler>>>;
+pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, TestLogger, IgnoringMessageHandler>>;
#[derive(Debug)]
pub(crate) enum GossipMessage {
InitialSyncComplete,
}
+#[derive(Clone, Copy)]
pub(crate) struct TestLogger {}
+impl Deref for TestLogger {
+ type Target = Self;
+ fn deref(&self) -> &Self { self }
+}
impl TestLogger {
pub(crate) fn new() -> TestLogger {
use std::convert::TryInto;
-use std::sync::Arc;
use bitcoin::{BlockHash, TxOut};
use bitcoin::blockdata::block::Block;
use crate::config;
pub(crate) struct ChainVerifier {
- rest_client: Arc<RestClient>,
+ rest_client: RestClient,
}
struct RestBinaryResponse(Vec<u8>);
impl ChainVerifier {
pub(crate) fn new() -> Self {
- let rest_client = RestClient::new(config::bitcoin_rest_endpoint()).unwrap();
ChainVerifier {
- rest_client: Arc::new(rest_client),
+ rest_client: RestClient::new(config::bitcoin_rest_endpoint()).unwrap(),
}
}
fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
- let rest_client = self.rest_client.clone();
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
- let block_hash_result = rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&format!("blockhashbyheight/{}.bin", block_height)).await;
+ let uri = format!("blockhashbyheight/{}.bin", block_height);
+ let block_hash_result =
+ self.rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
AccessError::UnknownChain
})?.0;
let block_hash = BlockHash::from_slice(&block_hash).unwrap();
- let block_result = rest_client.get_block(&block_hash).await;
+ let block_result = self.rest_client.get_block(&block_hash).await;
let block = block_result.map_err(|error| {
eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
AccessError::UnknownChain