From cbf7897549f540ca3017dced125d9b1202b61dad Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Mon, 28 Aug 2023 18:01:03 -0700 Subject: [PATCH] Send full updates after old last seen updates. Previously, whenever we saw that there was a previous update that a client would have seen, we simply calculated the delta set based on which properties have changed, and would most likely send an incremental update set (excepting the case of a new or newly sent announcement, in which case all sent updates are full). However, if the last seen update was old, and there's a chance that a user may have run RGS since, it is possible that due to the 7-day-backdating-mechanism included on the client, the reference update would no longer be present. To fix that, anytime we see that a last seen update is more than six days old, we automatically include a full update. --- src/lookup.rs | 26 ++++++++++++++++---------- src/serialization.rs | 9 +++++++-- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/lookup.rs b/src/lookup.rs index 696b4d0..8def6f1 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -10,7 +10,7 @@ use lightning::util::ser::Readable; use tokio_postgres::Client; use futures::StreamExt; -use lightning::log_info; +use lightning::{log_gossip, log_info}; use lightning::util::logger::Logger; use crate::config; @@ -31,7 +31,7 @@ pub(super) struct UpdateDelta { } pub(super) struct DirectedUpdateDelta { - pub(super) last_update_before_seen: Option, + pub(super) last_update_before_seen: Option, pub(super) mutated_properties: MutatedProperties, pub(super) latest_update_after_seen: Option, pub(super) serialization_update_flags: Option, @@ -229,7 +229,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl // there was an update in either direction that happened after the last sync (to avoid // collecting too many reference updates) let reference_rows = client.query_raw(" - SELECT id, direction, blob_signed FROM channel_updates + SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates WHERE id IN ( SELECT DISTINCT ON (short_channel_id, direction) id FROM channel_updates @@ -256,6 +256,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl non_intermediate_ids.insert(update_id); let direction: bool = current_reference.get("direction"); + let seen = current_reference.get::<_, i64>("seen") as u32; let blob: Vec = current_reference.get("blob_signed"); let mut readable = Cursor::new(blob); let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; @@ -267,7 +268,12 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl } else { (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()) }; - update_delta.last_update_before_seen = Some(unsigned_channel_update); + log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp); + update_delta.last_update_before_seen = Some(UpdateDelta { + seen, + update: unsigned_channel_update, + }); + reference_row_count += 1; } @@ -339,22 +345,22 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl // determine mutations if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() { - if unsigned_channel_update.flags != last_seen_update.flags { + if unsigned_channel_update.flags != last_seen_update.update.flags { update_delta.mutated_properties.flags = true; } - if unsigned_channel_update.cltv_expiry_delta != last_seen_update.cltv_expiry_delta { + if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta { update_delta.mutated_properties.cltv_expiry_delta = true; } - if unsigned_channel_update.htlc_minimum_msat != last_seen_update.htlc_minimum_msat { + if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat { update_delta.mutated_properties.htlc_minimum_msat = true; } - if unsigned_channel_update.fee_base_msat != last_seen_update.fee_base_msat { + if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat { update_delta.mutated_properties.fee_base_msat = true; } - if unsigned_channel_update.fee_proportional_millionths != last_seen_update.fee_proportional_millionths { + if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths { update_delta.mutated_properties.fee_proportional_millionths = true; } - if unsigned_channel_update.htlc_maximum_msat != last_seen_update.htlc_maximum_msat { + if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat { update_delta.mutated_properties.htlc_maximum_msat = true; } } diff --git a/src/serialization.rs b/src/serialization.rs index 86e114b..93bd381 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -1,10 +1,12 @@ use std::cmp::max; use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::BlockHash; use bitcoin::hashes::Hash; use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate}; use lightning::util::ser::{BigSize, Writeable}; +use crate::config; use crate::lookup::{DeltaSet, DirectedUpdateDelta}; @@ -129,6 +131,9 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1; }; + // if the previous seen update happened more than 6 days ago, the client may have pruned it, and an incremental update wouldn't work + let non_incremental_previous_update_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as u32; + for (scid, channel_delta) in delta_set.into_iter() { // any announcement chain hash is gonna be the same value. Just set it from the first one. @@ -164,9 +169,9 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) // announcements and latest updates serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen); - if updates.last_update_before_seen.is_some() { + if let Some(update_delta) = updates.last_update_before_seen { let mutated_properties = updates.mutated_properties; - if mutated_properties.len() == 5 || send_announcement { + if send_announcement || mutated_properties.len() == 5 || update_delta.seen <= non_incremental_previous_update_threshold_timestamp { // all five values have changed, it makes more sense to just // serialize the update as a full update instead of as a change // this way, the default values can be computed more efficiently -- 2.39.5