X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Flookup.rs;h=6429d5c9371cbc3b988cc57411de585771d8c56e;hb=60ad4958a2144b229ed09359afbb3711c192053b;hp=779b253d22e211bd5b06e2e48e5ed7c04fe8394a;hpb=30ded89530e432c026308c536386cb8c8a4d6774;p=rapid-gossip-sync-server diff --git a/src/lookup.rs b/src/lookup.rs index 779b253..6429d5c 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -75,6 +75,7 @@ pub(super) async fn connect_to_db() -> (Client, Connection) /// after `last_sync_timestamp` pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32) { println!("Obtaining channel ids from network graph"); + let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); let channel_ids = { let read_only_graph = network_graph.read_only(); println!("Retrieved read-only network graph copy"); @@ -117,27 +118,28 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ // here is where the channels whose first update in either direction occurred after // `last_seen_timestamp` are added to the selection let newer_oldest_directional_updates = client.query(" - SELECT DISTINCT ON (short_channel_id) * - FROM ( - SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen - FROM channel_updates - WHERE short_channel_id = any($1) - ORDER BY short_channel_id ASC, direction ASC, seen ASC - ) AS directional_last_seens - ORDER BY short_channel_id ASC, seen DESC - ", &[&channel_ids]).await.unwrap(); + SELECT * FROM ( + SELECT DISTINCT ON (short_channel_id) * + FROM ( + SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen + FROM channel_updates + WHERE short_channel_id = any($1) + ORDER BY short_channel_id ASC, direction ASC, seen ASC + ) AS directional_last_seens + ORDER BY short_channel_id ASC, seen DESC + ) AS distinct_chans + WHERE distinct_chans.seen >= $2 + ", &[&channel_ids, &last_sync_timestamp_object]).await.unwrap(); for current_row in newer_oldest_directional_updates { let scid: i64 = current_row.get("short_channel_id"); let current_seen_timestamp_object: SystemTime = current_row.get("seen"); let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; - if current_seen_timestamp > last_sync_timestamp { - // the newer of the two oldest seen directional updates came after last sync timestamp - let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); - // first time a channel was seen in both directions - (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp); - } + // the newer of the two oldest seen directional updates came after last sync timestamp + let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); + // first time a channel was seen in both directions + (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp); } } @@ -148,54 +150,52 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ // Steps: // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction) - let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; - let reminder_threshold_timestamp = current_timestamp.saturating_sub(config::CHANNEL_REMINDER_AGE); + let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap(); let read_only_graph = network_graph.read_only(); let older_latest_directional_updates = client.query(" - SELECT DISTINCT ON (short_channel_id) * - FROM ( - SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen - FROM channel_updates - WHERE short_channel_id = any($1) - ORDER BY short_channel_id ASC, direction ASC, seen DESC - ) AS directional_last_seens - ORDER BY short_channel_id ASC, seen ASC - ", &[&channel_ids]).await.unwrap(); + SELECT short_channel_id FROM ( + SELECT DISTINCT ON (short_channel_id) * + FROM ( + SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen + FROM channel_updates + WHERE short_channel_id = any($1) + ORDER BY short_channel_id ASC, direction ASC, seen DESC + ) AS directional_last_seens + ORDER BY short_channel_id ASC, seen ASC + ) AS distinct_chans + WHERE distinct_chans.seen <= $2 + ", &[&channel_ids, &reminder_threshold_timestamp]).await.unwrap(); for current_row in older_latest_directional_updates { let scid: i64 = current_row.get("short_channel_id"); - let current_seen_timestamp_object: SystemTime = current_row.get("seen"); - let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; - if current_seen_timestamp <= reminder_threshold_timestamp { - // annotate this channel as requiring that reminders be sent to the client - let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); - - // way might be able to get away with not using this - (*current_channel_delta).requires_reminder = true; - - if let Some(current_channel_info) = read_only_graph.channel(scid as u64) { - if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() { - // we don't send reminders if we don't have bidirectional update data - continue; - } - - if let Some(info) = current_channel_info.one_to_two.as_ref() { - let flags: u8 = if info.enabled { 0 } else { 2 }; - let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()); - current_update.serialization_update_flags = Some(flags); - } - - if let Some(info) = current_channel_info.two_to_one.as_ref() { - let flags: u8 = if info.enabled { 1 } else { 3 }; - let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()); - current_update.serialization_update_flags = Some(flags); - } - } else { - // we don't send reminders if we don't have the channel + // annotate this channel as requiring that reminders be sent to the client + let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); + + // way might be able to get away with not using this + (*current_channel_delta).requires_reminder = true; + + if let Some(current_channel_info) = read_only_graph.channel(scid as u64) { + if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() { + // we don't send reminders if we don't have bidirectional update data continue; } + + if let Some(info) = current_channel_info.one_to_two.as_ref() { + let flags: u8 = if info.enabled { 0 } else { 2 }; + let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()); + current_update.serialization_update_flags = Some(flags); + } + + if let Some(info) = current_channel_info.two_to_one.as_ref() { + let flags: u8 = if info.enabled { 1 } else { 3 }; + let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()); + current_update.serialization_update_flags = Some(flags); + } + } else { + // we don't send reminders if we don't have the channel + continue; } } }