Merge pull request #63 from arik-so/2023/10/ordering_fix
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Thu, 19 Oct 2023 19:50:38 +0000 (19:50 +0000)
committerGitHub <noreply@github.com>
Thu, 19 Oct 2023 19:50:38 +0000 (19:50 +0000)
Fix update recency issue with expanded test coverage

Cargo.toml
src/config.rs
src/downloader.rs
src/lookup.rs
src/persistence.rs
src/snapshot.rs
src/tests/mod.rs
src/types.rs

index c352d0b4dc75ef67345cd526de71267e61031f9c..f6d05157d9d1bd2c934b35d8b1408c344024569a 100644 (file)
@@ -13,6 +13,7 @@ tokio-postgres = { version = "=0.7.5" }
 futures = "0.3"
 
 [dev-dependencies]
+lightning = { version = "0.0.117", features = ["_test_utils"] }
 lightning-rapid-gossip-sync = { version = "0.0.117" }
 
 [profile.dev]
index 0655aa7d48f9013f2d6171d06e4c83eeb1d99bfa..026e09e26aa9288636b533eae07290484ea0f965 100644 (file)
@@ -15,7 +15,7 @@ use lightning::util::ser::Readable;
 use lightning_block_sync::http::HttpEndpoint;
 use tokio_postgres::Config;
 
-pub(crate) const SCHEMA_VERSION: i32 = 12;
+pub(crate) const SCHEMA_VERSION: i32 = 13;
 pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
 pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
 // generate symlinks based on a 3-hour-granularity
@@ -143,7 +143,7 @@ pub(crate) fn db_index_creation_query() -> &'static str {
        CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id);
        CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp);
        CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen);
-       CREATE INDEX IF NOT EXISTS channel_updates_timestamp_desc ON channel_updates(timestamp DESC);
+       CREATE INDEX IF NOT EXISTS channel_updates_scid_asc_timestamp_desc ON channel_updates(short_channel_id ASC, timestamp DESC);
        "
 }
 
@@ -282,6 +282,12 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                tx.execute("UPDATE config SET db_schema = 12 WHERE id = 1", &[]).await.unwrap();
                tx.commit().await.unwrap();
        }
+       if schema >= 1 && schema <= 12 {
+               let tx = client.transaction().await.unwrap();
+               tx.execute("DROP INDEX IF EXISTS channel_updates_timestamp_desc", &[]).await.unwrap();
+               tx.execute("UPDATE config SET db_schema = 13 WHERE id = 1", &[]).await.unwrap();
+               tx.commit().await.unwrap();
+       }
        if schema <= 1 || schema > SCHEMA_VERSION {
                panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
        }
index 8d5cdb0005e7d8e3ea74f5036ee6c20220a9729e..af854c8317d059444b9ee2df6598bb1edbeeccb7 100644 (file)
@@ -62,7 +62,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
                        counter.channel_announcements += 1;
                }
 
-               let gossip_message = GossipMessage::ChannelAnnouncement(msg);
+               let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
                        tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
@@ -73,7 +73,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
 
        fn new_channel_update(&self, msg: ChannelUpdate) {
                self.counter.write().unwrap().channel_updates += 1;
-               let gossip_message = GossipMessage::ChannelUpdate(msg);
+               let gossip_message = GossipMessage::ChannelUpdate(msg, None);
 
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
index 8def6f13750fa5cf556ac6c1a1106b6d4b70635a..0878860020a61d373d311f6cff150d101c389c48 100644 (file)
@@ -288,7 +288,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
                SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
                FROM channel_updates
                WHERE seen >= TO_TIMESTAMP($1)
-               ORDER BY timestamp DESC
+               ORDER BY short_channel_id ASC, timestamp DESC
                ", [last_sync_timestamp_float]).await.unwrap();
        let mut pinned_updates = Box::pin(intermediate_updates);
        log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
index 8bb7b188af72fb692aa218a20b1ede9bd24c116a..7b451b7ec252970050d908f231603730b13fd50a 100644 (file)
@@ -111,7 +111,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                        }
 
                        match &gossip_message {
-                               GossipMessage::ChannelAnnouncement(announcement) => {
+                               GossipMessage::ChannelAnnouncement(announcement, _) => {
                                        let scid = announcement.contents.short_channel_id as i64;
 
                                        // start with the type prefix, which is already known a priori
@@ -127,7 +127,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                                        &announcement_signed
                                                ])).await.unwrap().unwrap();
                                }
-                               GossipMessage::ChannelUpdate(update) => {
+                               GossipMessage::ChannelUpdate(update, seen_override) => {
                                        let scid = update.contents.short_channel_id as i64;
 
                                        let timestamp = update.contents.timestamp as i64;
@@ -146,10 +146,11 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                        let mut update_signed = Vec::new();
                                        update.write(&mut update_signed).unwrap();
 
-                                       tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
-                                               .execute("INSERT INTO channel_updates (\
+                                       let insertion_statement = if cfg!(test) {
+                                               "INSERT INTO channel_updates (\
                                                        short_channel_id, \
                                                        timestamp, \
+                                                       seen, \
                                                        channel_flags, \
                                                        direction, \
                                                        disable, \
@@ -159,9 +160,32 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                                        fee_proportional_millionths, \
                                                        htlc_maximum_msat, \
                                                        blob_signed \
-                                               ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)  ON CONFLICT DO NOTHING", &[
+                                               ) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12)  ON CONFLICT DO NOTHING"
+                                       } else {
+                                               "INSERT INTO channel_updates (\
+                                                       short_channel_id, \
+                                                       timestamp, \
+                                                       channel_flags, \
+                                                       direction, \
+                                                       disable, \
+                                                       cltv_expiry_delta, \
+                                                       htlc_minimum_msat, \
+                                                       fee_base_msat, \
+                                                       fee_proportional_millionths, \
+                                                       htlc_maximum_msat, \
+                                                       blob_signed \
+                                               ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)  ON CONFLICT DO NOTHING"
+                                       };
+
+                                       // this may not be used outside test cfg
+                                       let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;
+
+                                       tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
+                                               .execute(insertion_statement, &[
                                                        &scid,
                                                        &timestamp,
+                                                       #[cfg(test)]
+                                                               &_seen_timestamp,
                                                        &(update.contents.flags as i16),
                                                        &direction,
                                                        &disable,
index 218285652868b56dac11dacedf3cf3c843d10510..07427a6c6d94d7f69fd24d5d90bf70898191463f 100644 (file)
@@ -41,141 +41,9 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                        }
                }
 
-               let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
-               let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
-               let finalized_snapshot_directory = format!("{}/snapshots", cache_path());
-               let finalized_symlink_directory = format!("{}/symlinks", cache_path());
-               let relative_symlink_to_snapshot_path = "../snapshots";
-
                // this is gonna be a never-ending background job
                loop {
-                       // 1. get the current timestamp
-                       let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
-                       log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
-
-                       // 2. sleep until the next round interval
-                       // 3. refresh all snapshots
-
-                       // the stored snapshots should adhere to the following format
-                       // from one day ago
-                       // from two days ago
-                       // …
-                       // from a week ago
-                       // from two weeks ago
-                       // from three weeks ago
-                       // full
-                       // That means that at any given moment, there should only ever be
-                       // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
-                       // The snapshots, unlike dynamic updates, should account for all intermediate
-                       // channel updates
-
-                       // purge and recreate the pending directories
-                       if fs::metadata(&pending_snapshot_directory).is_ok() {
-                               fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
-                       }
-                       if fs::metadata(&pending_symlink_directory).is_ok() {
-                               fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
-                       }
-                       fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
-                       fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
-
-                       let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
-                       for current_scope in &snapshot_scopes {
-                               let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
-                               snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
-                       };
-
-                       let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
-
-                       for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
-                               let network_graph_clone = self.network_graph.clone();
-                               {
-                                       log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
-                                       // calculate the snapshot
-                                       let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
-
-                                       // persist the snapshot and update the symlink
-                                       let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
-                                       let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
-                                       log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
-                                       fs::write(&snapshot_path, snapshot.data).unwrap();
-                                       snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
-                               }
-                       }
-
-                       {
-                               // create dummy symlink
-                               let dummy_filename = "empty_delta.lngossip";
-                               let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
-                               let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
-                               fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
-
-                               let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
-                               let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
-                               log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
-                               symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
-                       }
-
-                       // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
-                       let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64;
-                       for i in 0..symlink_count {
-                               // let's create non-dummy-symlinks
-
-                               // first, determine which snapshot range should be referenced
-                               let referenced_scope = if i == 0 {
-                                       // special-case 0 to always refer to a full/initial sync
-                                       u64::MAX
-                               } else {
-                                       /*
-                                       We have snapshots for 6-day- and 7-day-intervals, but the next interval is
-                                       14 days. So if somebody requests an update with a timestamp that is 10 days old,
-                                       there is no longer a snapshot for that specific interval.
-
-                                       The correct snapshot will be the next highest interval, i. e. for 14 days.
-
-                                       The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
-                                       return on the first iteration that is at least equal to the requested interval.
-
-                                       Note, however, that the last value in the array is u64::max, which means that
-                                       multiplying it with snapshot_interval will overflow. To avoid that, we use
-                                       saturating_mul.
-                                        */
-
-                                       // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
-                                       snapshot_scopes.iter().find(|current_scope| {
-                                               i * config::SYMLINK_GRANULARITY_INTERVAL as u64 <= **current_scope
-                                       }).unwrap().clone()
-                               };
-                               log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
-
-                               let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
-                               let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
-
-                               let canonical_last_sync_timestamp = if i == 0 {
-                                       // special-case 0 to always refer to a full/initial sync
-                                       0
-                               } else {
-                                       reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i))
-                               };
-                               let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
-
-                               log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
-                               symlink(&relative_snapshot_path, &symlink_path).unwrap();
-                       }
-
-                       let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
-                       let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       fs::write(&update_time_path, format!("{}", update_time)).unwrap();
-
-                       if fs::metadata(&finalized_snapshot_directory).is_ok() {
-                               fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
-                       }
-                       if fs::metadata(&finalized_symlink_directory).is_ok() {
-                               fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
-                       }
-                       fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
-                       fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
+                       self.generate_snapshots(config::SYMLINK_GRANULARITY_INTERVAL as u64, snapshot_interval, &snapshot_scopes, &cache_path(), None).await;
 
                        // constructing the snapshots may have taken a while
                        let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
@@ -194,6 +62,147 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                }
        }
 
+       pub(crate) async fn generate_snapshots(&self, granularity_interval: u64, snapshot_interval: u64, snapshot_scopes: &[u64], cache_path: &str, max_symlink_count: Option<u64>) {
+               let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path);
+               let pending_symlink_directory = format!("{}/symlinks_pending", cache_path);
+               let finalized_snapshot_directory = format!("{}/snapshots", cache_path);
+               let finalized_symlink_directory = format!("{}/symlinks", cache_path);
+               let relative_symlink_to_snapshot_path = "../snapshots";
+
+               // 1. get the current timestamp
+               let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+               let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
+               log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
+
+               // 2. sleep until the next round interval
+               // 3. refresh all snapshots
+
+               // the stored snapshots should adhere to the following format
+               // from one day ago
+               // from two days ago
+               // …
+               // from a week ago
+               // from two weeks ago
+               // from three weeks ago
+               // full
+               // That means that at any given moment, there should only ever be
+               // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
+               // The snapshots, unlike dynamic updates, should account for all intermediate
+               // channel updates
+
+               // purge and recreate the pending directories
+               if fs::metadata(&pending_snapshot_directory).is_ok() {
+                       fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
+               }
+               if fs::metadata(&pending_symlink_directory).is_ok() {
+                       fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
+               }
+               fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
+               fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
+
+               let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
+               for current_scope in snapshot_scopes {
+                       let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
+                       snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
+               };
+
+               let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
+
+               for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
+                       let network_graph_clone = self.network_graph.clone();
+                       {
+                               log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
+                               // calculate the snapshot
+                               let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
+
+                               // persist the snapshot and update the symlink
+                               let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
+                               let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
+                               log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+                               fs::write(&snapshot_path, snapshot.data).unwrap();
+                               snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
+                       }
+               }
+
+               {
+                       // create dummy symlink
+                       let dummy_filename = "empty_delta.lngossip";
+                       let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
+                       let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
+                       fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
+
+                       let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
+                       let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
+                       log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
+                       symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
+               }
+
+               // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
+               let mut symlink_count = (reference_timestamp - 1640995200) / granularity_interval;
+               if let Some(max_symlink_count) = max_symlink_count {
+                       // this is primarily useful for testing
+                       symlink_count = std::cmp::min(symlink_count, max_symlink_count);
+               };
+
+               for i in 0..symlink_count {
+                       // let's create non-dummy-symlinks
+
+                       // first, determine which snapshot range should be referenced
+                       let referenced_scope = if i == 0 {
+                               // special-case 0 to always refer to a full/initial sync
+                               u64::MAX
+                       } else {
+                               /*
+                               We have snapshots for 6-day- and 7-day-intervals, but the next interval is
+                               14 days. So if somebody requests an update with a timestamp that is 10 days old,
+                               there is no longer a snapshot for that specific interval.
+
+                               The correct snapshot will be the next highest interval, i. e. for 14 days.
+
+                               The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
+                               return on the first iteration that is at least equal to the requested interval.
+
+                               Note, however, that the last value in the array is u64::max, which means that
+                               multiplying it with snapshot_interval will overflow. To avoid that, we use
+                               saturating_mul.
+                                */
+
+                               // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
+                               snapshot_scopes.iter().find(|current_scope| {
+                                       i * granularity_interval <= **current_scope
+                               }).unwrap().clone()
+                       };
+                       log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
+
+                       let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
+                       let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
+
+                       let canonical_last_sync_timestamp = if i == 0 {
+                               // special-case 0 to always refer to a full/initial sync
+                               0
+                       } else {
+                               reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i))
+                       };
+                       let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
+
+                       log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
+                       symlink(&relative_snapshot_path, &symlink_path).unwrap();
+               }
+
+               let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
+               let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+               fs::write(&update_time_path, format!("{}", update_time)).unwrap();
+
+               if fs::metadata(&finalized_snapshot_directory).is_ok() {
+                       fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
+               }
+               if fs::metadata(&finalized_symlink_directory).is_ok() {
+                       fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
+               }
+               fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
+               fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
+       }
+
        pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
                let round_multiple_delta = number % multiple;
                number - round_multiple_delta
index 09147e920b501fd735ee7016c41a90da055b794b..ed3a99629d37841f39bbd506ad488cfb5c96bd49 100644 (file)
@@ -2,6 +2,7 @@
 
 use std::cell::RefCell;
 use std::sync::Arc;
+use std::{fs, thread};
 use std::time::{SystemTime, UNIX_EPOCH};
 use bitcoin::{BlockHash, Network};
 use bitcoin::secp256k1::ecdsa::Signature;
@@ -16,6 +17,7 @@ use lightning::util::ser::Writeable;
 use lightning_rapid_gossip_sync::RapidGossipSync;
 use crate::{config, serialize_delta};
 use crate::persistence::GossipPersister;
+use crate::snapshot::Snapshotter;
 use crate::types::{GossipMessage, tests::TestLogger};
 
 const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week
@@ -39,7 +41,7 @@ fn current_time() -> u32 {
 
 pub(crate) fn db_test_schema() -> String {
        DB_TEST_SCHEMA.with(|suffix_reference| {
-               let mut suffix_option = suffix_reference.borrow();
+               let suffix_option = suffix_reference.borrow();
                suffix_option.as_ref().unwrap().clone()
        })
 }
@@ -114,7 +116,10 @@ impl SchemaSanitizer {
                        let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
                        let timestamp_seconds = unix_time.as_secs();
                        let timestamp_nanos = unix_time.as_nanos();
-                       let preimage = format!("{}", timestamp_nanos);
+                       // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
+                       let thread_id = thread::current().id();
+                       let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
+                       println!("test schema preimage: {}", preimage);
                        let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex();
                        // the schema must start with a letter
                        let schema = format!("test_{}_{}", timestamp_seconds, suffix);
@@ -136,6 +141,26 @@ impl Drop for SchemaSanitizer {
        }
 }
 
+struct CacheSanitizer {}
+
+impl CacheSanitizer {
+       /// The CacheSanitizer instantiation requires that there be a schema sanitizer
+       fn new(_: &SchemaSanitizer) -> Self {
+               Self {}
+       }
+
+       fn cache_path(&self) -> String {
+               format!("./res/{}/", db_test_schema())
+       }
+}
+
+impl Drop for CacheSanitizer {
+       fn drop(&mut self) {
+               let cache_path = self.cache_path();
+               fs::remove_dir_all(cache_path).unwrap();
+       }
+}
+
 
 async fn clean_test_db() {
        let client = crate::connect_to_db().await;
@@ -168,9 +193,9 @@ async fn test_trivial_setup() {
                network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
                network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
 
-               receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap();
-               receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap();
-               receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap();
+               receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+               receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
+               receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
                drop(receiver);
                persister.persist_gossip().await;
        }
@@ -215,3 +240,574 @@ async fn test_trivial_setup() {
        assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
        assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
 }
+
+#[tokio::test]
+async fn test_full_snapshot_recency() {
+       let _sanitizer = SchemaSanitizer::new();
+       let logger = Arc::new(TestLogger::new());
+       let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let network_graph_arc = Arc::new(network_graph);
+
+       let short_channel_id = 1;
+       let timestamp = current_time();
+       println!("timestamp: {}", timestamp);
+
+       { // seed the db
+               let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+               let announcement = generate_announcement(short_channel_id);
+               network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+               receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+
+               { // direction false
+                       { // first update
+                               let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       { // second update
+                               let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+               { // direction true
+                       { // first and only update
+                               let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+
+               drop(receiver);
+               persister.persist_gossip().await;
+       }
+
+       let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let client_graph_arc = Arc::new(client_graph);
+
+       { // sync after initial seed
+               let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
+               logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
+
+               let channel_count = network_graph_arc.read_only().channels().len();
+
+               assert_eq!(channel_count, 1);
+               assert_eq!(serialization.message_count, 3);
+               assert_eq!(serialization.announcement_count, 1);
+               assert_eq!(serialization.update_count, 2);
+
+               let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+               let update_result = rgs.update_network_graph(&serialization.data).unwrap();
+               // the update result must be a multiple of our snapshot granularity
+               assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+               assert!(update_result < timestamp);
+
+               let readonly_graph = client_graph_arc.read_only();
+               let channels = readonly_graph.channels();
+               let client_channel_count = channels.len();
+               assert_eq!(client_channel_count, 1);
+
+               let first_channel = channels.get(&short_channel_id).unwrap();
+               assert!(&first_channel.announcement_message.is_none());
+               // ensure the update in one direction shows the latest fee
+               assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
+               assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
+       }
+
+       clean_test_db().await;
+}
+
+#[tokio::test]
+async fn test_full_snapshot_recency_with_wrong_seen_order() {
+       let _sanitizer = SchemaSanitizer::new();
+       let logger = Arc::new(TestLogger::new());
+       let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let network_graph_arc = Arc::new(network_graph);
+
+       let short_channel_id = 1;
+       let timestamp = current_time();
+       println!("timestamp: {}", timestamp);
+
+       { // seed the db
+               let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+               let announcement = generate_announcement(short_channel_id);
+               network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+               receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+
+               { // direction false
+                       { // first update, seen latest
+                               let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
+                       }
+                       { // second update, seen first
+                               let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
+                       }
+               }
+               { // direction true
+                       { // first and only update
+                               let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+
+               drop(receiver);
+               persister.persist_gossip().await;
+       }
+
+       let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let client_graph_arc = Arc::new(client_graph);
+
+       { // sync after initial seed
+               let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
+               logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
+
+               let channel_count = network_graph_arc.read_only().channels().len();
+
+               assert_eq!(channel_count, 1);
+               assert_eq!(serialization.message_count, 3);
+               assert_eq!(serialization.announcement_count, 1);
+               assert_eq!(serialization.update_count, 2);
+
+               let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+               let update_result = rgs.update_network_graph(&serialization.data).unwrap();
+               // the update result must be a multiple of our snapshot granularity
+               assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+               assert!(update_result < timestamp);
+
+               let readonly_graph = client_graph_arc.read_only();
+               let channels = readonly_graph.channels();
+               let client_channel_count = channels.len();
+               assert_eq!(client_channel_count, 1);
+
+               let first_channel = channels.get(&short_channel_id).unwrap();
+               assert!(&first_channel.announcement_message.is_none());
+               // ensure the update in one direction shows the latest fee
+               assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
+               assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
+       }
+
+       clean_test_db().await;
+}
+
+#[tokio::test]
+async fn test_full_snapshot_recency_with_wrong_propagation_order() {
+       let _sanitizer = SchemaSanitizer::new();
+       let logger = Arc::new(TestLogger::new());
+       let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let network_graph_arc = Arc::new(network_graph);
+
+       let short_channel_id = 1;
+       let timestamp = current_time();
+       println!("timestamp: {}", timestamp);
+
+       { // seed the db
+               let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+               let announcement = generate_announcement(short_channel_id);
+               network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+               receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+
+               { // direction false
+                       // apply updates in their timestamp order
+                       let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
+                       let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
+                       network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
+                       network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
+
+                       // propagate updates in their seen order
+                       receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
+                       receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
+               }
+               { // direction true
+                       { // first and only update
+                               let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+
+               drop(receiver);
+               persister.persist_gossip().await;
+       }
+
+       let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let client_graph_arc = Arc::new(client_graph);
+
+       { // sync after initial seed
+               let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
+               logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
+
+               let channel_count = network_graph_arc.read_only().channels().len();
+
+               assert_eq!(channel_count, 1);
+               assert_eq!(serialization.message_count, 3);
+               assert_eq!(serialization.announcement_count, 1);
+               assert_eq!(serialization.update_count, 2);
+
+               let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+               let update_result = rgs.update_network_graph(&serialization.data).unwrap();
+               // the update result must be a multiple of our snapshot granularity
+               assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+               assert!(update_result < timestamp);
+
+               let readonly_graph = client_graph_arc.read_only();
+               let channels = readonly_graph.channels();
+               let client_channel_count = channels.len();
+               assert_eq!(client_channel_count, 1);
+
+               let first_channel = channels.get(&short_channel_id).unwrap();
+               assert!(&first_channel.announcement_message.is_none());
+               // ensure the update in one direction shows the latest fee
+               assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
+               assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
+       }
+
+       clean_test_db().await;
+}
+
+#[tokio::test]
+async fn test_full_snapshot_mutiny_scenario() {
+       let _sanitizer = SchemaSanitizer::new();
+       let logger = Arc::new(TestLogger::new());
+       let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let network_graph_arc = Arc::new(network_graph);
+
+       let short_channel_id = 873706024403271681;
+       let timestamp = current_time();
+       // let oldest_simulation_timestamp = 1693300588;
+       let latest_simulation_timestamp = 1695909301;
+       let timestamp_offset = timestamp - latest_simulation_timestamp;
+       println!("timestamp: {}", timestamp);
+
+       { // seed the db
+               let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+               let announcement = generate_announcement(short_channel_id);
+               network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+               receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+
+               { // direction false
+                       {
+                               let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
+                               // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+               { // direction true
+                       {
+                               let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       {
+                               let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+
+               drop(receiver);
+               persister.persist_gossip().await;
+       }
+
+       let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let client_graph_arc = Arc::new(client_graph);
+
+       { // sync after initial seed
+               let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
+               logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
+
+               let channel_count = network_graph_arc.read_only().channels().len();
+
+               assert_eq!(channel_count, 1);
+               assert_eq!(serialization.message_count, 3);
+               assert_eq!(serialization.announcement_count, 1);
+               assert_eq!(serialization.update_count, 2);
+
+               let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+               let update_result = rgs.update_network_graph(&serialization.data).unwrap();
+               println!("update result: {}", update_result);
+               // the update result must be a multiple of our snapshot granularity
+               assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+               assert!(update_result < timestamp);
+
+               let timestamp_delta = timestamp - update_result;
+               println!("timestamp delta: {}", timestamp_delta);
+               assert!(timestamp_delta < config::snapshot_generation_interval());
+
+               let readonly_graph = client_graph_arc.read_only();
+               let channels = readonly_graph.channels();
+               let client_channel_count = channels.len();
+               assert_eq!(client_channel_count, 1);
+
+               let first_channel = channels.get(&short_channel_id).unwrap();
+               assert!(&first_channel.announcement_message.is_none());
+               assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
+               assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
+       }
+
+       clean_test_db().await;
+}
+
+#[tokio::test]
+async fn test_full_snapshot_interlaced_channel_timestamps() {
+       let _sanitizer = SchemaSanitizer::new();
+       let logger = Arc::new(TestLogger::new());
+       let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let network_graph_arc = Arc::new(network_graph);
+
+       let main_channel_id = 1;
+       let timestamp = current_time();
+       println!("timestamp: {}", timestamp);
+
+       { // seed the db
+               let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+               let secondary_channel_id = main_channel_id + 1;
+
+               { // main channel
+                       let announcement = generate_announcement(main_channel_id);
+                       network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+                       receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+               }
+
+               { // secondary channel
+                       let announcement = generate_announcement(secondary_channel_id);
+                       network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+                       receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+               }
+
+               { // main channel
+                       { // direction false
+                               let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       { // direction true
+                               let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+
+               { // in-between channel
+                       { // direction false
+                               let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       { // direction true
+                               let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+
+               { // main channel
+                       { // direction false
+                               let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+                       { // direction true
+                               let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
+                               network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                               receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+                       }
+               }
+
+               drop(receiver);
+               persister.persist_gossip().await;
+       }
+
+       let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let client_graph_arc = Arc::new(client_graph);
+
+       { // sync after initial seed
+               let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
+               logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
+
+               let channel_count = network_graph_arc.read_only().channels().len();
+
+               assert_eq!(channel_count, 2);
+               assert_eq!(serialization.message_count, 6);
+               assert_eq!(serialization.announcement_count, 2);
+               assert_eq!(serialization.update_count, 4);
+
+               let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+               let update_result = rgs.update_network_graph(&serialization.data).unwrap();
+               // the update result must be a multiple of our snapshot granularity
+               assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+               assert!(update_result < timestamp);
+
+               let readonly_graph = client_graph_arc.read_only();
+               let channels = readonly_graph.channels();
+               let client_channel_count = channels.len();
+               assert_eq!(client_channel_count, 2);
+
+               let first_channel = channels.get(&main_channel_id).unwrap();
+               assert!(&first_channel.announcement_message.is_none());
+               // ensure the update in one direction shows the latest fee
+               assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
+               assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
+       }
+
+       clean_test_db().await;
+}
+
+#[tokio::test]
+async fn test_full_snapshot_persistence() {
+       let schema_sanitizer = SchemaSanitizer::new();
+       let logger = Arc::new(TestLogger::new());
+       let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+       let network_graph_arc = Arc::new(network_graph);
+       let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
+       let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
+
+       let short_channel_id = 1;
+       let timestamp = current_time();
+       println!("timestamp: {}", timestamp);
+
+       { // seed the db
+               let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+               let announcement = generate_announcement(short_channel_id);
+               network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+               receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
+
+               { // direction true
+                       let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
+                       network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                       receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+               }
+
+               { // direction false
+                       let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
+                       network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                       receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+               }
+
+
+               drop(receiver);
+               persister.persist_gossip().await;
+       }
+
+       let cache_path = cache_sanitizer.cache_path();
+       let symlink_path = format!("{}/symlinks/0.bin", cache_path);
+
+       // generate snapshots
+       {
+               snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
+
+               let symlinked_data = fs::read(&symlink_path).unwrap();
+               let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+               let client_graph_arc = Arc::new(client_graph);
+
+               let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+               let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
+               // the update result must be a multiple of our snapshot granularity
+               assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+
+               let readonly_graph = client_graph_arc.read_only();
+               let channels = readonly_graph.channels();
+               let client_channel_count = channels.len();
+               assert_eq!(client_channel_count, 1);
+
+               let first_channel = channels.get(&short_channel_id).unwrap();
+               assert!(&first_channel.announcement_message.is_none());
+               // ensure the update in one direction shows the latest fee
+               assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
+               assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
+       }
+
+       { // update the db
+               let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
+
+               { // second update
+                       let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
+                       network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
+                       receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
+               }
+
+               drop(receiver);
+               persister.persist_gossip().await;
+       }
+
+       // regenerate snapshots
+       {
+               snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
+
+               let symlinked_data = fs::read(&symlink_path).unwrap();
+               let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
+               let client_graph_arc = Arc::new(client_graph);
+
+               let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
+               let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
+               // the update result must be a multiple of our snapshot granularity
+               assert_eq!(update_result % config::snapshot_generation_interval(), 0);
+
+               let readonly_graph = client_graph_arc.read_only();
+               let channels = readonly_graph.channels();
+               let client_channel_count = channels.len();
+               assert_eq!(client_channel_count, 1);
+
+               let first_channel = channels.get(&short_channel_id).unwrap();
+               assert!(&first_channel.announcement_message.is_none());
+               // ensure the update in one direction shows the latest fee
+               assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
+               assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
+       }
+
+       // clean up afterwards
+       clean_test_db().await;
+}
index f530bd41a112e41e082079e20fce4f19295c3877..93527a45b837c7368d9bc2842167323ff4c2e397 100644 (file)
@@ -14,8 +14,9 @@ pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::Sock
 
 #[derive(Debug)]
 pub(crate) enum GossipMessage {
-       ChannelAnnouncement(ChannelAnnouncement),
-       ChannelUpdate(ChannelUpdate),
+       // the second element is an optional override for the seen value
+       ChannelAnnouncement(ChannelAnnouncement, Option<u32>),
+       ChannelUpdate(ChannelUpdate, Option<u32>),
 }
 
 #[derive(Clone, Copy)]