From c042b6900e72fdad588fb1137644852367f2d2e6 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Tue, 27 Jun 2023 15:57:37 -0400 Subject: [PATCH] Fix first update lookup. --- src/lookup.rs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/lookup.rs b/src/lookup.rs index 25ea854..fbd89fa 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -97,13 +97,26 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ }); } - println!("Obtaining channel announcements whose first channel updates had not been seen yet"); + println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync"); + /// Steps: + /// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction + /// — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction) + /// This will allow us to mark the first time updates in both directions were seen // here is where the channels whose first update in either direction occurred after // `last_seen_timestamp` are added to the selection - let unannounced_rows = client.query("SELECT blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap(); - for current_row in unannounced_rows { - + let newer_oldest_directional_updates = client.query(" + SELECT DISTINCT ON (short_channel_id) * + FROM ( + SELECT DISTINCT ON (short_channel_id, direction) blob_signed + FROM channel_updates + WHERE short_channel_id = any($1) + ORDER BY seen ASC, short_channel_id ASC, direction ASC + ) AS directional_last_seens + ORDER BY short_channel_id ASC, seen DESC + ", &[&channel_ids]).await.unwrap(); + + for current_row in newer_oldest_directional_updates { let blob: Vec = current_row.get("blob_signed"); let mut readable = Cursor::new(blob); let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents; @@ -111,8 +124,12 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ let current_seen_timestamp_object: SystemTime = current_row.get("seen"); let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; - let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); - (*current_channel_delta).first_update_seen = Some(current_seen_timestamp); + if (current_seen_timestamp > last_sync_timestamp) { + // the newer of the two oldest seen directional updates came after last sync timestamp + let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); + // first time a channel was seen in both directions + (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp); + } } } -- 2.39.5