use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::ops::Deref;
-use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lightning::log_info;
use lightning::routing::gossip::NetworkGraph;
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
+use tokio::runtime::Runtime;
use tokio::sync::{mpsc, Mutex, Semaphore};
use crate::config;
pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
network_graph: Arc<NetworkGraph<L>>,
+ tokio_runtime: Runtime,
logger: L
}
pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
+ let runtime = Runtime::new().unwrap();
(GossipPersister {
gossip_persistence_receiver,
network_graph,
+ tokio_runtime: runtime,
logger
}, gossip_persistence_sender)
}
pub(crate) async fn persist_gossip(&mut self) {
- let mut client = crate::connect_to_db().await;
+ { // initialize the database
+ // this client instance is only used once
+ let mut client = crate::connect_to_db().await;
- {
- // initialize the database
let initialization = client
.execute(config::db_config_table_creation_query(), &[])
.await;
insert_limiter.acquire().await.unwrap().forget();
let limiter_ref = Arc::clone(&insert_limiter);
+ let client = {
+ let mut connections_set = connections_cache.lock().await;
+ let client = if connections_set.is_empty() {
+ crate::connect_to_db().await
+ } else {
+ connections_set.pop().unwrap()
+ };
+ client
+ };
+
let connections_cache_ref = Arc::clone(&connections_cache);
match gossip_message {
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
let mut announcement_signed = Vec::new();
announcement.write(&mut announcement_signed).unwrap();
- let _task = tokio::spawn(async move {
- let client;
- {
- let mut connections_set = connections_cache_ref.lock().await;
- if connections_set.is_empty() {
- mem::drop(connections_set);
- client = crate::connect_to_db().await;
- } else {
- client = connections_set.pop().unwrap();
- }
- }
+ let _task = self.tokio_runtime.spawn(async move {
if cfg!(test) && seen_override.is_some() {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
// this may not be used outside test cfg
let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;
- let _task = tokio::spawn(async move {
- let client;
- {
- let mut connections_set = connections_cache_ref.lock().await;
- if connections_set.is_empty() {
- mem::drop(connections_set);
- client = crate::connect_to_db().await;
- } else {
- client = connections_set.pop().unwrap();
- }
- }
+ let _task = self.tokio_runtime.spawn(async move {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute(insertion_statement, &[
&scid,
}
#[cfg(test)]
for task in tasks_spawned {
- task.await;
+ task.await.unwrap();
}
}