use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
use lightning::routing::gossip::NetworkGraph;
use lightning::util::ser::Writeable;
use tokio::sync::mpsc;
pub(crate) struct GossipPersister {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
- server_sync_completion_sender: mpsc::Sender<()>,
network_graph: Arc<NetworkGraph<TestLogger>>,
}
impl GossipPersister {
- pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
+ pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
(GossipPersister {
gossip_persistence_receiver,
- server_sync_completion_sender,
network_graph
}, gossip_persistence_sender)
}
}
}
- // print log statement every 10,000 messages
- let mut persistence_log_threshold = 10000;
+ // print log statement every minute
+ let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
let mut i = 0u32;
- let mut server_sync_completion_sent = false;
let mut latest_graph_cache_time = Instant::now();
// TODO: it would be nice to have some sort of timeout here so after 10 seconds of
// inactivity, some sort of message could be broadcast signaling the activation of request
while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
i += 1; // count the persisted gossip messages
- if i == 1 || i % persistence_log_threshold == 0 {
+ if latest_persistence_log.elapsed().as_secs() >= 60 {
println!("Persisting gossip message #{}", i);
+ latest_persistence_log = Instant::now();
}
// has it been ten minutes? Just cache it
}
match &gossip_message {
- GossipMessage::InitialSyncComplete => {
- // signal to the server that it may now serve dynamic responses and calculate
- // snapshots
- // we take this detour through the persister to ensure that all previous
- // messages have already been persisted to the database
- println!("Persister caught up with gossip!");
- i -= 1; // this wasn't an actual gossip message that needed persisting
- persistence_log_threshold = 50;
- if !server_sync_completion_sent {
- server_sync_completion_sent = true;
- self.server_sync_completion_sender.send(()).await.unwrap();
- println!("Server has been notified of persistence completion.");
- }
- }
GossipMessage::ChannelAnnouncement(announcement) => {
-
let scid = announcement.contents.short_channel_id;
let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
// scid is 8 bytes