Detect beginning of static channel status streaks.
authorArik Sosman <git@arik.io>
Thu, 9 May 2024 06:58:29 +0000 (23:58 -0700)
committerArik Sosman <git@arik.io>
Fri, 10 May 2024 16:40:30 +0000 (09:40 -0700)
Previously, whenever a channel hasn't received updates in
6+ days, we would automatically send incremental reminders
that only contain the update flags.

However, if a channel has been received updates
within that six-day-timeframe, but all those updates were
identical, it would result in no updates being added to the
snapshot because the mutation set ended up empty and that
data would get purged from the serialization.

In order to avoid the reminder logic being duped by
channels simply being consistent, we now look up the
beginning of the latest continuous stretch of non-mutating
channel updates. If a channel's details have not been
altered in more than six days, we now send reminders no
matter the frequency with which channel updates have been
received since.

src/lookup.rs
src/serialization.rs

index 0878860020a61d373d311f6cff150d101c389c48..fbc16729bb96cccccd7e6fec7d7ec72d0f349fcf 100644 (file)
@@ -31,10 +31,15 @@ pub(super) struct UpdateDelta {
 }
 
 pub(super) struct DirectedUpdateDelta {
+       /// the last update we saw prior to the user-provided timestamp
        pub(super) last_update_before_seen: Option<UpdateDelta>,
-       pub(super) mutated_properties: MutatedProperties,
+       /// the latest update we saw overall
        pub(super) latest_update_after_seen: Option<UpdateDelta>,
-       pub(super) serialization_update_flags: Option<u8>,
+       /// the set of all mutated properties across all updates between the last seen by the user and
+       /// the latest one known to us
+       pub(super) mutated_properties: MutatedProperties,
+       /// Specifically for reminder updates, the flag-only value to send to the client
+       pub(super) serialization_update_flags: Option<u8>
 }
 
 pub(super) struct ChannelDelta {
@@ -166,56 +171,99 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
                // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
                let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
 
-               let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
-                       [&channel_ids, &reminder_threshold_timestamp];
-               let older_latest_directional_updates = client.query_raw("
-                       SELECT short_channel_id FROM (
-                               SELECT DISTINCT ON (short_channel_id) *
-                               FROM (
-                                       SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
-                                       FROM channel_updates
-                                       WHERE short_channel_id = any($1)
-                                       ORDER BY short_channel_id ASC, direction ASC, seen DESC
-                               ) AS directional_last_seens
-                               ORDER BY short_channel_id ASC, seen ASC
-                       ) AS distinct_chans
-                       WHERE distinct_chans.seen <= TO_TIMESTAMP($2)
-                       ", params).await.unwrap();
-               let mut pinned_updates = Box::pin(older_latest_directional_updates);
-
+               log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
+               let reminder_lookup_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
+               let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
+
+               /*
+               What exactly is the below query doing?
+
+               First, the inner query groups all channel updates by their scid/direction combination,
+               and then sorts those in reverse chronological order by the "seen" column.
+
+               Then, each row is annotated based on whether its subsequent row for the same scid/direction
+               combination has a different value for any one of these six fields:
+               disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
+               Those are simply the properties we use to keep track of channel mutations.
+
+               The outer query takes all of those results and selects the first value that has a distinct
+               successor for each scid/direction combination. That yields the first instance at which
+               a given channel configuration was received after any prior mutations.
+
+               Knowing that, we can check whether or not there have been any mutations within the
+               reminder requirement window. Because we only care about that window (and potentially the
+               2-week-window), we pre-filter the scanned updates by only those that were received within
+               3x the timeframe that we consider necessitates reminders.
+               */
+
+               let mutated_updates = client.query_raw("
+               SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
+                       SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
+                               disable<>lead(disable) OVER w1
+                                       OR
+                               cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
+                                       OR
+                               htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
+                                       OR
+                               fee_base_msat<>lead(fee_base_msat) OVER w1
+                                       OR
+                               fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
+                                       OR
+                               htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
+                               TRUE
+                       ) has_distinct_successor
+                       FROM channel_updates
+                       WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
+                       WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
+               ) _
+               WHERE has_distinct_successor
+               ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
+               ", params).await.unwrap();
+
+               let mut pinned_updates = Box::pin(mutated_updates);
                let mut older_latest_directional_update_count = 0;
                while let Some(row_res) = pinned_updates.next().await {
                        let current_row = row_res.unwrap();
-                       let scid: i64 = current_row.get("short_channel_id");
-
-                       // annotate this channel as requiring that reminders be sent to the client
-                       let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
-
-                       // way might be able to get away with not using this
-                       (*current_channel_delta).requires_reminder = true;
-
-                       if let Some(current_channel_info) = network_graph.read_only().channel(scid as u64) {
-                               if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
-                                       // we don't send reminders if we don't have bidirectional update data
+                       let seen = current_row.get::<_, i64>("seen") as u32;
+
+                       if seen < reminder_threshold_timestamp as u32 {
+                               let blob: Vec<u8> = current_row.get("blob_signed");
+                               let mut readable = Cursor::new(blob);
+                               let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+
+                               let scid = unsigned_channel_update.short_channel_id;
+                               let direction: bool = current_row.get("direction");
+
+                               let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+
+                               // We might be able to get away with not using this
+                               (*current_channel_delta).requires_reminder = true;
+                               older_latest_directional_update_count += 1;
+
+                               if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
+                                       if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
+                                               // we don't send reminders if we don't have bidirectional update data
+                                               continue;
+                                       }
+
+                                       if let Some(info) = current_channel_info.one_to_two.as_ref() {
+                                               let flags: u8 = if info.enabled { 0 } else { 2 };
+                                               let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
+                                               current_update.serialization_update_flags = Some(flags);
+                                       }
+
+                                       if let Some(info) = current_channel_info.two_to_one.as_ref() {
+                                               let flags: u8 = if info.enabled { 1 } else { 3 };
+                                               let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
+                                               current_update.serialization_update_flags = Some(flags);
+                                       }
+                               } else {
+                                       // we don't send reminders if we don't have the channel
                                        continue;
                                }
 
-                               if let Some(info) = current_channel_info.one_to_two.as_ref() {
-                                       let flags: u8 = if info.enabled { 0 } else { 2 };
-                                       let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
-                                       current_update.serialization_update_flags = Some(flags);
-                               }
-
-                               if let Some(info) = current_channel_info.two_to_one.as_ref() {
-                                       let flags: u8 = if info.enabled { 1 } else { 3 };
-                                       let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
-                                       current_update.serialization_update_flags = Some(flags);
-                               }
-                       } else {
-                               // we don't send reminders if we don't have the channel
-                               continue;
+                               log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
                        }
-                       older_latest_directional_update_count += 1;
                }
                log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
        }
index 0119ea3b79a66875e94eb91db4e29e46e68d668b..b9306d8f54762aa66bee33cb92f515da0dbce687 100644 (file)
@@ -181,6 +181,10 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                                                        // we don't count flags as mutated properties
                                                        serialization_set.updates.push(
                                                                UpdateSerialization::Incremental(latest_update, mutated_properties));
+                                               } else if channel_delta.requires_reminder {
+                                                       if let Some(flags) = updates.serialization_update_flags {
+                                                               serialization_set.updates.push(UpdateSerialization::Reminder(scid, flags));
+                                                       }
                                                }
                                        } else {
                                                // serialize the full update