From f21d676279519341177f6f4fbc4273a6c750e886 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Thu, 31 Aug 2023 15:59:32 -0700 Subject: [PATCH] Distinguish between snapshot and symlink intervals. Snapshots may be generated every day or even every couple days, but symlinks will be generated at the given symlink granularity. To avoid gcd complications, all snapshot generation intervals must be multiples of the symlink granularity interval. --- README.md | 26 +++++++++++++------------- src/config.rs | 17 ++++++++++------- src/lib.rs | 11 +++++++---- src/snapshot.rs | 31 +++++++++++++++++-------------- 4 files changed, 47 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index ff750a1..1210490 100644 --- a/README.md +++ b/README.md @@ -12,18 +12,18 @@ These are the components it's comprised of. A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments can be made by setting environment variables, whose usage is as follows: -| Name | Default | Description | -|:-------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------| -| RAPID_GOSSIP_SYNC_SERVER_DB_HOST | localhost | Domain of the Postgres database | -| RAPID_GOSSIP_SYNC_SERVER_DB_USER | alice | Username to access Postgres | -| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_ | Password to access Postgres | -| RAPID_GOSSIP_SYNC_SERVER_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage | -| RAPID_GOSSIP_SYNC_SERVER_NETWORK | mainnet | Network to operate in. Possible values are mainnet, testnet, signet, regtest | -| RAPID_GOSSIP_SYNC_CALC_INTERVAL | 86400 | The interval in seconds that RGS creates a new snapshot | -| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) | -| BITCOIN_REST_PORT | 8332 | HTTP port of the bitcoind REST server | -| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints | -| LN_PEERS | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip | +| Name | Default | Description | +|:-------------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------| +| RAPID_GOSSIP_SYNC_SERVER_DB_HOST | localhost | Domain of the Postgres database | +| RAPID_GOSSIP_SYNC_SERVER_DB_USER | alice | Username to access Postgres | +| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_ | Password to access Postgres | +| RAPID_GOSSIP_SYNC_SERVER_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage | +| RAPID_GOSSIP_SYNC_SERVER_NETWORK | mainnet | Network to operate in. Possible values are mainnet, testnet, signet, regtest | +| RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL | 10800 | The interval in seconds between snapshots | +| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) | +| BITCOIN_REST_PORT | 8332 | HTTP port of the bitcoind REST server | +| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints | +| LN_PEERS | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip | ### downloader @@ -37,7 +37,7 @@ The module responsible for persisting all the downloaded graph data to Postgres. The snapshotting module is responsible for calculating and storing snapshots. It's started up as soon as the first full graph sync completes, and then keeps updating the snapshots at a -configurable interval with a 24 hour default. +configurable interval with a 3-hour-default. ### lookup diff --git a/src/config.rs b/src/config.rs index 87a7f42..b1c7ba2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,18 +15,21 @@ use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; use tokio_postgres::Config; -pub(crate) const SCHEMA_VERSION: i32 = 13; -pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds +pub(crate) const SCHEMA_VERSION: i32 = 12; +pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; +// generate symlinks based on a 3-hour-granularity /// If the last update in either direction was more than six days ago, we send a reminder /// That reminder may be either in the form of a channel announcement, or in the form of empty /// updates in both directions. pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60); pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true; -pub(crate) fn calculate_interval() -> u32 { - let interval = env::var("RAPID_GOSSIP_SYNC_CALC_INTERVAL").unwrap_or("86400".to_string()) +pub(crate) fn snapshot_generation_interval() -> u32 { + let interval = env::var("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL").unwrap_or(SYMLINK_GRANULARITY_INTERVAL.to_string()) .parse::() - .expect("RAPID_GOSSIP_SYNC_CALC_INTERVAL env variable must be a u32."); + .expect("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL env variable must be a u32."); + assert!(interval > 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be positive"); + assert_eq!(interval % SYMLINK_GRANULARITY_INTERVAL, 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be a multiple of {} (seconds)", SYMLINK_GRANULARITY_INTERVAL); interval } @@ -168,7 +171,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx_ref.execute("UPDATE channel_updates SET short_channel_id = $1, direction = $2 WHERE id = $3", &[&scid, &direction, &id]).await.unwrap(); }); } - while let Some(_) = updates.next().await { } + while let Some(_) = updates.next().await {} } tx.execute("ALTER TABLE channel_updates ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap(); tx.execute("ALTER TABLE channel_updates ALTER short_channel_id SET NOT NULL", &[]).await.unwrap(); @@ -195,7 +198,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap(); }); } - while let Some(_) = updates.next().await { } + while let Some(_) = updates.next().await {} } tx.execute("ALTER TABLE channel_announcements ADD CONSTRAINT channel_announcements_short_channel_id_key UNIQUE (short_channel_id)", &[]).await.unwrap(); tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 47f62f6..f56aca2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ use lightning::util::logger::Logger; use lightning::util::ser::{ReadableArgs, Writeable}; use tokio::sync::mpsc; use tokio_postgres::{Client, NoTls}; +use crate::config::SYMLINK_GRANULARITY_INTERVAL; use crate::lookup::DeltaSet; use crate::persistence::GossipPersister; @@ -85,6 +86,9 @@ impl RapidSyncProcessor where L::Ta } pub async fn start_sync(&self) { + log_info!(self.logger, "Starting Rapid Gossip Sync Server"); + log_info!(self.logger, "Snapshot interval: {} seconds", config::snapshot_generation_interval()); + // means to indicate sync completion status within this module let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1); @@ -142,12 +146,11 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { let mut blob = GOSSIP_PREFIX.to_vec(); let network = config::network(); - let calc_interval = config::calculate_interval(); let genesis_block = bitcoin::blockdata::constants::genesis_block(network); let chain_hash = genesis_block.block_hash(); chain_hash.write(&mut blob).unwrap(); - let blob_timestamp = Snapshotter::>::round_down_to_nearest_multiple(current_timestamp, calc_interval as u64) as u32; + let blob_timestamp = Snapshotter::>::round_down_to_nearest_multiple(current_timestamp, SYMLINK_GRANULARITY_INTERVAL as u64) as u32; blob_timestamp.write(&mut blob).unwrap(); 0u32.write(&mut blob).unwrap(); // node count @@ -163,7 +166,7 @@ async fn serialize_delta(network_graph: Arc>, network_graph.remove_stale_channels_and_tracking(); let mut output: Vec = vec![]; - let calc_interval = config::calculate_interval(); + let snapshot_interval = config::snapshot_generation_interval(); // set a flag if the chain hash is prepended // chain hash only necessary if either channel announcements or non-incremental updates are present @@ -249,7 +252,7 @@ async fn serialize_delta(network_graph: Arc>, serialization_details.chain_hash.write(&mut prefixed_output).unwrap(); // always write the latest seen timestamp let latest_seen_timestamp = serialization_details.latest_seen; - let overflow_seconds = latest_seen_timestamp % calc_interval; + let overflow_seconds = latest_seen_timestamp % snapshot_interval; let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds); serialized_seen_timestamp.write(&mut prefixed_output).unwrap(); diff --git a/src/snapshot.rs b/src/snapshot.rs index 28d0768..dd3bb76 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -25,10 +25,8 @@ impl Snapshotter where L::Target: Logger { pub(crate) async fn snapshot_gossip(&self) { log_info!(self.logger, "Initiating snapshotting service"); - let calc_interval = config::calculate_interval(); - let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX]; - const DAY_SECONDS: u64 = 60 * 60 * 24; - let round_day_seconds = calc_interval as u64; + let snapshot_interval = config::snapshot_generation_interval() as u64; + let snapshot_sync_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX]; let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path()); let pending_symlink_directory = format!("{}/symlinks_pending", cache_path()); @@ -40,7 +38,7 @@ impl Snapshotter where L::Target: Logger { loop { // 1. get the current timestamp let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64); + let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64); log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); // 2. sleep until the next round interval @@ -70,9 +68,9 @@ impl Snapshotter where L::Target: Logger { fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory"); let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new(); - for factor in &snapshot_sync_day_factors { + for factor in &snapshot_sync_factors { // basically timestamp - day_seconds * factor - let timestamp = reference_timestamp.saturating_sub(DAY_SECONDS.saturating_mul(factor.clone())); + let timestamp = reference_timestamp.saturating_sub(snapshot_interval.saturating_mul(factor.clone())); snapshot_sync_timestamps.push((factor.clone(), timestamp)); }; @@ -108,7 +106,7 @@ impl Snapshotter where L::Target: Logger { } // Number of intervals since Jan 1, 2022, a few months before RGS server was released. - let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64; + let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64; for i in 0..symlink_count { // let's create non-dummy-symlinks @@ -128,13 +126,13 @@ impl Snapshotter where L::Target: Logger { return on the first iteration that is at least equal to the requested interval. Note, however, that the last value in the array is u64::max, which means that - multiplying it with DAY_SECONDS will overflow. To avoid that, we use + multiplying it with snapshot_interval will overflow. To avoid that, we use saturating_mul. */ // find min(x) in snapshot_sync_day_factors where x >= i - snapshot_sync_day_factors.iter().find(|x| { - DAY_SECONDS.saturating_mul(**x) >= i * config::SNAPSHOT_CALCULATION_INTERVAL as u64 + snapshot_sync_factors.iter().find(|x| { + snapshot_interval.saturating_mul(**x) >= i * config::SYMLINK_GRANULARITY_INTERVAL as u64 }).unwrap().clone() }; log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range); @@ -146,7 +144,7 @@ impl Snapshotter where L::Target: Logger { // special-case 0 to always refer to a full/initial sync 0 } else { - reference_timestamp.saturating_sub((config::SNAPSHOT_CALCULATION_INTERVAL as u64).saturating_mul(i)) + reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i)) }; let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp); @@ -169,8 +167,13 @@ impl Snapshotter where L::Target: Logger { // constructing the snapshots may have taken a while let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); - let remainder = current_time % config::SNAPSHOT_CALCULATION_INTERVAL as u64; - let time_until_next_day = config::SNAPSHOT_CALCULATION_INTERVAL as u64 - remainder; + + // NOTE: we're waiting until the next multiple of snapshot_interval + // however, if the symlink granularity is lower, then during that time, no intermediate + // symlinks will be generated. That should be ok, because any timestamps previously + // returned would already have generated symlinks, but this does have bug potential + let remainder = current_time % snapshot_interval; + let time_until_next_day = snapshot_interval - remainder; log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day); // add in an extra five seconds to assure the rounding down works correctly -- 2.39.5