Block tasks if the DB writes get behind, rather than growing the queue
[rapid-gossip-sync-server] / src / downloader.rs
index ccb75c8d6d16c570cd541778cd71e380a32018b2..691a9e678667a7df2ab494079e8d976818784e53 100644 (file)
@@ -1,14 +1,14 @@
 use std::sync::{Arc, RwLock};
-use std::time::{SystemTime, UNIX_EPOCH};
 
 use bitcoin::secp256k1::PublicKey;
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
 use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
 
 use crate::{GossipChainAccess, TestLogger};
-use crate::types::{DetectedGossipMessage, GossipMessage};
+use crate::types::GossipMessage;
 
 pub(crate) struct GossipCounter {
        pub(crate) channel_announcements: u64,
@@ -31,7 +31,7 @@ impl GossipCounter {
 pub(crate) struct GossipRouter {
        pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
        pub(crate) counter: RwLock<GossipCounter>,
-       pub(crate) sender: mpsc::Sender<DetectedGossipMessage>,
+       pub(crate) sender: mpsc::Sender<GossipMessage>,
 }
 
 impl MessageSendEventsProvider for GossipRouter {
@@ -46,8 +46,6 @@ impl RoutingMessageHandler for GossipRouter {
        }
 
        fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
-               let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-
                let mut counter = self.counter.write().unwrap();
 
                let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
@@ -61,33 +59,29 @@ impl RoutingMessageHandler for GossipRouter {
 
                counter.channel_announcements += 1;
                let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
-               let detected_gossip_message = DetectedGossipMessage {
-                       message: gossip_message,
-                       timestamp_seen: timestamp_seen as u32,
-               };
-               let sender = self.sender.clone();
-               tokio::spawn(async move {
-                       let _ = sender.send(detected_gossip_message).await;
-               });
+               if let Err(err) = self.sender.try_send(gossip_message) {
+                       let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+                       tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+                               self.sender.send(gossip_message).await.unwrap();
+                       })});
+               }
 
                Ok(output_value)
        }
 
        fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
-               let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
                let output_value = self.native_router.handle_channel_update(msg)?;
 
                let mut counter = self.counter.write().unwrap();
                counter.channel_updates += 1;
                let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
-               let detected_gossip_message = DetectedGossipMessage {
-                       message: gossip_message,
-                       timestamp_seen: timestamp_seen as u32,
-               };
-               let sender = self.sender.clone();
-               tokio::spawn(async move {
-                       let _ = sender.send(detected_gossip_message).await;
-               });
+
+               if let Err(err) = self.sender.try_send(gossip_message) {
+                       let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+                       tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+                               self.sender.send(gossip_message).await.unwrap();
+                       })});
+               }
 
                Ok(output_value)
        }