Block tasks if the DB writes get behind, rather than growing the queue 2022-08-block-on-slow-db
authorMatt Corallo <git@bluematt.me>
Mon, 22 Aug 2022 02:40:23 +0000 (02:40 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 22 Aug 2022 02:45:19 +0000 (02:45 +0000)
src/downloader.rs
src/persistence.rs
src/tracking.rs
src/types.rs

index ccb75c8d6d16c570cd541778cd71e380a32018b2..691a9e678667a7df2ab494079e8d976818784e53 100644 (file)
@@ -1,14 +1,14 @@
 use std::sync::{Arc, RwLock};
-use std::time::{SystemTime, UNIX_EPOCH};
 
 use bitcoin::secp256k1::PublicKey;
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
 use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
 
 use crate::{GossipChainAccess, TestLogger};
-use crate::types::{DetectedGossipMessage, GossipMessage};
+use crate::types::GossipMessage;
 
 pub(crate) struct GossipCounter {
        pub(crate) channel_announcements: u64,
@@ -31,7 +31,7 @@ impl GossipCounter {
 pub(crate) struct GossipRouter {
        pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
        pub(crate) counter: RwLock<GossipCounter>,
-       pub(crate) sender: mpsc::Sender<DetectedGossipMessage>,
+       pub(crate) sender: mpsc::Sender<GossipMessage>,
 }
 
 impl MessageSendEventsProvider for GossipRouter {
@@ -46,8 +46,6 @@ impl RoutingMessageHandler for GossipRouter {
        }
 
        fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
-               let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-
                let mut counter = self.counter.write().unwrap();
 
                let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
@@ -61,33 +59,29 @@ impl RoutingMessageHandler for GossipRouter {
 
                counter.channel_announcements += 1;
                let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
-               let detected_gossip_message = DetectedGossipMessage {
-                       message: gossip_message,
-                       timestamp_seen: timestamp_seen as u32,
-               };
-               let sender = self.sender.clone();
-               tokio::spawn(async move {
-                       let _ = sender.send(detected_gossip_message).await;
-               });
+               if let Err(err) = self.sender.try_send(gossip_message) {
+                       let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+                       tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+                               self.sender.send(gossip_message).await.unwrap();
+                       })});
+               }
 
                Ok(output_value)
        }
 
        fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
-               let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
                let output_value = self.native_router.handle_channel_update(msg)?;
 
                let mut counter = self.counter.write().unwrap();
                counter.channel_updates += 1;
                let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
-               let detected_gossip_message = DetectedGossipMessage {
-                       message: gossip_message,
-                       timestamp_seen: timestamp_seen as u32,
-               };
-               let sender = self.sender.clone();
-               tokio::spawn(async move {
-                       let _ = sender.send(detected_gossip_message).await;
-               });
+
+               if let Err(err) = self.sender.try_send(gossip_message) {
+                       let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+                       tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+                               self.sender.send(gossip_message).await.unwrap();
+                       })});
+               }
 
                Ok(output_value)
        }
index ded79b50086e722e3289d11f1a8e1b00ddef00ed..14a0173b6301e09e34ed89cee38dbac3511d760b 100644 (file)
@@ -8,11 +8,11 @@ use tokio::sync::mpsc;
 use tokio_postgres::NoTls;
 
 use crate::{config, hex_utils, TestLogger};
-use crate::types::{DetectedGossipMessage, GossipMessage};
+use crate::types::GossipMessage;
 
 pub(crate) struct GossipPersister {
-       pub(crate) gossip_persistence_sender: mpsc::Sender<DetectedGossipMessage>,
-       gossip_persistence_receiver: mpsc::Receiver<DetectedGossipMessage>,
+       pub(crate) gossip_persistence_sender: mpsc::Sender<GossipMessage>,
+       gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
        server_sync_completion_sender: mpsc::Sender<()>,
        network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
 }
@@ -20,7 +20,7 @@ pub(crate) struct GossipPersister {
 impl GossipPersister {
        pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
                let (gossip_persistence_sender, gossip_persistence_receiver) =
-                       mpsc::channel::<DetectedGossipMessage>(10000);
+                       mpsc::channel::<GossipMessage>(100);
                GossipPersister {
                        gossip_persistence_sender,
                        gossip_persistence_receiver,
@@ -93,7 +93,7 @@ impl GossipPersister {
                // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
                // inactivity, some sort of message could be broadcast signaling the activation of request
                // processing
-               while let Some(detected_gossip_message) = &self.gossip_persistence_receiver.recv().await {
+               while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
                        i += 1; // count the persisted gossip messages
 
                        if i == 1 || i % persistence_log_threshold == 0 {
@@ -111,7 +111,7 @@ impl GossipPersister {
                                latest_graph_cache_time = Some(Instant::now());
                        }
 
-                       match &detected_gossip_message.message {
+                       match &gossip_message {
                                GossipMessage::InitialSyncComplete => {
                                        // signal to the server that it may now serve dynamic responses and calculate
                                        // snapshots
index 9c249232be5d3f37c1f08c5257906040893212f5..77d0e9a7b629406b42a74404e4d3730af36de313 100644 (file)
@@ -1,6 +1,6 @@
 use std::net::SocketAddr;
 use std::sync::{Arc, RwLock};
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant};
 
 use bitcoin::hashes::hex::ToHex;
 use bitcoin::secp256k1::{PublicKey, SecretKey};
@@ -15,10 +15,10 @@ use tokio::sync::mpsc;
 
 use crate::{config, TestLogger};
 use crate::downloader::{GossipCounter, GossipRouter};
-use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager};
+use crate::types::{GossipChainAccess, GossipMessage, GossipPeerManager};
 use crate::verifier::ChainVerifier;
 
-pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
+pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
        let mut key = [0; 32];
        let mut random_data = [0; 32];
        thread_rng().fill_bytes(&mut key);
@@ -95,7 +95,6 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGos
                        let sleep = tokio::time::sleep(Duration::from_secs(5));
                        sleep.await;
 
-                       let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
                        let router_clone = Arc::clone(&local_router);
 
                        {
@@ -144,13 +143,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGos
 
                        if needs_to_notify_persister {
                                needs_to_notify_persister = false;
-                               let sender = local_persistence_sender.clone();
-                               tokio::spawn(async move {
-                                       let _ = sender.send(DetectedGossipMessage {
-                                               timestamp_seen: current_timestamp as u32,
-                                               message: GossipMessage::InitialSyncComplete,
-                                       }).await;
-                               });
+                               local_persistence_sender.send(GossipMessage::InitialSyncComplete).await.unwrap();
                        }
                }
        });
index 63a2adba1ac06ac2f5ec39d5ce19509b7a86a854..7b66921090db179d455dfa9777d95a35dbff1c9e 100644 (file)
@@ -10,17 +10,13 @@ use crate::verifier::ChainVerifier;
 pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
 pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, Arc<ErroringMessageHandler>, Arc<GossipRouter>, Arc<TestLogger>, Arc<IgnoringMessageHandler>>>;
 
+#[derive(Debug)]
 pub(crate) enum GossipMessage {
        ChannelAnnouncement(ChannelAnnouncement),
        ChannelUpdate(ChannelUpdate),
        InitialSyncComplete,
 }
 
-pub(crate) struct DetectedGossipMessage {
-       pub(crate) timestamp_seen: u32,
-       pub(crate) message: GossipMessage,
-}
-
 pub(crate) struct TestLogger {}
 
 impl TestLogger {