+
+ println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
+ /// Steps:
+ /// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
+ /// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
+ let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+ let reminder_threshold_timestamp = current_timestamp.saturating_sub(config::CHANNEL_REMINDER_AGE);
+ let mut channels_requiring_reminders: Vec<i64> = vec![];
+
+ let older_latest_directional_updates = client.query("
+ SELECT DISTINCT ON (short_channel_id) *
+ FROM (
+ SELECT DISTINCT ON (short_channel_id, direction) *
+ FROM channel_updates
+ WHERE short_channel_id = any($1)
+ ORDER BY short_channel_id ASC, direction ASC, seen DESC
+ ) AS directional_last_seens
+ ORDER BY short_channel_id ASC, seen ASC
+ ", &[&channel_ids]).await.unwrap();
+
+ for current_row in older_latest_directional_updates {
+ let blob: Vec<u8> = current_row.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+ let scid = unsigned_update.short_channel_id;
+ let current_seen_timestamp_object: SystemTime = current_row.get("seen");
+ let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+
+ if current_seen_timestamp <= reminder_threshold_timestamp {
+ // annotate this channel as requiring that reminders be sent to the client
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+
+ // way might be able to get away with not using this
+ (*current_channel_delta).requires_reminder = true;
+
+ // get the latest seen update in both directions for this channel
+ channels_requiring_reminders.push(scid as i64);
+ }
+ }
+
+ println!("Fetching latest update data for channels requiring reminders");
+ let latest_reminder_updates = client.query("
+ SELECT DISTINCT ON (short_channel_id, direction) *
+ FROM channel_updates
+ WHERE short_channel_id = any($1)
+ ORDER BY short_channel_id ASC, direction ASC, seen DESC
+ ", &[&channels_requiring_reminders]).await.unwrap();
+
+ for current_update in latest_reminder_updates {
+ let blob: Vec<u8> = current_update.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+ let scid = unsigned_update.short_channel_id;
+ let direction: bool = current_update.get("direction");
+
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ let update_delta = if !direction {
+ (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
+ } else {
+ (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
+ };
+ update_delta.serialization_update_flags = Some(unsigned_update.flags);
+ }