Include old updates when necessary.
[rapid-gossip-sync-server] / src / serialization.rs
index 847c754f8cd209d59228650657a876321677f85f..7e58b02e5d587a8a2e63a3e41712ac67feb17770 100644 (file)
@@ -1,9 +1,12 @@
 use std::cmp::max;
 use std::collections::HashMap;
+use std::time::{SystemTime, UNIX_EPOCH};
 
 use bitcoin::BlockHash;
+use bitcoin::hashes::Hash;
 use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
 use lightning::util::ser::{BigSize, Writeable};
+use crate::config;
 
 use crate::lookup::{DeltaSet, DirectedUpdateDelta};
 
@@ -57,11 +60,6 @@ impl Default for MutatedProperties {
        }
 }
 
-pub(super) struct UpdateSerialization {
-       pub(super) update: UnsignedChannelUpdate,
-       pub(super) mechanism: UpdateSerializationMechanism,
-}
-
 impl MutatedProperties {
        /// Does not include flags because the flag byte is always sent in full
        fn len(&self) -> u8 {
@@ -75,9 +73,27 @@ impl MutatedProperties {
        }
 }
 
-pub(super) enum UpdateSerializationMechanism {
-       Full,
-       Incremental(MutatedProperties),
+pub(super) enum UpdateSerialization {
+       Full(UnsignedChannelUpdate),
+       Incremental(UnsignedChannelUpdate, MutatedProperties),
+       Reminder(u64, u8),
+}
+impl UpdateSerialization {
+       pub(super) fn scid(&self) -> u64 {
+               match self {
+                       UpdateSerialization::Full(latest_update)|
+                       UpdateSerialization::Incremental(latest_update, _) => latest_update.short_channel_id,
+                       UpdateSerialization::Reminder(scid, _) => *scid,
+               }
+       }
+
+       fn flags(&self) -> u8 {
+               match self {
+                       UpdateSerialization::Full(latest_update)|
+                       UpdateSerialization::Incremental(latest_update, _) => latest_update.flags,
+                       UpdateSerialization::Reminder(_, flags) => *flags,
+               }
+       }
 }
 
 struct FullUpdateValueHistograms {
@@ -93,7 +109,7 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                announcements: vec![],
                updates: vec![],
                full_update_defaults: Default::default(),
-               chain_hash: Default::default(),
+               chain_hash: BlockHash::all_zeros(),
                latest_seen: 0,
        };
 
@@ -115,7 +131,10 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1;
        };
 
-       for (_scid, channel_delta) in delta_set.into_iter() {
+       // if the previous seen update happened more than 6 days ago, the client may have pruned it, and an incremental update wouldn't work
+       let non_incremental_previous_update_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as u32;
+
+       for (scid, channel_delta) in delta_set.into_iter() {
 
                // any announcement chain hash is gonna be the same value. Just set it from the first one.
                let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
@@ -126,12 +145,12 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
 
                let current_announcement_seen = channel_announcement_delta.seen;
                let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
-               let is_newly_updated_announcement = if let Some(first_update_seen) = channel_delta.first_update_seen {
+               let is_newly_included_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen {
                        first_update_seen >= last_sync_timestamp
                } else {
                        false
                };
-               let send_announcement = is_new_announcement || is_newly_updated_announcement;
+               let send_announcement = is_new_announcement || is_newly_included_announcement;
                if send_announcement {
                        serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);
                        serialization_set.announcements.push(channel_delta.announcement.unwrap().announcement);
@@ -144,39 +163,38 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                        if let Some(updates) = directed_updates {
                                if let Some(latest_update_delta) = updates.latest_update_after_seen {
                                        let latest_update = latest_update_delta.update;
+                                       assert_eq!(latest_update.short_channel_id, scid, "Update in DB had wrong SCID column");
 
                                        // the returned seen timestamp should be the latest of all the returned
                                        // announcements and latest updates
                                        serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen);
 
-                                       if updates.last_update_before_seen.is_some() {
+                                       if let Some(update_delta) = updates.last_update_before_seen {
                                                let mutated_properties = updates.mutated_properties;
-                                               if mutated_properties.len() == 5 {
+                                               if send_announcement || mutated_properties.len() == 5 || update_delta.seen <= non_incremental_previous_update_threshold_timestamp {
                                                        // all five values have changed, it makes more sense to just
                                                        // serialize the update as a full update instead of as a change
                                                        // this way, the default values can be computed more efficiently
                                                        record_full_update_in_histograms(&latest_update);
-                                                       serialization_set.updates.push(UpdateSerialization {
-                                                               update: latest_update,
-                                                               mechanism: UpdateSerializationMechanism::Full,
-                                                       });
+                                                       serialization_set.updates.push(UpdateSerialization::Full(latest_update));
                                                } else if mutated_properties.len() > 0 || mutated_properties.flags {
                                                        // we don't count flags as mutated properties
-                                                       serialization_set.updates.push(UpdateSerialization {
-                                                               update: latest_update,
-                                                               mechanism: UpdateSerializationMechanism::Incremental(mutated_properties),
-                                                       });
+                                                       serialization_set.updates.push(
+                                                               UpdateSerialization::Incremental(latest_update, mutated_properties));
                                                }
                                        } else {
                                                // serialize the full update
                                                record_full_update_in_histograms(&latest_update);
-                                               serialization_set.updates.push(UpdateSerialization {
-                                                       update: latest_update,
-                                                       mechanism: UpdateSerializationMechanism::Full,
-                                               });
+                                               serialization_set.updates.push(UpdateSerialization::Full(latest_update));
                                        }
+                               } else if is_newly_included_announcement {
+                                       if let Some(unannounced_update) = updates.last_update_before_seen {
+                                               serialization_set.updates.push(UpdateSerialization::Full(unannounced_update.update));
+                                       }
+                               } else if let Some(flags) = updates.serialization_update_flags {
+                                       serialization_set.updates.push(UpdateSerialization::Reminder(scid, flags));
                                }
-                       };
+                       }
                };
 
                categorize_directed_update_serialization(direction_a_updates);
@@ -215,18 +233,17 @@ pub fn serialize_stripped_channel_announcement(announcement: &UnsignedChannelAnn
 }
 
 pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, default_values: &DefaultUpdateValues, previous_scid: u64) -> Vec<u8> {
-       let latest_update = &update.update;
-       let mut serialized_flags = latest_update.flags;
+       let mut serialized_flags = update.flags();
 
-       if previous_scid > latest_update.short_channel_id {
+       if previous_scid > update.scid() {
                panic!("unsorted scids!");
        }
 
        let mut delta_serialization = Vec::new();
        let mut prefixed_serialization = Vec::new();
 
-       match &update.mechanism {
-               UpdateSerializationMechanism::Full => {
+       match update {
+               UpdateSerialization::Full(latest_update) => {
                        if latest_update.cltv_expiry_delta != default_values.cltv_expiry_delta {
                                serialized_flags |= 0b_0100_0000;
                                latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
@@ -252,8 +269,7 @@ pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, de
                                latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
                        }
                }
-
-               UpdateSerializationMechanism::Incremental(mutated_properties) => {
+               UpdateSerialization::Incremental(latest_update, mutated_properties) => {
                        // indicate that this update is incremental
                        serialized_flags |= 0b_1000_0000;
 
@@ -281,9 +297,13 @@ pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, de
                                serialized_flags |= 0b_0000_0100;
                                latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
                        }
+               },
+               UpdateSerialization::Reminder(_, _) => {
+                       // indicate that this update is incremental
+                       serialized_flags |= 0b_1000_0000;
                }
        }
-       let scid_delta = BigSize(latest_update.short_channel_id - previous_scid);
+       let scid_delta = BigSize(update.scid() - previous_scid);
        scid_delta.write(&mut prefixed_serialization).unwrap();
 
        serialized_flags.write(&mut prefixed_serialization).unwrap();