pub(super) struct DirectedUpdateDelta {
+ /// the last update we saw prior to the user-provided timestamp
pub(super) last_update_before_seen: Option<UpdateDelta>,
- pub(super) mutated_properties: MutatedProperties,
+ /// the latest update we saw overall
pub(super) latest_update_after_seen: Option<UpdateDelta>,
- pub(super) serialization_update_flags: Option<u8>,
+ /// the set of all mutated properties across all updates between the last seen by the user and
+ /// the latest one known to us
+ pub(super) mutated_properties: MutatedProperties,
+ /// Specifically for reminder updates, the flag-only value to send to the client
+ pub(super) serialization_update_flags: Option<u8>
pub(super) struct ChannelDelta {
// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
- let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
- [&channel_ids, &reminder_threshold_timestamp];
- let older_latest_directional_updates = client.query_raw("
- SELECT short_channel_id FROM (
- SELECT DISTINCT ON (short_channel_id) *
- FROM (
- SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
- FROM channel_updates
- WHERE short_channel_id = any($1)
- ORDER BY short_channel_id ASC, direction ASC, seen DESC
- ) AS directional_last_seens
- ORDER BY short_channel_id ASC, seen ASC
- ) AS distinct_chans
- WHERE distinct_chans.seen <= TO_TIMESTAMP($2)
- ", params).await.unwrap();
- let mut pinned_updates = Box::pin(older_latest_directional_updates);
+ log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
+ let reminder_lookup_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
+ let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
+ /*
+ What exactly is the below query doing?
+ First, the inner query groups all channel updates by their scid/direction combination,
+ and then sorts those in reverse chronological order by the "seen" column.
+ Then, each row is annotated based on whether its subsequent row for the same scid/direction
+ combination has a different value for any one of these six fields:
+ disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
+ Those are simply the properties we use to keep track of channel mutations.
+ The outer query takes all of those results and selects the first value that has a distinct
+ successor for each scid/direction combination. That yields the first instance at which
+ a given channel configuration was received after any prior mutations.
+ Knowing that, we can check whether or not there have been any mutations within the
+ reminder requirement window. Because we only care about that window (and potentially the
+ 2-week-window), we pre-filter the scanned updates by only those that were received within
+ 3x the timeframe that we consider necessitates reminders.
+ */
+ let mutated_updates = client.query_raw("
+ SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
+ SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
+ disable<>lead(disable) OVER w1
+ OR
+ cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
+ OR
+ htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
+ OR
+ fee_base_msat<>lead(fee_base_msat) OVER w1
+ OR
+ fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
+ OR
+ htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
+ ) has_distinct_successor
+ FROM channel_updates
+ WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
+ WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
+ ) _
+ WHERE has_distinct_successor
+ ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
+ ", params).await.unwrap();
+ let mut pinned_updates = Box::pin(mutated_updates);
let mut older_latest_directional_update_count = 0;
while let Some(row_res) = pinned_updates.next().await {
let current_row = row_res.unwrap();
- let scid: i64 = current_row.get("short_channel_id");
- // annotate this channel as requiring that reminders be sent to the client
- let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
- // way might be able to get away with not using this
- (*current_channel_delta).requires_reminder = true;
- if let Some(current_channel_info) = network_graph.read_only().channel(scid as u64) {
- if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
- // we don't send reminders if we don't have bidirectional update data
+ let seen = current_row.get::<_, i64>("seen") as u32;
+ if seen < reminder_threshold_timestamp as u32 {
+ let blob: Vec<u8> = current_row.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+ let scid = unsigned_channel_update.short_channel_id;
+ let direction: bool = current_row.get("direction");
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ // We might be able to get away with not using this
+ (*current_channel_delta).requires_reminder = true;
+ older_latest_directional_update_count += 1;
+ if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
+ if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
+ // we don't send reminders if we don't have bidirectional update data
+ continue;
+ }
+ if let Some(info) = current_channel_info.one_to_two.as_ref() {
+ let flags: u8 = if info.enabled { 0 } else { 2 };
+ let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
+ current_update.serialization_update_flags = Some(flags);
+ }
+ if let Some(info) = current_channel_info.two_to_one.as_ref() {
+ let flags: u8 = if info.enabled { 1 } else { 3 };
+ let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
+ current_update.serialization_update_flags = Some(flags);
+ }
+ } else {
+ // we don't send reminders if we don't have the channel
- if let Some(info) = current_channel_info.one_to_two.as_ref() {
- let flags: u8 = if info.enabled { 0 } else { 2 };
- let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
- current_update.serialization_update_flags = Some(flags);
- }
- if let Some(info) = current_channel_info.two_to_one.as_ref() {
- let flags: u8 = if info.enabled { 1 } else { 3 };
- let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
- current_update.serialization_update_flags = Some(flags);
- }
- } else {
- // we don't send reminders if we don't have the channel
- continue;
+ log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
- older_latest_directional_update_count += 1;
log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);