From 2488d9dfb30511675e192e08517fe517a6a96388 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 8 May 2024 23:58:29 -0700 Subject: [PATCH] Detect beginning of static channel status streaks. Previously, whenever a channel hasn't received updates in 6+ days, we would automatically send incremental reminders that only contain the update flags. However, if a channel has been received updates within that six-day-timeframe, but all those updates were identical, it would result in no updates being added to the snapshot because the mutation set ended up empty and that data would get purged from the serialization. In order to avoid the reminder logic being duped by channels simply being consistent, we now look up the beginning of the latest continuous stretch of non-mutating channel updates. If a channel's details have not been altered in more than six days, we now send reminders no matter the frequency with which channel updates have been received since. --- src/lookup.rs | 138 +++++++++++++++++++++++++++++-------------- src/serialization.rs | 4 ++ 2 files changed, 97 insertions(+), 45 deletions(-) diff --git a/src/lookup.rs b/src/lookup.rs index 0878860..fbc1672 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -31,10 +31,15 @@ pub(super) struct UpdateDelta { } pub(super) struct DirectedUpdateDelta { + /// the last update we saw prior to the user-provided timestamp pub(super) last_update_before_seen: Option, - pub(super) mutated_properties: MutatedProperties, + /// the latest update we saw overall pub(super) latest_update_after_seen: Option, - pub(super) serialization_update_flags: Option, + /// the set of all mutated properties across all updates between the last seen by the user and + /// the latest one known to us + pub(super) mutated_properties: MutatedProperties, + /// Specifically for reminder updates, the flag-only value to send to the client + pub(super) serialization_update_flags: Option } pub(super) struct ChannelDelta { @@ -166,56 +171,99 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction) let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64; - let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = - [&channel_ids, &reminder_threshold_timestamp]; - let older_latest_directional_updates = client.query_raw(" - SELECT short_channel_id FROM ( - SELECT DISTINCT ON (short_channel_id) * - FROM ( - SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen - FROM channel_updates - WHERE short_channel_id = any($1) - ORDER BY short_channel_id ASC, direction ASC, seen DESC - ) AS directional_last_seens - ORDER BY short_channel_id ASC, seen ASC - ) AS distinct_chans - WHERE distinct_chans.seen <= TO_TIMESTAMP($2) - ", params).await.unwrap(); - let mut pinned_updates = Box::pin(older_latest_directional_updates); - + log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)"); + let reminder_lookup_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64; + let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp]; + + /* + What exactly is the below query doing? + + First, the inner query groups all channel updates by their scid/direction combination, + and then sorts those in reverse chronological order by the "seen" column. + + Then, each row is annotated based on whether its subsequent row for the same scid/direction + combination has a different value for any one of these six fields: + disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat + Those are simply the properties we use to keep track of channel mutations. + + The outer query takes all of those results and selects the first value that has a distinct + successor for each scid/direction combination. That yields the first instance at which + a given channel configuration was received after any prior mutations. + + Knowing that, we can check whether or not there have been any mutations within the + reminder requirement window. Because we only care about that window (and potentially the + 2-week-window), we pre-filter the scanned updates by only those that were received within + 3x the timeframe that we consider necessitates reminders. + */ + + let mutated_updates = client.query_raw(" + SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM ( + SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE ( + disable<>lead(disable) OVER w1 + OR + cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1 + OR + htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1 + OR + fee_base_msat<>lead(fee_base_msat) OVER w1 + OR + fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1 + OR + htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1, + TRUE + ) has_distinct_successor + FROM channel_updates + WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2) + WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC) + ) _ + WHERE has_distinct_successor + ORDER BY short_channel_id ASC, direction ASC, timestamp DESC + ", params).await.unwrap(); + + let mut pinned_updates = Box::pin(mutated_updates); let mut older_latest_directional_update_count = 0; while let Some(row_res) = pinned_updates.next().await { let current_row = row_res.unwrap(); - let scid: i64 = current_row.get("short_channel_id"); - - // annotate this channel as requiring that reminders be sent to the client - let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default()); - - // way might be able to get away with not using this - (*current_channel_delta).requires_reminder = true; - - if let Some(current_channel_info) = network_graph.read_only().channel(scid as u64) { - if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() { - // we don't send reminders if we don't have bidirectional update data + let seen = current_row.get::<_, i64>("seen") as u32; + + if seen < reminder_threshold_timestamp as u32 { + let blob: Vec = current_row.get("blob_signed"); + let mut readable = Cursor::new(blob); + let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; + + let scid = unsigned_channel_update.short_channel_id; + let direction: bool = current_row.get("direction"); + + let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); + + // We might be able to get away with not using this + (*current_channel_delta).requires_reminder = true; + older_latest_directional_update_count += 1; + + if let Some(current_channel_info) = network_graph.read_only().channel(scid) { + if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() { + // we don't send reminders if we don't have bidirectional update data + continue; + } + + if let Some(info) = current_channel_info.one_to_two.as_ref() { + let flags: u8 = if info.enabled { 0 } else { 2 }; + let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()); + current_update.serialization_update_flags = Some(flags); + } + + if let Some(info) = current_channel_info.two_to_one.as_ref() { + let flags: u8 = if info.enabled { 1 } else { 3 }; + let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()); + current_update.serialization_update_flags = Some(flags); + } + } else { + // we don't send reminders if we don't have the channel continue; } - if let Some(info) = current_channel_info.one_to_two.as_ref() { - let flags: u8 = if info.enabled { 0 } else { 2 }; - let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()); - current_update.serialization_update_flags = Some(flags); - } - - if let Some(info) = current_channel_info.two_to_one.as_ref() { - let flags: u8 = if info.enabled { 1 } else { 3 }; - let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()); - current_update.serialization_update_flags = Some(flags); - } - } else { - // we don't send reminders if we don't have the channel - continue; + log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction); } - older_latest_directional_update_count += 1; } log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count); } diff --git a/src/serialization.rs b/src/serialization.rs index 0119ea3..b9306d8 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -181,6 +181,10 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) // we don't count flags as mutated properties serialization_set.updates.push( UpdateSerialization::Incremental(latest_update, mutated_properties)); + } else if channel_delta.requires_reminder { + if let Some(flags) = updates.serialization_update_flags { + serialization_set.updates.push(UpdateSerialization::Reminder(scid, flags)); + } } } else { // serialize the full update -- 2.39.5