Remove indirection in initial sync completion
[rapid-gossip-sync-server] / src / persistence.rs
index ed371203729e82cb574cc1678abf7f033c63e690..ef0a9372f21b28c8a703a5fec9a7844a5b0d1374 100644 (file)
@@ -1,7 +1,7 @@
 use std::fs::OpenOptions;
 use std::io::{BufWriter, Write};
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 use lightning::routing::gossip::NetworkGraph;
 use lightning::util::ser::Writeable;
 use tokio::sync::mpsc;
@@ -12,17 +12,15 @@ use crate::types::GossipMessage;
 
 pub(crate) struct GossipPersister {
        gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
-       server_sync_completion_sender: mpsc::Sender<()>,
        network_graph: Arc<NetworkGraph<TestLogger>>,
 }
 
 impl GossipPersister {
-       pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+       pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
                let (gossip_persistence_sender, gossip_persistence_receiver) =
                        mpsc::channel::<GossipMessage>(100);
                (GossipPersister {
                        gossip_persistence_receiver,
-                       server_sync_completion_sender,
                        network_graph
                }, gossip_persistence_sender)
        }
@@ -83,10 +81,9 @@ impl GossipPersister {
                        }
                }
 
-               // print log statement every 10,000 messages
-               let mut persistence_log_threshold = 10000;
+               // print log statement every minute
+               let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
                let mut i = 0u32;
-               let mut server_sync_completion_sent = false;
                let mut latest_graph_cache_time = Instant::now();
                // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
                // inactivity, some sort of message could be broadcast signaling the activation of request
@@ -94,8 +91,9 @@ impl GossipPersister {
                while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
                        i += 1; // count the persisted gossip messages
 
-                       if i == 1 || i % persistence_log_threshold == 0 {
+                       if latest_persistence_log.elapsed().as_secs() >= 60 {
                                println!("Persisting gossip message #{}", i);
+                               latest_persistence_log = Instant::now();
                        }
 
                        // has it been ten minutes? Just cache it
@@ -105,22 +103,7 @@ impl GossipPersister {
                        }
 
                        match &gossip_message {
-                               GossipMessage::InitialSyncComplete => {
-                                       // signal to the server that it may now serve dynamic responses and calculate
-                                       // snapshots
-                                       // we take this detour through the persister to ensure that all previous
-                                       // messages have already been persisted to the database
-                                       println!("Persister caught up with gossip!");
-                                       i -= 1; // this wasn't an actual gossip message that needed persisting
-                                       persistence_log_threshold = 50;
-                                       if !server_sync_completion_sent {
-                                               server_sync_completion_sent = true;
-                                               self.server_sync_completion_sender.send(()).await.unwrap();
-                                               println!("Server has been notified of persistence completion.");
-                                       }
-                               }
                                GossipMessage::ChannelAnnouncement(announcement) => {
-
                                        let scid = announcement.contents.short_channel_id;
                                        let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
                                        // scid is 8 bytes