From 981811f42b970bc4de57e5c59827255ef0dc9560 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 22 Aug 2022 02:40:23 +0000 Subject: [PATCH] Block tasks if the DB writes get behind, rather than growing the queue --- src/downloader.rs | 38 ++++++++++++++++---------------------- src/persistence.rs | 12 ++++++------ src/tracking.rs | 15 ++++----------- src/types.rs | 6 +----- 4 files changed, 27 insertions(+), 44 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index ccb75c8..691a9e6 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -1,14 +1,14 @@ use std::sync::{Arc, RwLock}; -use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::secp256k1::PublicKey; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider}; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use crate::{GossipChainAccess, TestLogger}; -use crate::types::{DetectedGossipMessage, GossipMessage}; +use crate::types::GossipMessage; pub(crate) struct GossipCounter { pub(crate) channel_announcements: u64, @@ -31,7 +31,7 @@ impl GossipCounter { pub(crate) struct GossipRouter { pub(crate) native_router: Arc>>, GossipChainAccess, Arc>>, pub(crate) counter: RwLock, - pub(crate) sender: mpsc::Sender, + pub(crate) sender: mpsc::Sender, } impl MessageSendEventsProvider for GossipRouter { @@ -46,8 +46,6 @@ impl RoutingMessageHandler for GossipRouter { } fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { - let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - let mut counter = self.counter.write().unwrap(); let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| { @@ -61,33 +59,29 @@ impl RoutingMessageHandler for GossipRouter { counter.channel_announcements += 1; let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone()); - let detected_gossip_message = DetectedGossipMessage { - message: gossip_message, - timestamp_seen: timestamp_seen as u32, - }; - let sender = self.sender.clone(); - tokio::spawn(async move { - let _ = sender.send(detected_gossip_message).await; - }); + if let Err(err) = self.sender.try_send(gossip_message) { + let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; + tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { + self.sender.send(gossip_message).await.unwrap(); + })}); + } Ok(output_value) } fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result { - let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); let output_value = self.native_router.handle_channel_update(msg)?; let mut counter = self.counter.write().unwrap(); counter.channel_updates += 1; let gossip_message = GossipMessage::ChannelUpdate(msg.clone()); - let detected_gossip_message = DetectedGossipMessage { - message: gossip_message, - timestamp_seen: timestamp_seen as u32, - }; - let sender = self.sender.clone(); - tokio::spawn(async move { - let _ = sender.send(detected_gossip_message).await; - }); + + if let Err(err) = self.sender.try_send(gossip_message) { + let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; + tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { + self.sender.send(gossip_message).await.unwrap(); + })}); + } Ok(output_value) } diff --git a/src/persistence.rs b/src/persistence.rs index ded79b5..14a0173 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -8,11 +8,11 @@ use tokio::sync::mpsc; use tokio_postgres::NoTls; use crate::{config, hex_utils, TestLogger}; -use crate::types::{DetectedGossipMessage, GossipMessage}; +use crate::types::GossipMessage; pub(crate) struct GossipPersister { - pub(crate) gossip_persistence_sender: mpsc::Sender, - gossip_persistence_receiver: mpsc::Receiver, + pub(crate) gossip_persistence_sender: mpsc::Sender, + gossip_persistence_receiver: mpsc::Receiver, server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc>>, } @@ -20,7 +20,7 @@ pub(crate) struct GossipPersister { impl GossipPersister { pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc>>) -> Self { let (gossip_persistence_sender, gossip_persistence_receiver) = - mpsc::channel::(10000); + mpsc::channel::(100); GossipPersister { gossip_persistence_sender, gossip_persistence_receiver, @@ -93,7 +93,7 @@ impl GossipPersister { // TODO: it would be nice to have some sort of timeout here so after 10 seconds of // inactivity, some sort of message could be broadcast signaling the activation of request // processing - while let Some(detected_gossip_message) = &self.gossip_persistence_receiver.recv().await { + while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await { i += 1; // count the persisted gossip messages if i == 1 || i % persistence_log_threshold == 0 { @@ -111,7 +111,7 @@ impl GossipPersister { latest_graph_cache_time = Some(Instant::now()); } - match &detected_gossip_message.message { + match &gossip_message { GossipMessage::InitialSyncComplete => { // signal to the server that it may now serve dynamic responses and calculate // snapshots diff --git a/src/tracking.rs b/src/tracking.rs index 9c24923..77d0e9a 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant}; use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::{PublicKey, SecretKey}; @@ -15,10 +15,10 @@ use tokio::sync::mpsc; use crate::{config, TestLogger}; use crate::downloader::{GossipCounter, GossipRouter}; -use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager}; +use crate::types::{GossipChainAccess, GossipMessage, GossipPeerManager}; use crate::verifier::ChainVerifier; -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, network_graph: Arc>>) { +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, network_graph: Arc>>) { let mut key = [0; 32]; let mut random_data = [0; 32]; thread_rng().fill_bytes(&mut key); @@ -95,7 +95,6 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender; pub(crate) type GossipPeerManager = Arc, Arc, Arc, Arc>>; +#[derive(Debug)] pub(crate) enum GossipMessage { ChannelAnnouncement(ChannelAnnouncement), ChannelUpdate(ChannelUpdate), InitialSyncComplete, } -pub(crate) struct DetectedGossipMessage { - pub(crate) timestamp_seen: u32, - pub(crate) message: GossipMessage, -} - pub(crate) struct TestLogger {} impl TestLogger { -- 2.39.5