+ if include_reminders {
+ // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
+
+ log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
+ // Steps:
+ // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
+ // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
+ let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64;
+
+ log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
+ let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64;
+ let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
+
+ /*
+ What exactly is the below query doing?
+
+ First, the inner query groups all channel updates by their scid/direction combination,
+ and then sorts those in reverse chronological order by the "seen" column.
+
+ Then, each row is annotated based on whether its subsequent row for the same scid/direction
+ combination has a different value for any one of these six fields:
+ disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
+ Those are simply the properties we use to keep track of channel mutations.
+
+ The outer query takes all of those results and selects the first value that has a distinct
+ successor for each scid/direction combination. That yields the first instance at which
+ a given channel configuration was received after any prior mutations.
+
+ Knowing that, we can check whether or not there have been any mutations within the
+ reminder requirement window. Because we only care about that window (and potentially the
+ 2-week-window), we pre-filter the scanned updates by only those that were received within
+ 3x the timeframe that we consider necessitates reminders.
+ */
+
+ let mutated_updates = client.query_raw("
+ SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
+ SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
+ disable<>lead(disable) OVER w1
+ OR
+ cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
+ OR
+ htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
+ OR
+ fee_base_msat<>lead(fee_base_msat) OVER w1
+ OR
+ fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
+ OR
+ htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
+ TRUE
+ ) has_distinct_successor
+ FROM channel_updates
+ WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
+ WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
+ ) _
+ WHERE has_distinct_successor
+ ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
+ ", params).await.unwrap();
+
+ let mut pinned_updates = Box::pin(mutated_updates);
+ let mut older_latest_directional_update_count = 0;
+ while let Some(row_res) = pinned_updates.next().await {
+ let current_row = row_res.unwrap();
+ let seen = current_row.get::<_, i64>("seen") as u32;
+
+ if seen < reminder_threshold_timestamp as u32 {
+ let blob: Vec<u8> = current_row.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+
+ let scid = unsigned_channel_update.short_channel_id;
+ let direction: bool = current_row.get("direction");
+
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+
+ // We might be able to get away with not using this
+ (*current_channel_delta).requires_reminder = true;
+ older_latest_directional_update_count += 1;
+
+ if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
+ if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
+ // we don't send reminders if we don't have bidirectional update data
+ continue;
+ }
+
+ if let Some(info) = current_channel_info.one_to_two.as_ref() {
+ let flags: u8 = if info.enabled { 0 } else { 2 };
+ let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
+ current_update.serialization_update_flags = Some(flags);
+ }
+
+ if let Some(info) = current_channel_info.two_to_one.as_ref() {
+ let flags: u8 = if info.enabled { 1 } else { 3 };
+ let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
+ current_update.serialization_update_flags = Some(flags);
+ }
+ } else {
+ // we don't send reminders if we don't have the channel
+ continue;
+ }
+
+ log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
+ }
+ }
+ log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);