Fix expired value being sent as the latest update.
[rapid-gossip-sync-server] / src / lookup.rs
index fa66103ec53a3af9f657a37ffbbd7a18266c8fcd..0878860020a61d373d311f6cff150d101c389c48 100644 (file)
@@ -7,11 +7,10 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
 use lightning::routing::gossip::NetworkGraph;
 use lightning::util::ser::Readable;
-use tokio_postgres::{Client, Connection, NoTls, Socket};
-use tokio_postgres::tls::NoTlsStream;
+use tokio_postgres::Client;
 
 use futures::StreamExt;
-use lightning::log_info;
+use lightning::{log_gossip, log_info};
 use lightning::util::logger::Logger;
 
 use crate::config;
@@ -32,7 +31,7 @@ pub(super) struct UpdateDelta {
 }
 
 pub(super) struct DirectedUpdateDelta {
-       pub(super) last_update_before_seen: Option<UnsignedChannelUpdate>,
+       pub(super) last_update_before_seen: Option<UpdateDelta>,
        pub(super) mutated_properties: MutatedProperties,
        pub(super) latest_update_after_seen: Option<UpdateDelta>,
        pub(super) serialization_update_flags: Option<u8>,
@@ -68,11 +67,6 @@ impl Default for DirectedUpdateDelta {
        }
 }
 
-pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>) {
-       let connection_config = config::db_connection_config();
-       connection_config.connect(NoTls).await.unwrap()
-}
-
 /// Fetch all the channel announcements that are presently in the network graph, regardless of
 /// whether they had been seen before.
 /// Also include all announcements for which the first update was announced
@@ -95,7 +89,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
 
        log_info!(logger, "Obtaining corresponding database entries");
        // get all the channel announcements that are currently in the network graph
-       let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
+       let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
        let mut pinned_rows = Box::pin(announcement_rows);
 
        let mut announcement_count = 0;
@@ -106,8 +100,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
                let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
 
                let scid = unsigned_announcement.short_channel_id;
-               let current_seen_timestamp_object: SystemTime = current_announcement_row.get("seen");
-               let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+               let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
 
                let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
                (*current_channel_delta).announcement = Some(AnnouncementDelta {
@@ -133,7 +126,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
                let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
                        [&channel_ids, &last_sync_timestamp_float];
                let newer_oldest_directional_updates = client.query_raw("
-                       SELECT * FROM (
+                       SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
                                SELECT DISTINCT ON (short_channel_id) *
                                FROM (
                                        SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
@@ -152,8 +145,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
                        let current_row = row_res.unwrap();
 
                        let scid: i64 = current_row.get("short_channel_id");
-                       let current_seen_timestamp_object: SystemTime = current_row.get("seen");
-                       let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+                       let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
 
                        // the newer of the two oldest seen directional updates came after last sync timestamp
                        let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
@@ -237,7 +229,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
        // there was an update in either direction that happened after the last sync (to avoid
        // collecting too many reference updates)
        let reference_rows = client.query_raw("
-               SELECT id, direction, blob_signed FROM channel_updates
+               SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
                WHERE id IN (
                        SELECT DISTINCT ON (short_channel_id, direction) id
                        FROM channel_updates
@@ -264,6 +256,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
                non_intermediate_ids.insert(update_id);
 
                let direction: bool = current_reference.get("direction");
+               let seen = current_reference.get::<_, i64>("seen") as u32;
                let blob: Vec<u8> = current_reference.get("blob_signed");
                let mut readable = Cursor::new(blob);
                let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
@@ -275,7 +268,12 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
                } else {
                        (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
                };
-               update_delta.last_update_before_seen = Some(unsigned_channel_update);
+               log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
+               update_delta.last_update_before_seen = Some(UpdateDelta {
+                       seen,
+                       update: unsigned_channel_update,
+               });
+
                reference_row_count += 1;
        }
 
@@ -287,9 +285,10 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
        // have been omitted)
 
        let intermediate_updates = client.query_raw("
-               SELECT id, direction, blob_signed, seen
+               SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
                FROM channel_updates
                WHERE seen >= TO_TIMESTAMP($1)
+               ORDER BY short_channel_id ASC, timestamp DESC
                ", [last_sync_timestamp_float]).await.unwrap();
        let mut pinned_updates = Box::pin(intermediate_updates);
        log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
@@ -308,8 +307,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
                intermediate_update_count += 1;
 
                let direction: bool = intermediate_update.get("direction");
-               let current_seen_timestamp_object: SystemTime = intermediate_update.get("seen");
-               let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+               let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
                let blob: Vec<u8> = intermediate_update.get("blob_signed");
                let mut readable = Cursor::new(blob);
                let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
@@ -347,22 +345,22 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 
                // determine mutations
                if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
-                       if unsigned_channel_update.flags != last_seen_update.flags {
+                       if unsigned_channel_update.flags != last_seen_update.update.flags {
                                update_delta.mutated_properties.flags = true;
                        }
-                       if unsigned_channel_update.cltv_expiry_delta != last_seen_update.cltv_expiry_delta {
+                       if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
                                update_delta.mutated_properties.cltv_expiry_delta = true;
                        }
-                       if unsigned_channel_update.htlc_minimum_msat != last_seen_update.htlc_minimum_msat {
+                       if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
                                update_delta.mutated_properties.htlc_minimum_msat = true;
                        }
-                       if unsigned_channel_update.fee_base_msat != last_seen_update.fee_base_msat {
+                       if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
                                update_delta.mutated_properties.fee_base_msat = true;
                        }
-                       if unsigned_channel_update.fee_proportional_millionths != last_seen_update.fee_proportional_millionths {
+                       if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
                                update_delta.mutated_properties.fee_proportional_millionths = true;
                        }
-                       if unsigned_channel_update.htlc_maximum_msat != last_seen_update.htlc_maximum_msat {
+                       if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
                                update_delta.mutated_properties.htlc_maximum_msat = true;
                        }
                }