X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=inline;f=src%2Fpersistence.rs;h=8bb7b188af72fb692aa218a20b1ede9bd24c116a;hb=0ff0a2a8a0530b185a6094fff7094dced137a862;hp=d961b30ea19b19f7d0d7e7bdb1489419f4b5ee30;hpb=b466192ee447c3729920f9c7f6d765545421dab5;p=rapid-gossip-sync-server diff --git a/src/persistence.rs b/src/persistence.rs index d961b30..8bb7b18 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,40 +1,38 @@ use std::fs::OpenOptions; use std::io::{BufWriter, Write}; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; +use lightning::log_info; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use tokio::sync::mpsc; -use tokio_postgres::NoTls; -use crate::{config, hex_utils, TestLogger}; +use crate::config; use crate::types::GossipMessage; -pub(crate) struct GossipPersister { +const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); + +pub(crate) struct GossipPersister where L::Target: Logger { gossip_persistence_receiver: mpsc::Receiver, - network_graph: Arc>, + network_graph: Arc>, + logger: L } -impl GossipPersister { - pub fn new(network_graph: Arc>) -> (Self, mpsc::Sender) { +impl GossipPersister where L::Target: Logger { + pub fn new(network_graph: Arc>, logger: L) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { gossip_persistence_receiver, - network_graph + network_graph, + logger }, gossip_persistence_sender) } pub(crate) async fn persist_gossip(&mut self) { - let connection_config = config::db_connection_config(); - let (mut client, connection) = - connection_config.connect(NoTls).await.unwrap(); - - tokio::spawn(async move { - if let Err(e) = connection.await { - panic!("connection error: {}", e); - } - }); + let mut client = crate::connect_to_db().await; { // initialize the database @@ -50,6 +48,11 @@ impl GossipPersister { config::upgrade_db(cur_schema[0].get(0), &mut client).await; } + let preparation = client.execute("set time zone UTC", &[]).await; + if let Err(preparation_error) = preparation { + panic!("db preparation error: {}", preparation_error); + } + let initialization = client .execute( // TODO: figure out a way to fix the id value without Postgres complaining about @@ -97,7 +100,7 @@ impl GossipPersister { i += 1; // count the persisted gossip messages if latest_persistence_log.elapsed().as_secs() >= 60 { - println!("Persisting gossip message #{}", i); + log_info!(self.logger, "Persisting gossip message #{}", i); latest_persistence_log = Instant::now(); } @@ -109,49 +112,28 @@ impl GossipPersister { match &gossip_message { GossipMessage::ChannelAnnouncement(announcement) => { - let scid = announcement.contents.short_channel_id; - let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); - // scid is 8 bytes - // block height is the first three bytes - // to obtain block height, shift scid right by 5 bytes (40 bits) - let block_height = (scid >> 5 * 8) as i32; - let chain_hash = announcement.contents.chain_hash.as_ref(); - let chain_hash_hex = hex_utils::hex_str(chain_hash); + let scid = announcement.contents.short_channel_id as i64; // start with the type prefix, which is already known a priori let mut announcement_signed = Vec::new(); announcement.write(&mut announcement_signed).unwrap(); - let result = client + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client .execute("INSERT INTO channel_announcements (\ short_channel_id, \ - block_height, \ - chain_hash, \ announcement_signed \ - ) VALUES ($1, $2, $3, $4) ON CONFLICT (short_channel_id) DO NOTHING", &[ - &scid_hex, - &block_height, - &chain_hash_hex, + ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[ + &scid, &announcement_signed - ]).await; - if result.is_err() { - panic!("error: {}", result.err().unwrap()); - } + ])).await.unwrap().unwrap(); } GossipMessage::ChannelUpdate(update) => { - let scid = update.contents.short_channel_id; - let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); - - let chain_hash = update.contents.chain_hash.as_ref(); - let chain_hash_hex = hex_utils::hex_str(chain_hash); + let scid = update.contents.short_channel_id as i64; let timestamp = update.contents.timestamp as i64; - let channel_flags = update.contents.flags as i32; - let direction = channel_flags & 1; - let disable = (channel_flags & 2) > 0; - - let composite_index = format!("{}:{}:{}", scid_hex, timestamp, direction); + let direction = (update.contents.flags & 1) == 1; + let disable = (update.contents.flags & 2) > 0; let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32; let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64; @@ -164,10 +146,8 @@ impl GossipPersister { let mut update_signed = Vec::new(); update.write(&mut update_signed).unwrap(); - let result = client + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client .execute("INSERT INTO channel_updates (\ - composite_index, \ - chain_hash, \ short_channel_id, \ timestamp, \ channel_flags, \ @@ -179,12 +159,10 @@ impl GossipPersister { fee_proportional_millionths, \ htlc_maximum_msat, \ blob_signed \ - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (composite_index) DO NOTHING", &[ - &composite_index, - &chain_hash_hex, - &scid_hex, + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[ + &scid, ×tamp, - &channel_flags, + &(update.contents.flags as i16), &direction, &disable, &cltv_expiry_delta, @@ -193,17 +171,14 @@ impl GossipPersister { &fee_proportional_millionths, &htlc_maximum_msat, &update_signed - ]).await; - if result.is_err() { - panic!("error: {}", result.err().unwrap()); - } + ])).await.unwrap().unwrap(); } } } } fn persist_network_graph(&self) { - println!("Caching network graph…"); + log_info!(self.logger, "Caching network graph…"); let cache_path = config::network_graph_cache_path(); let file = OpenOptions::new() .create(true) @@ -211,10 +186,10 @@ impl GossipPersister { .truncate(true) .open(&cache_path) .unwrap(); - self.network_graph.remove_stale_channels(); + self.network_graph.remove_stale_channels_and_tracking(); let mut writer = BufWriter::new(file); self.network_graph.write(&mut writer).unwrap(); writer.flush().unwrap(); - println!("Cached network graph!"); + log_info!(self.logger, "Cached network graph!"); } }