From 1061d90e370d81dc4375be93ae62894ad8ab1ccf Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 29 Jan 2024 17:24:32 +0000 Subject: [PATCH] Do DB insertions in parallel When inserting new gossip into the DB, we block the LDK peer handling if we get behind. This is mostly okay, but can cause ping timeouts and reconnections, which isn't ideal. To limit how often we should see this, here we move to doing the new gossip insertions in parallel. --- src/persistence.rs | 127 +++++++++++++++++++++++++++++++-------------- 1 file changed, 87 insertions(+), 40 deletions(-) diff --git a/src/persistence.rs b/src/persistence.rs index 4ede02f..9dc537d 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,18 +1,20 @@ use std::fs::OpenOptions; use std::io::{BufWriter, Write}; use std::ops::Deref; +use std::mem; use std::sync::Arc; use std::time::{Duration, Instant}; use lightning::log_info; use lightning::routing::gossip::NetworkGraph; use lightning::util::logger::Logger; use lightning::util::ser::Writeable; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex, Semaphore}; use crate::config; use crate::types::GossipMessage; const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); +const INSERT_PARALELLISM: usize = 16; pub(crate) struct GossipPersister where L::Target: Logger { gossip_persistence_receiver: mpsc::Receiver, @@ -93,10 +95,14 @@ impl GossipPersister where L::Target: Logger { let mut latest_persistence_log = Instant::now() - Duration::from_secs(60); let mut i = 0u32; let mut latest_graph_cache_time = Instant::now(); + let insert_limiter = Arc::new(Semaphore::new(INSERT_PARALELLISM)); + let connections_cache = Arc::new(Mutex::new(Vec::with_capacity(INSERT_PARALELLISM))); + #[cfg(test)] + let mut tasks_spawned = Vec::new(); // TODO: it would be nice to have some sort of timeout here so after 10 seconds of // inactivity, some sort of message could be broadcast signaling the activation of request // processing - while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await { + while let Some(gossip_message) = self.gossip_persistence_receiver.recv().await { i += 1; // count the persisted gossip messages if latest_persistence_log.elapsed().as_secs() >= 60 { @@ -109,8 +115,11 @@ impl GossipPersister where L::Target: Logger { self.persist_network_graph(); latest_graph_cache_time = Instant::now(); } + insert_limiter.acquire().await.unwrap().forget(); - match &gossip_message { + let limiter_ref = Arc::clone(&insert_limiter); + let connections_cache_ref = Arc::clone(&connections_cache); + match gossip_message { GossipMessage::ChannelAnnouncement(announcement, seen_override) => { let scid = announcement.contents.short_channel_id as i64; @@ -118,27 +127,44 @@ impl GossipPersister where L::Target: Logger { let mut announcement_signed = Vec::new(); announcement.write(&mut announcement_signed).unwrap(); - if cfg!(test) && seen_override.is_some() { - tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client - .execute("INSERT INTO channel_announcements (\ - short_channel_id, \ - announcement_signed, \ - seen \ - ) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[ - &scid, - &announcement_signed, - &(seen_override.unwrap() as f64) - ])).await.unwrap().unwrap(); - } else { - tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client - .execute("INSERT INTO channel_announcements (\ - short_channel_id, \ - announcement_signed \ - ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[ - &scid, - &announcement_signed - ])).await.unwrap().unwrap(); - } + let _task = tokio::spawn(async move { + let client; + { + let mut connections_set = connections_cache_ref.lock().await; + if connections_set.is_empty() { + mem::drop(connections_set); + client = crate::connect_to_db().await; + } else { + client = connections_set.pop().unwrap(); + } + } + if cfg!(test) && seen_override.is_some() { + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client + .execute("INSERT INTO channel_announcements (\ + short_channel_id, \ + announcement_signed, \ + seen \ + ) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[ + &scid, + &announcement_signed, + &(seen_override.unwrap() as f64) + ])).await.unwrap().unwrap(); + } else { + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client + .execute("INSERT INTO channel_announcements (\ + short_channel_id, \ + announcement_signed \ + ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[ + &scid, + &announcement_signed + ])).await.unwrap().unwrap(); + } + let mut connections_set = connections_cache_ref.lock().await; + connections_set.push(client); + limiter_ref.add_permits(1); + }); + #[cfg(test)] + tasks_spawned.push(_task); } GossipMessage::ChannelUpdate(update, seen_override) => { let scid = update.contents.short_channel_id as i64; @@ -193,25 +219,46 @@ impl GossipPersister where L::Target: Logger { // this may not be used outside test cfg let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64; - tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client - .execute(insertion_statement, &[ - &scid, - ×tamp, - #[cfg(test)] - &_seen_timestamp, - &(update.contents.flags as i16), - &direction, - &disable, - &cltv_expiry_delta, - &htlc_minimum_msat, - &fee_base_msat, - &fee_proportional_millionths, - &htlc_maximum_msat, - &update_signed - ])).await.unwrap().unwrap(); + let _task = tokio::spawn(async move { + let client; + { + let mut connections_set = connections_cache_ref.lock().await; + if connections_set.is_empty() { + mem::drop(connections_set); + client = crate::connect_to_db().await; + } else { + client = connections_set.pop().unwrap(); + } + } + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client + .execute(insertion_statement, &[ + &scid, + ×tamp, + #[cfg(test)] + &_seen_timestamp, + &(update.contents.flags as i16), + &direction, + &disable, + &cltv_expiry_delta, + &htlc_minimum_msat, + &fee_base_msat, + &fee_proportional_millionths, + &htlc_maximum_msat, + &update_signed + ])).await.unwrap().unwrap(); + let mut connections_set = connections_cache_ref.lock().await; + connections_set.push(client); + limiter_ref.add_permits(1); + }); + #[cfg(test)] + tasks_spawned.push(_task); } } } + #[cfg(test)] + for task in tasks_spawned { + task.await; + } } fn persist_network_graph(&self) { -- 2.30.2