From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Thu, 19 Oct 2023 19:50:38 +0000 (+0000) Subject: Merge pull request #63 from arik-so/2023/10/ordering_fix X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=86ebd80028d866d52ff5c2f79491a23dae5a1763;hp=ff9194a3a9bf4664d349a923db5eeee88f2db957;p=rapid-gossip-sync-server Merge pull request #63 from arik-so/2023/10/ordering_fix Fix update recency issue with expanded test coverage --- diff --git a/Cargo.toml b/Cargo.toml index c352d0b..f6d0515 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ tokio-postgres = { version = "=0.7.5" } futures = "0.3" [dev-dependencies] +lightning = { version = "0.0.117", features = ["_test_utils"] } lightning-rapid-gossip-sync = { version = "0.0.117" } [profile.dev] diff --git a/src/config.rs b/src/config.rs index 0655aa7..026e09e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,7 +15,7 @@ use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; use tokio_postgres::Config; -pub(crate) const SCHEMA_VERSION: i32 = 12; +pub(crate) const SCHEMA_VERSION: i32 = 13; pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks // generate symlinks based on a 3-hour-granularity @@ -143,7 +143,7 @@ pub(crate) fn db_index_creation_query() -> &'static str { CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id); CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp); CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen); - CREATE INDEX IF NOT EXISTS channel_updates_timestamp_desc ON channel_updates(timestamp DESC); + CREATE INDEX IF NOT EXISTS channel_updates_scid_asc_timestamp_desc ON channel_updates(short_channel_id ASC, timestamp DESC); " } @@ -282,6 +282,12 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx.execute("UPDATE config SET db_schema = 12 WHERE id = 1", &[]).await.unwrap(); tx.commit().await.unwrap(); } + if schema >= 1 && schema <= 12 { + let tx = client.transaction().await.unwrap(); + tx.execute("DROP INDEX IF EXISTS channel_updates_timestamp_desc", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 13 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } if schema <= 1 || schema > SCHEMA_VERSION { panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION); } diff --git a/src/downloader.rs b/src/downloader.rs index 8d5cdb0..af854c8 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -62,7 +62,7 @@ impl GossipRouter where L::Target: Logger { counter.channel_announcements += 1; } - let gossip_message = GossipMessage::ChannelAnnouncement(msg); + let gossip_message = GossipMessage::ChannelAnnouncement(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { @@ -73,7 +73,7 @@ impl GossipRouter where L::Target: Logger { fn new_channel_update(&self, msg: ChannelUpdate) { self.counter.write().unwrap().channel_updates += 1; - let gossip_message = GossipMessage::ChannelUpdate(msg); + let gossip_message = GossipMessage::ChannelUpdate(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; diff --git a/src/lookup.rs b/src/lookup.rs index 8def6f1..0878860 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -288,7 +288,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_updates WHERE seen >= TO_TIMESTAMP($1) - ORDER BY timestamp DESC + ORDER BY short_channel_id ASC, timestamp DESC ", [last_sync_timestamp_float]).await.unwrap(); let mut pinned_updates = Box::pin(intermediate_updates); log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed()); diff --git a/src/persistence.rs b/src/persistence.rs index 8bb7b18..7b451b7 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -111,7 +111,7 @@ impl GossipPersister where L::Target: Logger { } match &gossip_message { - GossipMessage::ChannelAnnouncement(announcement) => { + GossipMessage::ChannelAnnouncement(announcement, _) => { let scid = announcement.contents.short_channel_id as i64; // start with the type prefix, which is already known a priori @@ -127,7 +127,7 @@ impl GossipPersister where L::Target: Logger { &announcement_signed ])).await.unwrap().unwrap(); } - GossipMessage::ChannelUpdate(update) => { + GossipMessage::ChannelUpdate(update, seen_override) => { let scid = update.contents.short_channel_id as i64; let timestamp = update.contents.timestamp as i64; @@ -146,10 +146,11 @@ impl GossipPersister where L::Target: Logger { let mut update_signed = Vec::new(); update.write(&mut update_signed).unwrap(); - tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client - .execute("INSERT INTO channel_updates (\ + let insertion_statement = if cfg!(test) { + "INSERT INTO channel_updates (\ short_channel_id, \ timestamp, \ + seen, \ channel_flags, \ direction, \ disable, \ @@ -159,9 +160,32 @@ impl GossipPersister where L::Target: Logger { fee_proportional_millionths, \ htlc_maximum_msat, \ blob_signed \ - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[ + ) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING" + } else { + "INSERT INTO channel_updates (\ + short_channel_id, \ + timestamp, \ + channel_flags, \ + direction, \ + disable, \ + cltv_expiry_delta, \ + htlc_minimum_msat, \ + fee_base_msat, \ + fee_proportional_millionths, \ + htlc_maximum_msat, \ + blob_signed \ + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING" + }; + + // this may not be used outside test cfg + let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64; + + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client + .execute(insertion_statement, &[ &scid, ×tamp, + #[cfg(test)] + &_seen_timestamp, &(update.contents.flags as i16), &direction, &disable, diff --git a/src/snapshot.rs b/src/snapshot.rs index 2182856..07427a6 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -41,141 +41,9 @@ impl Snapshotter where L::Target: Logger { } } - let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path()); - let pending_symlink_directory = format!("{}/symlinks_pending", cache_path()); - let finalized_snapshot_directory = format!("{}/snapshots", cache_path()); - let finalized_symlink_directory = format!("{}/symlinks", cache_path()); - let relative_symlink_to_snapshot_path = "../snapshots"; - // this is gonna be a never-ending background job loop { - // 1. get the current timestamp - let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64); - log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); - - // 2. sleep until the next round interval - // 3. refresh all snapshots - - // the stored snapshots should adhere to the following format - // from one day ago - // from two days ago - // … - // from a week ago - // from two weeks ago - // from three weeks ago - // full - // That means that at any given moment, there should only ever be - // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots - // The snapshots, unlike dynamic updates, should account for all intermediate - // channel updates - - // purge and recreate the pending directories - if fs::metadata(&pending_snapshot_directory).is_ok() { - fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory."); - } - if fs::metadata(&pending_symlink_directory).is_ok() { - fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory."); - } - fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory"); - fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory"); - - let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new(); - for current_scope in &snapshot_scopes { - let timestamp = reference_timestamp.saturating_sub(current_scope.clone()); - snapshot_sync_timestamps.push((current_scope.clone(), timestamp)); - }; - - let mut snapshot_filenames_by_scope: HashMap = HashMap::with_capacity(10); - - for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps { - let network_graph_clone = self.network_graph.clone(); - { - log_info!(self.logger, "Calculating {}-second snapshot", current_scope); - // calculate the snapshot - let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await; - - // persist the snapshot and update the symlink - let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp); - let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename); - log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental); - fs::write(&snapshot_path, snapshot.data).unwrap(); - snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename); - } - } - - { - // create dummy symlink - let dummy_filename = "empty_delta.lngossip"; - let dummy_snapshot = super::serialize_empty_blob(reference_timestamp); - let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename); - fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap(); - - let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp); - let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename); - log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path); - symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap(); - } - - // Number of intervals since Jan 1, 2022, a few months before RGS server was released. - let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64; - for i in 0..symlink_count { - // let's create non-dummy-symlinks - - // first, determine which snapshot range should be referenced - let referenced_scope = if i == 0 { - // special-case 0 to always refer to a full/initial sync - u64::MAX - } else { - /* - We have snapshots for 6-day- and 7-day-intervals, but the next interval is - 14 days. So if somebody requests an update with a timestamp that is 10 days old, - there is no longer a snapshot for that specific interval. - - The correct snapshot will be the next highest interval, i. e. for 14 days. - - The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will - return on the first iteration that is at least equal to the requested interval. - - Note, however, that the last value in the array is u64::max, which means that - multiplying it with snapshot_interval will overflow. To avoid that, we use - saturating_mul. - */ - - // find min(x) in snapshot_scopes where i * granularity <= x (the current scope) - snapshot_scopes.iter().find(|current_scope| { - i * config::SYMLINK_GRANULARITY_INTERVAL as u64 <= **current_scope - }).unwrap().clone() - }; - log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope); - - let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap(); - let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename); - - let canonical_last_sync_timestamp = if i == 0 { - // special-case 0 to always refer to a full/initial sync - 0 - } else { - reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i)) - }; - let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp); - - log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path); - symlink(&relative_snapshot_path, &symlink_path).unwrap(); - } - - let update_time_path = format!("{}/update_time.txt", pending_symlink_directory); - let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - fs::write(&update_time_path, format!("{}", update_time)).unwrap(); - - if fs::metadata(&finalized_snapshot_directory).is_ok() { - fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory."); - } - if fs::metadata(&finalized_symlink_directory).is_ok() { - fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory."); - } - fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory."); - fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory."); + self.generate_snapshots(config::SYMLINK_GRANULARITY_INTERVAL as u64, snapshot_interval, &snapshot_scopes, &cache_path(), None).await; // constructing the snapshots may have taken a while let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); @@ -194,6 +62,147 @@ impl Snapshotter where L::Target: Logger { } } + pub(crate) async fn generate_snapshots(&self, granularity_interval: u64, snapshot_interval: u64, snapshot_scopes: &[u64], cache_path: &str, max_symlink_count: Option) { + let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path); + let pending_symlink_directory = format!("{}/symlinks_pending", cache_path); + let finalized_snapshot_directory = format!("{}/snapshots", cache_path); + let finalized_symlink_directory = format!("{}/symlinks", cache_path); + let relative_symlink_to_snapshot_path = "../snapshots"; + + // 1. get the current timestamp + let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64); + log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); + + // 2. sleep until the next round interval + // 3. refresh all snapshots + + // the stored snapshots should adhere to the following format + // from one day ago + // from two days ago + // … + // from a week ago + // from two weeks ago + // from three weeks ago + // full + // That means that at any given moment, there should only ever be + // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots + // The snapshots, unlike dynamic updates, should account for all intermediate + // channel updates + + // purge and recreate the pending directories + if fs::metadata(&pending_snapshot_directory).is_ok() { + fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory."); + } + if fs::metadata(&pending_symlink_directory).is_ok() { + fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory."); + } + fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory"); + fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory"); + + let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new(); + for current_scope in snapshot_scopes { + let timestamp = reference_timestamp.saturating_sub(current_scope.clone()); + snapshot_sync_timestamps.push((current_scope.clone(), timestamp)); + }; + + let mut snapshot_filenames_by_scope: HashMap = HashMap::with_capacity(10); + + for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps { + let network_graph_clone = self.network_graph.clone(); + { + log_info!(self.logger, "Calculating {}-second snapshot", current_scope); + // calculate the snapshot + let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await; + + // persist the snapshot and update the symlink + let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp); + let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename); + log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental); + fs::write(&snapshot_path, snapshot.data).unwrap(); + snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename); + } + } + + { + // create dummy symlink + let dummy_filename = "empty_delta.lngossip"; + let dummy_snapshot = super::serialize_empty_blob(reference_timestamp); + let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename); + fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap(); + + let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp); + let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename); + log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path); + symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap(); + } + + // Number of intervals since Jan 1, 2022, a few months before RGS server was released. + let mut symlink_count = (reference_timestamp - 1640995200) / granularity_interval; + if let Some(max_symlink_count) = max_symlink_count { + // this is primarily useful for testing + symlink_count = std::cmp::min(symlink_count, max_symlink_count); + }; + + for i in 0..symlink_count { + // let's create non-dummy-symlinks + + // first, determine which snapshot range should be referenced + let referenced_scope = if i == 0 { + // special-case 0 to always refer to a full/initial sync + u64::MAX + } else { + /* + We have snapshots for 6-day- and 7-day-intervals, but the next interval is + 14 days. So if somebody requests an update with a timestamp that is 10 days old, + there is no longer a snapshot for that specific interval. + + The correct snapshot will be the next highest interval, i. e. for 14 days. + + The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will + return on the first iteration that is at least equal to the requested interval. + + Note, however, that the last value in the array is u64::max, which means that + multiplying it with snapshot_interval will overflow. To avoid that, we use + saturating_mul. + */ + + // find min(x) in snapshot_scopes where i * granularity <= x (the current scope) + snapshot_scopes.iter().find(|current_scope| { + i * granularity_interval <= **current_scope + }).unwrap().clone() + }; + log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope); + + let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap(); + let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename); + + let canonical_last_sync_timestamp = if i == 0 { + // special-case 0 to always refer to a full/initial sync + 0 + } else { + reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i)) + }; + let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp); + + log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path); + symlink(&relative_snapshot_path, &symlink_path).unwrap(); + } + + let update_time_path = format!("{}/update_time.txt", pending_symlink_directory); + let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + fs::write(&update_time_path, format!("{}", update_time)).unwrap(); + + if fs::metadata(&finalized_snapshot_directory).is_ok() { + fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory."); + } + if fs::metadata(&finalized_symlink_directory).is_ok() { + fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory."); + } + fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory."); + fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory."); + } + pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 { let round_multiple_delta = number % multiple; number - round_multiple_delta diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 09147e9..ed3a996 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -2,6 +2,7 @@ use std::cell::RefCell; use std::sync::Arc; +use std::{fs, thread}; use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::{BlockHash, Network}; use bitcoin::secp256k1::ecdsa::Signature; @@ -16,6 +17,7 @@ use lightning::util::ser::Writeable; use lightning_rapid_gossip_sync::RapidGossipSync; use crate::{config, serialize_delta}; use crate::persistence::GossipPersister; +use crate::snapshot::Snapshotter; use crate::types::{GossipMessage, tests::TestLogger}; const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week @@ -39,7 +41,7 @@ fn current_time() -> u32 { pub(crate) fn db_test_schema() -> String { DB_TEST_SCHEMA.with(|suffix_reference| { - let mut suffix_option = suffix_reference.borrow(); + let suffix_option = suffix_reference.borrow(); suffix_option.as_ref().unwrap().clone() }) } @@ -114,7 +116,10 @@ impl SchemaSanitizer { let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards"); let timestamp_seconds = unix_time.as_secs(); let timestamp_nanos = unix_time.as_nanos(); - let preimage = format!("{}", timestamp_nanos); + // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict + let thread_id = thread::current().id(); + let preimage = format!("{:?}-{}", thread_id, timestamp_nanos); + println!("test schema preimage: {}", preimage); let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex(); // the schema must start with a letter let schema = format!("test_{}_{}", timestamp_seconds, suffix); @@ -136,6 +141,26 @@ impl Drop for SchemaSanitizer { } } +struct CacheSanitizer {} + +impl CacheSanitizer { + /// The CacheSanitizer instantiation requires that there be a schema sanitizer + fn new(_: &SchemaSanitizer) -> Self { + Self {} + } + + fn cache_path(&self) -> String { + format!("./res/{}/", db_test_schema()) + } +} + +impl Drop for CacheSanitizer { + fn drop(&mut self) { + let cache_path = self.cache_path(); + fs::remove_dir_all(cache_path).unwrap(); + } +} + async fn clean_test_db() { let client = crate::connect_to_db().await; @@ -168,9 +193,9 @@ async fn test_trivial_setup() { network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap(); - receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap(); - receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap(); drop(receiver); persister.persist_gossip().await; } @@ -215,3 +240,574 @@ async fn test_trivial_setup() { assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL); assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL); } + +#[tokio::test] +async fn test_full_snapshot_recency() { + let _sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + + let short_channel_id = 1; + let timestamp = current_time(); + println!("timestamp: {}", timestamp); + + { // seed the db + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + let announcement = generate_announcement(short_channel_id); + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + + { // direction false + { // first update + let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { // second update + let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + { // direction true + { // first and only update + let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + + drop(receiver); + persister.persist_gossip().await; + } + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + + { // sync after initial seed + let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); + + let channel_count = network_graph_arc.read_only().channels().len(); + + assert_eq!(channel_count, 1); + assert_eq!(serialization.message_count, 3); + assert_eq!(serialization.announcement_count, 1); + assert_eq!(serialization.update_count, 2); + + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&serialization.data).unwrap(); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + assert!(update_result < timestamp); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + // ensure the update in one direction shows the latest fee + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10); + } + + clean_test_db().await; +} + +#[tokio::test] +async fn test_full_snapshot_recency_with_wrong_seen_order() { + let _sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + + let short_channel_id = 1; + let timestamp = current_time(); + println!("timestamp: {}", timestamp); + + { // seed the db + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + let announcement = generate_announcement(short_channel_id); + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + + { // direction false + { // first update, seen latest + let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap(); + } + { // second update, seen first + let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap(); + } + } + { // direction true + { // first and only update + let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + + drop(receiver); + persister.persist_gossip().await; + } + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + + { // sync after initial seed + let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); + + let channel_count = network_graph_arc.read_only().channels().len(); + + assert_eq!(channel_count, 1); + assert_eq!(serialization.message_count, 3); + assert_eq!(serialization.announcement_count, 1); + assert_eq!(serialization.update_count, 2); + + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&serialization.data).unwrap(); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + assert!(update_result < timestamp); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + // ensure the update in one direction shows the latest fee + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10); + } + + clean_test_db().await; +} + +#[tokio::test] +async fn test_full_snapshot_recency_with_wrong_propagation_order() { + let _sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + + let short_channel_id = 1; + let timestamp = current_time(); + println!("timestamp: {}", timestamp); + + { // seed the db + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + let announcement = generate_announcement(short_channel_id); + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + + { // direction false + // apply updates in their timestamp order + let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38); + let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39); + network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); + + // propagate updates in their seen order + receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap(); + } + { // direction true + { // first and only update + let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + + drop(receiver); + persister.persist_gossip().await; + } + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + + { // sync after initial seed + let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); + + let channel_count = network_graph_arc.read_only().channels().len(); + + assert_eq!(channel_count, 1); + assert_eq!(serialization.message_count, 3); + assert_eq!(serialization.announcement_count, 1); + assert_eq!(serialization.update_count, 2); + + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&serialization.data).unwrap(); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + assert!(update_result < timestamp); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + // ensure the update in one direction shows the latest fee + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10); + } + + clean_test_db().await; +} + +#[tokio::test] +async fn test_full_snapshot_mutiny_scenario() { + let _sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + + let short_channel_id = 873706024403271681; + let timestamp = current_time(); + // let oldest_simulation_timestamp = 1693300588; + let latest_simulation_timestamp = 1695909301; + let timestamp_offset = timestamp - latest_simulation_timestamp; + println!("timestamp: {}", timestamp); + + { // seed the db + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + let announcement = generate_announcement(short_channel_id); + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + + { // direction false + { + let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap(); + } + { + let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130); + // network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + { // direction true + { + let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { + let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + + drop(receiver); + persister.persist_gossip().await; + } + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + + { // sync after initial seed + let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); + + let channel_count = network_graph_arc.read_only().channels().len(); + + assert_eq!(channel_count, 1); + assert_eq!(serialization.message_count, 3); + assert_eq!(serialization.announcement_count, 1); + assert_eq!(serialization.update_count, 2); + + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&serialization.data).unwrap(); + println!("update result: {}", update_result); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + assert!(update_result < timestamp); + + let timestamp_delta = timestamp - update_result; + println!("timestamp delta: {}", timestamp_delta); + assert!(timestamp_delta < config::snapshot_generation_interval()); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10); + } + + clean_test_db().await; +} + +#[tokio::test] +async fn test_full_snapshot_interlaced_channel_timestamps() { + let _sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + + let main_channel_id = 1; + let timestamp = current_time(); + println!("timestamp: {}", timestamp); + + { // seed the db + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + let secondary_channel_id = main_channel_id + 1; + + { // main channel + let announcement = generate_announcement(main_channel_id); + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + } + + { // secondary channel + let announcement = generate_announcement(secondary_channel_id); + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + } + + { // main channel + { // direction false + let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { // direction true + let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + + { // in-between channel + { // direction false + let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { // direction true + let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + + { // main channel + { // direction false + let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + { // direction true + let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + } + + drop(receiver); + persister.persist_gossip().await; + } + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + + { // sync after initial seed + let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1); + + let channel_count = network_graph_arc.read_only().channels().len(); + + assert_eq!(channel_count, 2); + assert_eq!(serialization.message_count, 6); + assert_eq!(serialization.announcement_count, 2); + assert_eq!(serialization.update_count, 4); + + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&serialization.data).unwrap(); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + assert!(update_result < timestamp); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 2); + + let first_channel = channels.get(&main_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + // ensure the update in one direction shows the latest fee + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6); + } + + clean_test_db().await; +} + +#[tokio::test] +async fn test_full_snapshot_persistence() { + let schema_sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone()); + let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer); + + let short_channel_id = 1; + let timestamp = current_time(); + println!("timestamp: {}", timestamp); + + { // seed the db + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + let announcement = generate_announcement(short_channel_id); + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + + { // direction true + let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + + { // direction false + let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + + + drop(receiver); + persister.persist_gossip().await; + } + + let cache_path = cache_sanitizer.cache_path(); + let symlink_path = format!("{}/symlinks/0.bin", cache_path); + + // generate snapshots + { + snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await; + + let symlinked_data = fs::read(&symlink_path).unwrap(); + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&symlinked_data).unwrap(); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + // ensure the update in one direction shows the latest fee + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10); + } + + { // update the db + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + + { // second update + let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39); + network_graph_arc.update_channel_unsigned(&update.contents).unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap(); + } + + drop(receiver); + persister.persist_gossip().await; + } + + // regenerate snapshots + { + snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await; + + let symlinked_data = fs::read(&symlink_path).unwrap(); + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&symlinked_data).unwrap(); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + // ensure the update in one direction shows the latest fee + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10); + } + + // clean up afterwards + clean_test_db().await; +} diff --git a/src/types.rs b/src/types.rs index f530bd4..93527a4 100644 --- a/src/types.rs +++ b/src/types.rs @@ -14,8 +14,9 @@ pub(crate) type GossipPeerManager = Arc), + ChannelUpdate(ChannelUpdate, Option), } #[derive(Clone, Copy)]