Merge pull request #57 from arik-so/2023/08/old-incremental-update-fix
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Wed, 6 Sep 2023 22:16:26 +0000 (22:16 +0000)
committerGitHub <noreply@github.com>
Wed, 6 Sep 2023 22:16:26 +0000 (22:16 +0000)
Send full updates after old last seen updates.

src/lookup.rs
src/serialization.rs

index 696b4d041ab508b53c7074884a63b89047d4957a..8def6f13750fa5cf556ac6c1a1106b6d4b70635a 100644 (file)
@@ -10,7 +10,7 @@ use lightning::util::ser::Readable;
 use tokio_postgres::Client;
 
 use futures::StreamExt;
-use lightning::log_info;
+use lightning::{log_gossip, log_info};
 use lightning::util::logger::Logger;
 
 use crate::config;
@@ -31,7 +31,7 @@ pub(super) struct UpdateDelta {
 }
 
 pub(super) struct DirectedUpdateDelta {
-       pub(super) last_update_before_seen: Option<UnsignedChannelUpdate>,
+       pub(super) last_update_before_seen: Option<UpdateDelta>,
        pub(super) mutated_properties: MutatedProperties,
        pub(super) latest_update_after_seen: Option<UpdateDelta>,
        pub(super) serialization_update_flags: Option<u8>,
@@ -229,7 +229,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
        // there was an update in either direction that happened after the last sync (to avoid
        // collecting too many reference updates)
        let reference_rows = client.query_raw("
-               SELECT id, direction, blob_signed FROM channel_updates
+               SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
                WHERE id IN (
                        SELECT DISTINCT ON (short_channel_id, direction) id
                        FROM channel_updates
@@ -256,6 +256,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
                non_intermediate_ids.insert(update_id);
 
                let direction: bool = current_reference.get("direction");
+               let seen = current_reference.get::<_, i64>("seen") as u32;
                let blob: Vec<u8> = current_reference.get("blob_signed");
                let mut readable = Cursor::new(blob);
                let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
@@ -267,7 +268,12 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
                } else {
                        (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
                };
-               update_delta.last_update_before_seen = Some(unsigned_channel_update);
+               log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
+               update_delta.last_update_before_seen = Some(UpdateDelta {
+                       seen,
+                       update: unsigned_channel_update,
+               });
+
                reference_row_count += 1;
        }
 
@@ -339,22 +345,22 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 
                // determine mutations
                if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
-                       if unsigned_channel_update.flags != last_seen_update.flags {
+                       if unsigned_channel_update.flags != last_seen_update.update.flags {
                                update_delta.mutated_properties.flags = true;
                        }
-                       if unsigned_channel_update.cltv_expiry_delta != last_seen_update.cltv_expiry_delta {
+                       if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
                                update_delta.mutated_properties.cltv_expiry_delta = true;
                        }
-                       if unsigned_channel_update.htlc_minimum_msat != last_seen_update.htlc_minimum_msat {
+                       if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
                                update_delta.mutated_properties.htlc_minimum_msat = true;
                        }
-                       if unsigned_channel_update.fee_base_msat != last_seen_update.fee_base_msat {
+                       if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
                                update_delta.mutated_properties.fee_base_msat = true;
                        }
-                       if unsigned_channel_update.fee_proportional_millionths != last_seen_update.fee_proportional_millionths {
+                       if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
                                update_delta.mutated_properties.fee_proportional_millionths = true;
                        }
-                       if unsigned_channel_update.htlc_maximum_msat != last_seen_update.htlc_maximum_msat {
+                       if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
                                update_delta.mutated_properties.htlc_maximum_msat = true;
                        }
                }
index 86e114bdb8466e9b29a8164b098eb3036aa57794..93bd381d39e4b2291b54061c0f9913cc7ae39eb8 100644 (file)
@@ -1,10 +1,12 @@
 use std::cmp::max;
 use std::collections::HashMap;
+use std::time::{SystemTime, UNIX_EPOCH};
 
 use bitcoin::BlockHash;
 use bitcoin::hashes::Hash;
 use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
 use lightning::util::ser::{BigSize, Writeable};
+use crate::config;
 
 use crate::lookup::{DeltaSet, DirectedUpdateDelta};
 
@@ -129,6 +131,9 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1;
        };
 
+       // if the previous seen update happened more than 6 days ago, the client may have pruned it, and an incremental update wouldn't work
+       let non_incremental_previous_update_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as u32;
+
        for (scid, channel_delta) in delta_set.into_iter() {
 
                // any announcement chain hash is gonna be the same value. Just set it from the first one.
@@ -164,9 +169,9 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                                        // announcements and latest updates
                                        serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen);
 
-                                       if updates.last_update_before_seen.is_some() {
+                                       if let Some(update_delta) = updates.last_update_before_seen {
                                                let mutated_properties = updates.mutated_properties;
-                                               if mutated_properties.len() == 5 || send_announcement {
+                                               if send_announcement || mutated_properties.len() == 5 || update_delta.seen <= non_incremental_previous_update_threshold_timestamp {
                                                        // all five values have changed, it makes more sense to just
                                                        // serialize the update as a full update instead of as a change
                                                        // this way, the default values can be computed more efficiently