From 944a2409f823c4bd196493259df43904d26c7478 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 22 Aug 2022 03:37:49 +0000 Subject: [PATCH] Remove indirection in initial sync completion Rather than sending a GossipMessage::InitialSyncComplete, which causes the persistence logic to push to another queue to wake the snapshoter, we can simply wake the snapshoter directly. --- src/lib.rs | 6 +++--- src/persistence.rs | 29 ++++++----------------------- src/tracking.rs | 6 ++++-- src/types.rs | 1 - 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 07a40c5..8ab4803 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,11 +87,11 @@ impl RapidSyncProcessor { let initial_sync_complete = self.initial_sync_complete.clone(); if config::DOWNLOAD_NEW_GOSSIP { - let (mut persister, persistence_sender) = - GossipPersister::new(sync_completion_sender, Arc::clone(&self.network_graph)); + let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph)); println!("Starting gossip download"); - tokio::spawn(tracking::download_gossip(persistence_sender, Arc::clone(&self.network_graph))); + tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender, + Arc::clone(&self.network_graph))); println!("Starting gossip db persistence listener"); tokio::spawn(async move { persister.persist_gossip().await; }); } else { diff --git a/src/persistence.rs b/src/persistence.rs index ed37120..ef0a937 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,7 +1,7 @@ use std::fs::OpenOptions; use std::io::{BufWriter, Write}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use lightning::routing::gossip::NetworkGraph; use lightning::util::ser::Writeable; use tokio::sync::mpsc; @@ -12,17 +12,15 @@ use crate::types::GossipMessage; pub(crate) struct GossipPersister { gossip_persistence_receiver: mpsc::Receiver, - server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc>, } impl GossipPersister { - pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc>) -> (Self, mpsc::Sender) { + pub fn new(network_graph: Arc>) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { gossip_persistence_receiver, - server_sync_completion_sender, network_graph }, gossip_persistence_sender) } @@ -83,10 +81,9 @@ impl GossipPersister { } } - // print log statement every 10,000 messages - let mut persistence_log_threshold = 10000; + // print log statement every minute + let mut latest_persistence_log = Instant::now() - Duration::from_secs(60); let mut i = 0u32; - let mut server_sync_completion_sent = false; let mut latest_graph_cache_time = Instant::now(); // TODO: it would be nice to have some sort of timeout here so after 10 seconds of // inactivity, some sort of message could be broadcast signaling the activation of request @@ -94,8 +91,9 @@ impl GossipPersister { while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await { i += 1; // count the persisted gossip messages - if i == 1 || i % persistence_log_threshold == 0 { + if latest_persistence_log.elapsed().as_secs() >= 60 { println!("Persisting gossip message #{}", i); + latest_persistence_log = Instant::now(); } // has it been ten minutes? Just cache it @@ -105,22 +103,7 @@ impl GossipPersister { } match &gossip_message { - GossipMessage::InitialSyncComplete => { - // signal to the server that it may now serve dynamic responses and calculate - // snapshots - // we take this detour through the persister to ensure that all previous - // messages have already been persisted to the database - println!("Persister caught up with gossip!"); - i -= 1; // this wasn't an actual gossip message that needed persisting - persistence_log_threshold = 50; - if !server_sync_completion_sent { - server_sync_completion_sent = true; - self.server_sync_completion_sender.send(()).await.unwrap(); - println!("Server has been notified of persistence completion."); - } - } GossipMessage::ChannelAnnouncement(announcement) => { - let scid = announcement.contents.short_channel_id; let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); // scid is 8 bytes diff --git a/src/tracking.rs b/src/tracking.rs index 672b75f..f76db39 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -16,7 +16,9 @@ use crate::{config, TestLogger}; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, network_graph: Arc>) { +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, + completion_sender: mpsc::Sender<()>, + network_graph: Arc>) { let mut key = [0; 32]; let mut random_data = [0; 32]; thread_rng().fill_bytes(&mut key); @@ -114,7 +116,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender