Block tasks if the DB writes get behind, rather than growing the queue
[rapid-gossip-sync-server] / src / persistence.rs
index ded79b50086e722e3289d11f1a8e1b00ddef00ed..14a0173b6301e09e34ed89cee38dbac3511d760b 100644 (file)
@@ -8,11 +8,11 @@ use tokio::sync::mpsc;
 use tokio_postgres::NoTls;
 
 use crate::{config, hex_utils, TestLogger};
-use crate::types::{DetectedGossipMessage, GossipMessage};
+use crate::types::GossipMessage;
 
 pub(crate) struct GossipPersister {
-       pub(crate) gossip_persistence_sender: mpsc::Sender<DetectedGossipMessage>,
-       gossip_persistence_receiver: mpsc::Receiver<DetectedGossipMessage>,
+       pub(crate) gossip_persistence_sender: mpsc::Sender<GossipMessage>,
+       gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
        server_sync_completion_sender: mpsc::Sender<()>,
        network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
 }
@@ -20,7 +20,7 @@ pub(crate) struct GossipPersister {
 impl GossipPersister {
        pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
                let (gossip_persistence_sender, gossip_persistence_receiver) =
-                       mpsc::channel::<DetectedGossipMessage>(10000);
+                       mpsc::channel::<GossipMessage>(100);
                GossipPersister {
                        gossip_persistence_sender,
                        gossip_persistence_receiver,
@@ -93,7 +93,7 @@ impl GossipPersister {
                // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
                // inactivity, some sort of message could be broadcast signaling the activation of request
                // processing
-               while let Some(detected_gossip_message) = &self.gossip_persistence_receiver.recv().await {
+               while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
                        i += 1; // count the persisted gossip messages
 
                        if i == 1 || i % persistence_log_threshold == 0 {
@@ -111,7 +111,7 @@ impl GossipPersister {
                                latest_graph_cache_time = Some(Instant::now());
                        }
 
-                       match &detected_gossip_message.message {
+                       match &gossip_message {
                                GossipMessage::InitialSyncComplete => {
                                        // signal to the server that it may now serve dynamic responses and calculate
                                        // snapshots