Fix first update lookup.
authorArik Sosman <git@arik.io>
Tue, 27 Jun 2023 19:57:37 +0000 (15:57 -0400)
committerArik Sosman <git@arik.io>
Tue, 27 Jun 2023 20:26:56 +0000 (16:26 -0400)
src/lookup.rs

index 25ea85422bf3ffdae4ee0acc7a7d07eb838269d1..fbd89fa41b9e838bf32e2b199970105b7b644c89 100644 (file)
@@ -97,13 +97,26 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
                });
        }
 
-       println!("Obtaining channel announcements whose first channel updates had not been seen yet");
+       println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
+       /// Steps:
+       /// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
+       /// — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
+       /// This will allow us to mark the first time updates in both directions were seen
 
        // here is where the channels whose first update in either direction occurred after
        // `last_seen_timestamp` are added to the selection
-       let unannounced_rows = client.query("SELECT blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap();
-       for current_row in unannounced_rows {
-
+       let newer_oldest_directional_updates = client.query("
+               SELECT DISTINCT ON (short_channel_id) *
+               FROM (
+                       SELECT DISTINCT ON (short_channel_id, direction) blob_signed
+                       FROM channel_updates
+                       WHERE short_channel_id = any($1)
+                       ORDER BY seen ASC, short_channel_id ASC, direction ASC
+               ) AS directional_last_seens
+               ORDER BY short_channel_id ASC, seen DESC
+       ", &[&channel_ids]).await.unwrap();
+
+       for current_row in newer_oldest_directional_updates {
                let blob: Vec<u8> = current_row.get("blob_signed");
                let mut readable = Cursor::new(blob);
                let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
@@ -111,8 +124,12 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
                let current_seen_timestamp_object: SystemTime = current_row.get("seen");
                let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
 
-               let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
-               (*current_channel_delta).first_update_seen = Some(current_seen_timestamp);
+               if (current_seen_timestamp > last_sync_timestamp) {
+                       // the newer of the two oldest seen directional updates came after last sync timestamp
+                       let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+                       // first time a channel was seen in both directions
+                       (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
+               }
        }
 }