Detect beginning of static channel status streaks.
[rapid-gossip-sync-server] / src / lookup.rs
index 0878860020a61d373d311f6cff150d101c389c48..fbc16729bb96cccccd7e6fec7d7ec72d0f349fcf 100644 (file)
@@ -31,10 +31,15 @@ pub(super) struct UpdateDelta {
 }
 
 pub(super) struct DirectedUpdateDelta {
+       /// the last update we saw prior to the user-provided timestamp
        pub(super) last_update_before_seen: Option<UpdateDelta>,
-       pub(super) mutated_properties: MutatedProperties,
+       /// the latest update we saw overall
        pub(super) latest_update_after_seen: Option<UpdateDelta>,
-       pub(super) serialization_update_flags: Option<u8>,
+       /// the set of all mutated properties across all updates between the last seen by the user and
+       /// the latest one known to us
+       pub(super) mutated_properties: MutatedProperties,
+       /// Specifically for reminder updates, the flag-only value to send to the client
+       pub(super) serialization_update_flags: Option<u8>
 }
 
 pub(super) struct ChannelDelta {
@@ -166,56 +171,99 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
                // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
                let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
 
-               let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
-                       [&channel_ids, &reminder_threshold_timestamp];
-               let older_latest_directional_updates = client.query_raw("
-                       SELECT short_channel_id FROM (
-                               SELECT DISTINCT ON (short_channel_id) *
-                               FROM (
-                                       SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
-                                       FROM channel_updates
-                                       WHERE short_channel_id = any($1)
-                                       ORDER BY short_channel_id ASC, direction ASC, seen DESC
-                               ) AS directional_last_seens
-                               ORDER BY short_channel_id ASC, seen ASC
-                       ) AS distinct_chans
-                       WHERE distinct_chans.seen <= TO_TIMESTAMP($2)
-                       ", params).await.unwrap();
-               let mut pinned_updates = Box::pin(older_latest_directional_updates);
-
+               log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
+               let reminder_lookup_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
+               let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
+
+               /*
+               What exactly is the below query doing?
+
+               First, the inner query groups all channel updates by their scid/direction combination,
+               and then sorts those in reverse chronological order by the "seen" column.
+
+               Then, each row is annotated based on whether its subsequent row for the same scid/direction
+               combination has a different value for any one of these six fields:
+               disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
+               Those are simply the properties we use to keep track of channel mutations.
+
+               The outer query takes all of those results and selects the first value that has a distinct
+               successor for each scid/direction combination. That yields the first instance at which
+               a given channel configuration was received after any prior mutations.
+
+               Knowing that, we can check whether or not there have been any mutations within the
+               reminder requirement window. Because we only care about that window (and potentially the
+               2-week-window), we pre-filter the scanned updates by only those that were received within
+               3x the timeframe that we consider necessitates reminders.
+               */
+
+               let mutated_updates = client.query_raw("
+               SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
+                       SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
+                               disable<>lead(disable) OVER w1
+                                       OR
+                               cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
+                                       OR
+                               htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
+                                       OR
+                               fee_base_msat<>lead(fee_base_msat) OVER w1
+                                       OR
+                               fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
+                                       OR
+                               htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
+                               TRUE
+                       ) has_distinct_successor
+                       FROM channel_updates
+                       WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
+                       WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
+               ) _
+               WHERE has_distinct_successor
+               ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
+               ", params).await.unwrap();
+
+               let mut pinned_updates = Box::pin(mutated_updates);
                let mut older_latest_directional_update_count = 0;
                while let Some(row_res) = pinned_updates.next().await {
                        let current_row = row_res.unwrap();
-                       let scid: i64 = current_row.get("short_channel_id");
-
-                       // annotate this channel as requiring that reminders be sent to the client
-                       let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
-
-                       // way might be able to get away with not using this
-                       (*current_channel_delta).requires_reminder = true;
-
-                       if let Some(current_channel_info) = network_graph.read_only().channel(scid as u64) {
-                               if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
-                                       // we don't send reminders if we don't have bidirectional update data
+                       let seen = current_row.get::<_, i64>("seen") as u32;
+
+                       if seen < reminder_threshold_timestamp as u32 {
+                               let blob: Vec<u8> = current_row.get("blob_signed");
+                               let mut readable = Cursor::new(blob);
+                               let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+
+                               let scid = unsigned_channel_update.short_channel_id;
+                               let direction: bool = current_row.get("direction");
+
+                               let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+
+                               // We might be able to get away with not using this
+                               (*current_channel_delta).requires_reminder = true;
+                               older_latest_directional_update_count += 1;
+
+                               if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
+                                       if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
+                                               // we don't send reminders if we don't have bidirectional update data
+                                               continue;
+                                       }
+
+                                       if let Some(info) = current_channel_info.one_to_two.as_ref() {
+                                               let flags: u8 = if info.enabled { 0 } else { 2 };
+                                               let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
+                                               current_update.serialization_update_flags = Some(flags);
+                                       }
+
+                                       if let Some(info) = current_channel_info.two_to_one.as_ref() {
+                                               let flags: u8 = if info.enabled { 1 } else { 3 };
+                                               let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
+                                               current_update.serialization_update_flags = Some(flags);
+                                       }
+                               } else {
+                                       // we don't send reminders if we don't have the channel
                                        continue;
                                }
 
-                               if let Some(info) = current_channel_info.one_to_two.as_ref() {
-                                       let flags: u8 = if info.enabled { 0 } else { 2 };
-                                       let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
-                                       current_update.serialization_update_flags = Some(flags);
-                               }
-
-                               if let Some(info) = current_channel_info.two_to_one.as_ref() {
-                                       let flags: u8 = if info.enabled { 1 } else { 3 };
-                                       let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
-                                       current_update.serialization_update_flags = Some(flags);
-                               }
-                       } else {
-                               // we don't send reminders if we don't have the channel
-                               continue;
+                               log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
                        }
-                       older_latest_directional_update_count += 1;
                }
                log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
        }