Do DB insertions in parallel
authorMatt Corallo <git@bluematt.me>
Mon, 29 Jan 2024 17:24:32 +0000 (17:24 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 29 Jan 2024 17:43:55 +0000 (17:43 +0000)
When inserting new gossip into the DB, we block the LDK peer
handling if we get behind. This is mostly okay, but can cause ping
timeouts and reconnections, which isn't ideal. To limit how often
we should see this, here we move to doing the new gossip insertions
in parallel.

src/persistence.rs

index 4ede02f5258cac98cc977f71853f120d17e7aadf..9dc537d835530f6c0bd63870785a11b21b4c031d 100644 (file)
@@ -1,18 +1,20 @@
 use std::fs::OpenOptions;
 use std::io::{BufWriter, Write};
 use std::ops::Deref;
+use std::mem;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use lightning::log_info;
 use lightning::routing::gossip::NetworkGraph;
 use lightning::util::logger::Logger;
 use lightning::util::ser::Writeable;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Mutex, Semaphore};
 
 use crate::config;
 use crate::types::GossipMessage;
 
 const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15);
+const INSERT_PARALELLISM: usize = 16;
 
 pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
        gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
@@ -93,10 +95,14 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
                let mut i = 0u32;
                let mut latest_graph_cache_time = Instant::now();
+               let insert_limiter = Arc::new(Semaphore::new(INSERT_PARALELLISM));
+               let connections_cache = Arc::new(Mutex::new(Vec::with_capacity(INSERT_PARALELLISM)));
+               #[cfg(test)]
+               let mut tasks_spawned = Vec::new();
                // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
                // inactivity, some sort of message could be broadcast signaling the activation of request
                // processing
-               while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await {
+               while let Some(gossip_message) = self.gossip_persistence_receiver.recv().await {
                        i += 1; // count the persisted gossip messages
 
                        if latest_persistence_log.elapsed().as_secs() >= 60 {
@@ -109,8 +115,11 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                self.persist_network_graph();
                                latest_graph_cache_time = Instant::now();
                        }
+                       insert_limiter.acquire().await.unwrap().forget();
 
-                       match &gossip_message {
+                       let limiter_ref = Arc::clone(&insert_limiter);
+                       let connections_cache_ref = Arc::clone(&connections_cache);
+                       match gossip_message {
                                GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
                                        let scid = announcement.contents.short_channel_id as i64;
 
@@ -118,27 +127,44 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                        let mut announcement_signed = Vec::new();
                                        announcement.write(&mut announcement_signed).unwrap();
 
-                                       if cfg!(test) && seen_override.is_some() {
-                                               tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-                                                       .execute("INSERT INTO channel_announcements (\
-                                                       short_channel_id, \
-                                                       announcement_signed, \
-                                                       seen \
-                                               ) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
-                                                               &scid,
-                                                               &announcement_signed,
-                                                               &(seen_override.unwrap() as f64)
-                                                       ])).await.unwrap().unwrap();
-                                       } else {
-                                               tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-                                                       .execute("INSERT INTO channel_announcements (\
-                                                       short_channel_id, \
-                                                       announcement_signed \
-                                               ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
-                                                               &scid,
-                                                               &announcement_signed
-                                                       ])).await.unwrap().unwrap();
-                                       }
+                                       let _task = tokio::spawn(async move {
+                                               let client;
+                                               {
+                                                       let mut connections_set = connections_cache_ref.lock().await;
+                                                       if connections_set.is_empty() {
+                                                               mem::drop(connections_set);
+                                                               client = crate::connect_to_db().await;
+                                                       } else {
+                                                               client = connections_set.pop().unwrap();
+                                                       }
+                                               }
+                                               if cfg!(test) && seen_override.is_some() {
+                                                       tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+                                                               .execute("INSERT INTO channel_announcements (\
+                                                               short_channel_id, \
+                                                               announcement_signed, \
+                                                               seen \
+                                                       ) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
+                                                                       &scid,
+                                                                       &announcement_signed,
+                                                                       &(seen_override.unwrap() as f64)
+                                                               ])).await.unwrap().unwrap();
+                                               } else {
+                                                       tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+                                                               .execute("INSERT INTO channel_announcements (\
+                                                               short_channel_id, \
+                                                               announcement_signed \
+                                                       ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
+                                                                       &scid,
+                                                                       &announcement_signed
+                                                               ])).await.unwrap().unwrap();
+                                               }
+                                               let mut connections_set = connections_cache_ref.lock().await;
+                                               connections_set.push(client);
+                                               limiter_ref.add_permits(1);
+                                       });
+                                       #[cfg(test)]
+                                       tasks_spawned.push(_task);
                                }
                                GossipMessage::ChannelUpdate(update, seen_override) => {
                                        let scid = update.contents.short_channel_id as i64;
@@ -193,25 +219,46 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                        // this may not be used outside test cfg
                                        let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;
 
-                                       tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-                                               .execute(insertion_statement, &[
-                                                       &scid,
-                                                       &timestamp,
-                                                       #[cfg(test)]
-                                                               &_seen_timestamp,
-                                                       &(update.contents.flags as i16),
-                                                       &direction,
-                                                       &disable,
-                                                       &cltv_expiry_delta,
-                                                       &htlc_minimum_msat,
-                                                       &fee_base_msat,
-                                                       &fee_proportional_millionths,
-                                                       &htlc_maximum_msat,
-                                                       &update_signed
-                                               ])).await.unwrap().unwrap();
+                                       let _task = tokio::spawn(async move {
+                                               let client;
+                                               {
+                                                       let mut connections_set = connections_cache_ref.lock().await;
+                                                       if connections_set.is_empty() {
+                                                               mem::drop(connections_set);
+                                                               client = crate::connect_to_db().await;
+                                                       } else {
+                                                               client = connections_set.pop().unwrap();
+                                                       }
+                                               }
+                                               tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+                                                       .execute(insertion_statement, &[
+                                                               &scid,
+                                                               &timestamp,
+                                                               #[cfg(test)]
+                                                                       &_seen_timestamp,
+                                                               &(update.contents.flags as i16),
+                                                               &direction,
+                                                               &disable,
+                                                               &cltv_expiry_delta,
+                                                               &htlc_minimum_msat,
+                                                               &fee_base_msat,
+                                                               &fee_proportional_millionths,
+                                                               &htlc_maximum_msat,
+                                                               &update_signed
+                                                       ])).await.unwrap().unwrap();
+                                               let mut connections_set = connections_cache_ref.lock().await;
+                                               connections_set.push(client);
+                                               limiter_ref.add_permits(1);
+                                       });
+                                       #[cfg(test)]
+                                       tasks_spawned.push(_task);
                                }
                        }
                }
+               #[cfg(test)]
+               for task in tasks_spawned {
+                       task.await;
+               }
        }
 
        fn persist_network_graph(&self) {