X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=src%2Fpersistence.rs;h=8bb7b188af72fb692aa218a20b1ede9bd24c116a;hb=0ff0a2a8a0530b185a6094fff7094dced137a862;hp=22abf0216d8575692a1e3e7f9b7d4ce66844df66;hpb=2cf9129a187a66cfed10f9583c14fc8ee7339a18;p=rapid-gossip-sync-server diff --git a/src/persistence.rs b/src/persistence.rs index 22abf02..8bb7b18 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,43 +1,38 @@ use std::fs::OpenOptions; use std::io::{BufWriter, Write}; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; +use lightning::log_info; use lightning::routing::gossip::NetworkGraph; use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use tokio::sync::mpsc; -use tokio_postgres::NoTls; use crate::config; use crate::types::GossipMessage; const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); -pub(crate) struct GossipPersister { +pub(crate) struct GossipPersister where L::Target: Logger { gossip_persistence_receiver: mpsc::Receiver, - network_graph: Arc>>, + network_graph: Arc>, + logger: L } -impl GossipPersister { - pub fn new(network_graph: Arc>>) -> (Self, mpsc::Sender) { +impl GossipPersister where L::Target: Logger { + pub fn new(network_graph: Arc>, logger: L) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { gossip_persistence_receiver, - network_graph + network_graph, + logger }, gossip_persistence_sender) } pub(crate) async fn persist_gossip(&mut self) { - let connection_config = config::db_connection_config(); - let (mut client, connection) = - connection_config.connect(NoTls).await.unwrap(); - - tokio::spawn(async move { - if let Err(e) = connection.await { - panic!("connection error: {}", e); - } - }); + let mut client = crate::connect_to_db().await; { // initialize the database @@ -53,6 +48,11 @@ impl GossipPersister { config::upgrade_db(cur_schema[0].get(0), &mut client).await; } + let preparation = client.execute("set time zone UTC", &[]).await; + if let Err(preparation_error) = preparation { + panic!("db preparation error: {}", preparation_error); + } + let initialization = client .execute( // TODO: figure out a way to fix the id value without Postgres complaining about @@ -100,7 +100,7 @@ impl GossipPersister { i += 1; // count the persisted gossip messages if latest_persistence_log.elapsed().as_secs() >= 60 { - println!("Persisting gossip message #{}", i); + log_info!(self.logger, "Persisting gossip message #{}", i); latest_persistence_log = Instant::now(); } @@ -178,7 +178,7 @@ impl GossipPersister { } fn persist_network_graph(&self) { - println!("Caching network graph…"); + log_info!(self.logger, "Caching network graph…"); let cache_path = config::network_graph_cache_path(); let file = OpenOptions::new() .create(true) @@ -190,6 +190,6 @@ impl GossipPersister { let mut writer = BufWriter::new(file); self.network_graph.write(&mut writer).unwrap(); writer.flush().unwrap(); - println!("Cached network graph!"); + log_info!(self.logger, "Cached network graph!"); } }