From: Matt Corallo Date: Sun, 2 Jul 2023 16:12:25 +0000 (+0000) Subject: Let postgres filter timestamps rather than doing it in Rust X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=60ad4958a2144b229ed09359afbb3711c192053b;p=rapid-gossip-sync-server Let postgres filter timestamps rather than doing it in Rust --- diff --git a/src/config.rs b/src/config.rs index acf7ac9..ffbfb45 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,6 +4,7 @@ use std::convert::TryInto; use std::env; use std::io::Cursor; use std::net::{SocketAddr, ToSocketAddrs}; +use std::time::Duration; use bitcoin::Network; use bitcoin::hashes::hex::FromHex; @@ -19,7 +20,7 @@ pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hou /// If the last update in either direction was more than six days ago, we send a reminder /// That reminder may be either in the form of a channel announcement, or in the form of empty /// updates in both directions. -pub(crate) const CHANNEL_REMINDER_AGE: u32 = 6 * 24 * 3600; +pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60); pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true; pub(crate) fn network() -> Network { diff --git a/src/lookup.rs b/src/lookup.rs index 779b253..6429d5c 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -75,6 +75,7 @@ pub(super) async fn connect_to_db() -> (Client, Connection) /// after `last_sync_timestamp` pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32) { println!("Obtaining channel ids from network graph"); + let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); let channel_ids = { let read_only_graph = network_graph.read_only(); println!("Retrieved read-only network graph copy"); @@ -117,27 +118,28 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ // here is where the channels whose first update in either direction occurred after // `last_seen_timestamp` are added to the selection let newer_oldest_directional_updates = client.query(" - SELECT DISTINCT ON (short_channel_id) * - FROM ( - SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen - FROM channel_updates - WHERE short_channel_id = any($1) - ORDER BY short_channel_id ASC, direction ASC, seen ASC - ) AS directional_last_seens - ORDER BY short_channel_id ASC, seen DESC - ", &[&channel_ids]).await.unwrap(); + SELECT * FROM ( + SELECT DISTINCT ON (short_channel_id) * + FROM ( + SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen + FROM channel_updates + WHERE short_channel_id = any($1) + ORDER BY short_channel_id ASC, direction ASC, seen ASC + ) AS directional_last_seens + ORDER BY short_channel_id ASC, seen DESC + ) AS distinct_chans + WHERE distinct_chans.seen >= $2 + ", &[&channel_ids, &last_sync_timestamp_object]).await.unwrap(); for current_row in newer_oldest_directional_updates { let scid: i64 = current_row.get("short_channel_id"); let current_seen_timestamp_object: SystemTime = current_row.get("seen"); let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; - if current_seen_timestamp > last_sync_timestamp { - // the newer of the two oldest seen directional updates came after last sync timestamp - let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); - // first time a channel was seen in both directions - (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp); - } + // the newer of the two oldest seen directional updates came after last sync timestamp + let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); + // first time a channel was seen in both directions + (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp); } } @@ -148,54 +150,52 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ // Steps: // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction) - let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; - let reminder_threshold_timestamp = current_timestamp.saturating_sub(config::CHANNEL_REMINDER_AGE); + let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap(); let read_only_graph = network_graph.read_only(); let older_latest_directional_updates = client.query(" - SELECT DISTINCT ON (short_channel_id) * - FROM ( - SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen - FROM channel_updates - WHERE short_channel_id = any($1) - ORDER BY short_channel_id ASC, direction ASC, seen DESC - ) AS directional_last_seens - ORDER BY short_channel_id ASC, seen ASC - ", &[&channel_ids]).await.unwrap(); + SELECT short_channel_id FROM ( + SELECT DISTINCT ON (short_channel_id) * + FROM ( + SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen + FROM channel_updates + WHERE short_channel_id = any($1) + ORDER BY short_channel_id ASC, direction ASC, seen DESC + ) AS directional_last_seens + ORDER BY short_channel_id ASC, seen ASC + ) AS distinct_chans + WHERE distinct_chans.seen <= $2 + ", &[&channel_ids, &reminder_threshold_timestamp]).await.unwrap(); for current_row in older_latest_directional_updates { let scid: i64 = current_row.get("short_channel_id"); - let current_seen_timestamp_object: SystemTime = current_row.get("seen"); - let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; - if current_seen_timestamp <= reminder_threshold_timestamp { - // annotate this channel as requiring that reminders be sent to the client - let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); - - // way might be able to get away with not using this - (*current_channel_delta).requires_reminder = true; - - if let Some(current_channel_info) = read_only_graph.channel(scid as u64) { - if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() { - // we don't send reminders if we don't have bidirectional update data - continue; - } - - if let Some(info) = current_channel_info.one_to_two.as_ref() { - let flags: u8 = if info.enabled { 0 } else { 2 }; - let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()); - current_update.serialization_update_flags = Some(flags); - } - - if let Some(info) = current_channel_info.two_to_one.as_ref() { - let flags: u8 = if info.enabled { 1 } else { 3 }; - let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()); - current_update.serialization_update_flags = Some(flags); - } - } else { - // we don't send reminders if we don't have the channel + // annotate this channel as requiring that reminders be sent to the client + let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); + + // way might be able to get away with not using this + (*current_channel_delta).requires_reminder = true; + + if let Some(current_channel_info) = read_only_graph.channel(scid as u64) { + if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() { + // we don't send reminders if we don't have bidirectional update data continue; } + + if let Some(info) = current_channel_info.one_to_two.as_ref() { + let flags: u8 = if info.enabled { 0 } else { 2 }; + let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()); + current_update.serialization_update_flags = Some(flags); + } + + if let Some(info) = current_channel_info.two_to_one.as_ref() { + let flags: u8 = if info.enabled { 1 } else { 3 }; + let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()); + current_update.serialization_update_flags = Some(flags); + } + } else { + // we don't send reminders if we don't have the channel + continue; } } }