X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fpersistence.rs;h=8bb7b188af72fb692aa218a20b1ede9bd24c116a;hb=0ff0a2a8a0530b185a6094fff7094dced137a862;hp=14a0173b6301e09e34ed89cee38dbac3511d760b;hpb=981811f42b970bc4de57e5c59827255ef0dc9560;p=rapid-gossip-sync-server diff --git a/src/persistence.rs b/src/persistence.rs index 14a0173..8bb7b18 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,44 +1,38 @@ use std::fs::OpenOptions; -use std::io::BufWriter; +use std::io::{BufWriter, Write}; +use std::ops::Deref; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; +use lightning::log_info; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use tokio::sync::mpsc; -use tokio_postgres::NoTls; -use crate::{config, hex_utils, TestLogger}; +use crate::config; use crate::types::GossipMessage; -pub(crate) struct GossipPersister { - pub(crate) gossip_persistence_sender: mpsc::Sender, +const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); + +pub(crate) struct GossipPersister where L::Target: Logger { gossip_persistence_receiver: mpsc::Receiver, - server_sync_completion_sender: mpsc::Sender<()>, - network_graph: Arc>>, + network_graph: Arc>, + logger: L } -impl GossipPersister { - pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc>>) -> Self { +impl GossipPersister where L::Target: Logger { + pub fn new(network_graph: Arc>, logger: L) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); - GossipPersister { - gossip_persistence_sender, + (GossipPersister { gossip_persistence_receiver, - server_sync_completion_sender, - network_graph - } + network_graph, + logger + }, gossip_persistence_sender) } pub(crate) async fn persist_gossip(&mut self) { - let connection_config = config::db_connection_config(); - let (client, connection) = - connection_config.connect(NoTls).await.unwrap(); - - tokio::spawn(async move { - if let Err(e) = connection.await { - panic!("connection error: {}", e); - } - }); + let mut client = crate::connect_to_db().await; { // initialize the database @@ -49,6 +43,16 @@ impl GossipPersister { panic!("db init error: {}", initialization_error); } + let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap(); + if !cur_schema.is_empty() { + config::upgrade_db(cur_schema[0].get(0), &mut client).await; + } + + let preparation = client.execute("set time zone UTC", &[]).await; + if let Err(preparation_error) = preparation { + panic!("db preparation error: {}", preparation_error); + } + let initialization = client .execute( // TODO: figure out a way to fix the id value without Postgres complaining about @@ -85,107 +89,51 @@ impl GossipPersister { } } - // print log statement every 10,000 messages - let mut persistence_log_threshold = 10000; + // print log statement every minute + let mut latest_persistence_log = Instant::now() - Duration::from_secs(60); let mut i = 0u32; - let mut server_sync_completion_sent = false; - let mut latest_graph_cache_time: Option = None; + let mut latest_graph_cache_time = Instant::now(); // TODO: it would be nice to have some sort of timeout here so after 10 seconds of // inactivity, some sort of message could be broadcast signaling the activation of request // processing while let Some(gossip_message) = &self.gossip_persistence_receiver.recv().await { i += 1; // count the persisted gossip messages - if i == 1 || i % persistence_log_threshold == 0 { - println!("Persisting gossip message #{}", i); + if latest_persistence_log.elapsed().as_secs() >= 60 { + log_info!(self.logger, "Persisting gossip message #{}", i); + latest_persistence_log = Instant::now(); } - if let Some(last_cache_time) = latest_graph_cache_time { - // has it been ten minutes? Just cache it - if last_cache_time.elapsed().as_secs() >= 600 { - self.persist_network_graph(); - latest_graph_cache_time = Some(Instant::now()); - } - } else { - // initialize graph cache timer - latest_graph_cache_time = Some(Instant::now()); + // has it been ten minutes? Just cache it + if latest_graph_cache_time.elapsed().as_secs() >= 600 { + self.persist_network_graph(); + latest_graph_cache_time = Instant::now(); } match &gossip_message { - GossipMessage::InitialSyncComplete => { - // signal to the server that it may now serve dynamic responses and calculate - // snapshots - // we take this detour through the persister to ensure that all previous - // messages have already been persisted to the database - println!("Persister caught up with gossip!"); - i -= 1; // this wasn't an actual gossip message that needed persisting - persistence_log_threshold = 50; - if !server_sync_completion_sent { - server_sync_completion_sent = true; - self.server_sync_completion_sender.send(()).await.unwrap(); - println!("Server has been notified of persistence completion."); - } - - // now, cache the persisted network graph - // also persist the network graph here - let mut too_soon = false; - if let Some(latest_graph_cache_time) = latest_graph_cache_time { - let time_since_last_cached = latest_graph_cache_time.elapsed().as_secs(); - // don't cache more frequently than every 2 minutes - too_soon = time_since_last_cached < 120; - } - if too_soon { - println!("Network graph has been cached too recently."); - }else { - latest_graph_cache_time = Some(Instant::now()); - self.persist_network_graph(); - } - } GossipMessage::ChannelAnnouncement(announcement) => { - - let scid = announcement.contents.short_channel_id; - let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); - // scid is 8 bytes - // block height is the first three bytes - // to obtain block height, shift scid right by 5 bytes (40 bits) - let block_height = (scid >> 5 * 8) as i32; - let chain_hash = announcement.contents.chain_hash.as_ref(); - let chain_hash_hex = hex_utils::hex_str(chain_hash); + let scid = announcement.contents.short_channel_id as i64; // start with the type prefix, which is already known a priori - let mut announcement_signed = Vec::new(); // vec![1, 0]; + let mut announcement_signed = Vec::new(); announcement.write(&mut announcement_signed).unwrap(); - let result = client + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client .execute("INSERT INTO channel_announcements (\ short_channel_id, \ - block_height, \ - chain_hash, \ announcement_signed \ - ) VALUES ($1, $2, $3, $4) ON CONFLICT (short_channel_id) DO NOTHING", &[ - &scid_hex, - &block_height, - &chain_hash_hex, + ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[ + &scid, &announcement_signed - ]).await; - if result.is_err() { - panic!("error: {}", result.err().unwrap()); - } + ])).await.unwrap().unwrap(); } GossipMessage::ChannelUpdate(update) => { - let scid = update.contents.short_channel_id; - let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); - - let chain_hash = update.contents.chain_hash.as_ref(); - let chain_hash_hex = hex_utils::hex_str(chain_hash); + let scid = update.contents.short_channel_id as i64; let timestamp = update.contents.timestamp as i64; - let channel_flags = update.contents.flags as i32; - let direction = channel_flags & 1; - let disable = (channel_flags & 2) > 0; - - let composite_index = format!("{}:{}:{}", scid_hex, timestamp, direction); + let direction = (update.contents.flags & 1) == 1; + let disable = (update.contents.flags & 2) > 0; let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32; let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64; @@ -195,13 +143,11 @@ impl GossipPersister { let htlc_maximum_msat = update.contents.htlc_maximum_msat as i64; // start with the type prefix, which is already known a priori - let mut update_signed = Vec::new(); // vec![1, 2]; + let mut update_signed = Vec::new(); update.write(&mut update_signed).unwrap(); - let result = client + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client .execute("INSERT INTO channel_updates (\ - composite_index, \ - chain_hash, \ short_channel_id, \ timestamp, \ channel_flags, \ @@ -213,12 +159,10 @@ impl GossipPersister { fee_proportional_millionths, \ htlc_maximum_msat, \ blob_signed \ - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (composite_index) DO NOTHING", &[ - &composite_index, - &chain_hash_hex, - &scid_hex, + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[ + &scid, ×tamp, - &channel_flags, + &(update.contents.flags as i16), &direction, &disable, &cltv_expiry_delta, @@ -227,17 +171,14 @@ impl GossipPersister { &fee_proportional_millionths, &htlc_maximum_msat, &update_signed - ]).await; - if result.is_err() { - panic!("error: {}", result.err().unwrap()); - } + ])).await.unwrap().unwrap(); } } } } fn persist_network_graph(&self) { - println!("Caching network graph…"); + log_info!(self.logger, "Caching network graph…"); let cache_path = config::network_graph_cache_path(); let file = OpenOptions::new() .create(true) @@ -245,9 +186,10 @@ impl GossipPersister { .truncate(true) .open(&cache_path) .unwrap(); - self.network_graph.remove_stale_channels(); + self.network_graph.remove_stale_channels_and_tracking(); let mut writer = BufWriter::new(file); self.network_graph.write(&mut writer).unwrap(); - println!("Cached network graph!"); + writer.flush().unwrap(); + log_info!(self.logger, "Cached network graph!"); } }