let initial_sync_complete = self.initial_sync_complete.clone();
if config::DOWNLOAD_NEW_GOSSIP {
- let (mut persister, persistence_sender) =
- GossipPersister::new(sync_completion_sender, Arc::clone(&self.network_graph));
+ let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph));
println!("Starting gossip download");
- tokio::spawn(tracking::download_gossip(persistence_sender, Arc::clone(&self.network_graph)));
+ tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
+ Arc::clone(&self.network_graph)));
println!("Starting gossip db persistence listener");
tokio::spawn(async move { persister.persist_gossip().await; });
} else {
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
use lightning::routing::gossip::NetworkGraph;
use lightning::util::ser::Writeable;
use tokio::sync::mpsc;
pub(crate) struct GossipPersister {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
- server_sync_completion_sender: mpsc::Sender<()>,
network_graph: Arc<NetworkGraph<TestLogger>>,
}
impl GossipPersister {
- pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+ pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
(GossipPersister {
gossip_persistence_receiver,
- server_sync_completion_sender,
network_graph
}, gossip_persistence_sender)
}
}
}
- // print log statement every 10,000 messages
- let mut persistence_log_threshold = 10000;
+ // print log statement every minute
+ let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
let mut i = 0u32;
- let mut server_sync_completion_sent = false;
let mut latest_graph_cache_time = Instant::now();
// TODO: it would be nice to have some sort of timeout here so after 10 seconds of
// inactivity, some sort of message could be broadcast signaling the activation of request
while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
i += 1; // count the persisted gossip messages
- if i == 1 || i % persistence_log_threshold == 0 {
+ if latest_persistence_log.elapsed().as_secs() >= 60 {
println!("Persisting gossip message #{}", i);
+ latest_persistence_log = Instant::now();
}
// has it been ten minutes? Just cache it
}
match &gossip_message {
- GossipMessage::InitialSyncComplete => {
- // signal to the server that it may now serve dynamic responses and calculate
- // snapshots
- // we take this detour through the persister to ensure that all previous
- // messages have already been persisted to the database
- println!("Persister caught up with gossip!");
- i -= 1; // this wasn't an actual gossip message that needed persisting
- persistence_log_threshold = 50;
- if !server_sync_completion_sent {
- server_sync_completion_sent = true;
- self.server_sync_completion_sender.send(()).await.unwrap();
- println!("Server has been notified of persistence completion.");
- }
- }
GossipMessage::ChannelAnnouncement(announcement) => {
-
let scid = announcement.contents.short_channel_id;
let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
// scid is 8 bytes
use crate::downloader::GossipRouter;
use crate::types::{GossipMessage, GossipPeerManager};
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<TestLogger>>) {
+pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>,
+ completion_sender: mpsc::Sender<()>,
+ network_graph: Arc<NetworkGraph<TestLogger>>) {
let mut key = [0; 32];
let mut random_data = [0; 32];
thread_rng().fill_bytes(&mut key);
if needs_to_notify_persister {
needs_to_notify_persister = false;
- persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
+ completion_sender.send(()).await.unwrap();
}
}
});