Remove indirection in initial sync completion
authorMatt Corallo <git@bluematt.me>
Mon, 22 Aug 2022 03:37:49 +0000 (03:37 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 23 Aug 2022 00:23:13 +0000 (00:23 +0000)
Rather than sending a GossipMessage::InitialSyncComplete, which
causes the persistence logic to push to another queue to wake the
snapshoter, we can simply wake the snapshoter directly.

src/lib.rs
src/persistence.rs
src/tracking.rs
src/types.rs

index 07a40c538756bd77df01b7ea64d5546a14bc9f7d..8ab4803773c3206b8ea8863f1d6319496c42e403 100644 (file)
@@ -87,11 +87,11 @@ impl RapidSyncProcessor {
                let initial_sync_complete = self.initial_sync_complete.clone();
 
                if config::DOWNLOAD_NEW_GOSSIP {
-                       let (mut persister, persistence_sender) =
-                               GossipPersister::new(sync_completion_sender, Arc::clone(&self.network_graph));
+                       let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph));
 
                        println!("Starting gossip download");
-                       tokio::spawn(tracking::download_gossip(persistence_sender, Arc::clone(&self.network_graph)));
+                       tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
+                               Arc::clone(&self.network_graph)));
                        println!("Starting gossip db persistence listener");
                        tokio::spawn(async move { persister.persist_gossip().await; });
                } else {
index ed371203729e82cb574cc1678abf7f033c63e690..ef0a9372f21b28c8a703a5fec9a7844a5b0d1374 100644 (file)
@@ -1,7 +1,7 @@
 use std::fs::OpenOptions;
 use std::io::{BufWriter, Write};
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 use lightning::routing::gossip::NetworkGraph;
 use lightning::util::ser::Writeable;
 use tokio::sync::mpsc;
@@ -12,17 +12,15 @@ use crate::types::GossipMessage;
 
 pub(crate) struct GossipPersister {
        gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
-       server_sync_completion_sender: mpsc::Sender<()>,
        network_graph: Arc<NetworkGraph<TestLogger>>,
 }
 
 impl GossipPersister {
-       pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+       pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
                let (gossip_persistence_sender, gossip_persistence_receiver) =
                        mpsc::channel::<GossipMessage>(100);
                (GossipPersister {
                        gossip_persistence_receiver,
-                       server_sync_completion_sender,
                        network_graph
                }, gossip_persistence_sender)
        }
@@ -83,10 +81,9 @@ impl GossipPersister {
                        }
                }
 
-               // print log statement every 10,000 messages
-               let mut persistence_log_threshold = 10000;
+               // print log statement every minute
+               let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
                let mut i = 0u32;
-               let mut server_sync_completion_sent = false;
                let mut latest_graph_cache_time = Instant::now();
                // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
                // inactivity, some sort of message could be broadcast signaling the activation of request
@@ -94,8 +91,9 @@ impl GossipPersister {
                while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
                        i += 1; // count the persisted gossip messages
 
-                       if i == 1 || i % persistence_log_threshold == 0 {
+                       if latest_persistence_log.elapsed().as_secs() >= 60 {
                                println!("Persisting gossip message #{}", i);
+                               latest_persistence_log = Instant::now();
                        }
 
                        // has it been ten minutes? Just cache it
@@ -105,22 +103,7 @@ impl GossipPersister {
                        }
 
                        match &gossip_message {
-                               GossipMessage::InitialSyncComplete => {
-                                       // signal to the server that it may now serve dynamic responses and calculate
-                                       // snapshots
-                                       // we take this detour through the persister to ensure that all previous
-                                       // messages have already been persisted to the database
-                                       println!("Persister caught up with gossip!");
-                                       i -= 1; // this wasn't an actual gossip message that needed persisting
-                                       persistence_log_threshold = 50;
-                                       if !server_sync_completion_sent {
-                                               server_sync_completion_sent = true;
-                                               self.server_sync_completion_sender.send(()).await.unwrap();
-                                               println!("Server has been notified of persistence completion.");
-                                       }
-                               }
                                GossipMessage::ChannelAnnouncement(announcement) => {
-
                                        let scid = announcement.contents.short_channel_id;
                                        let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
                                        // scid is 8 bytes
index 672b75f3764fa5113afd0a98fa72c94a29f451f3..f76db3989815320c28a96906d15d488a5012e375 100644 (file)
@@ -16,7 +16,9 @@ use crate::{config, TestLogger};
 use crate::downloader::GossipRouter;
 use crate::types::{GossipMessage, GossipPeerManager};
 
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<TestLogger>>) {
+pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
+               completion_sender: mpsc::Sender<()>,
+               network_graph: Arc<NetworkGraph<TestLogger>>) {
        let mut key = [0; 32];
        let mut random_data = [0; 32];
        thread_rng().fill_bytes(&mut key);
@@ -114,7 +116,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
 
                        if needs_to_notify_persister {
                                needs_to_notify_persister = false;
-                               persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
+                               completion_sender.send(()).await.unwrap();
                        }
                }
        });
index 9badc1e4c2abe5b87ba878b3d63a446461e8aed0..1bb3821ccf66677689525ac952263c8002406fc2 100644 (file)
@@ -15,7 +15,6 @@ pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketD
 pub(crate) enum GossipMessage {
        ChannelAnnouncement(ChannelAnnouncement),
        ChannelUpdate(ChannelUpdate),
-       InitialSyncComplete,
 }
 
 #[derive(Clone, Copy)]