Distinguish between snapshot and symlink intervals.
author Arik Sosman <git@arik.io>
Thu, 31 Aug 2023 22:59:32 +0000 (15:59 -0700)
committer Arik Sosman <git@arik.io>
Fri, 1 Sep 2023 21:08:52 +0000 (14:08 -0700)
Snapshots may be generated every day or even every couple days,
but symlinks will be generated at the given symlink granularity.

To avoid gcd complications, all snapshot generation intervals
must be multiples of the symlink granularity interval.

README.md
src/config.rs
src/lib.rs
src/snapshot.rs

index ff750a1001e73bfe873473d297190f22474e70b6..12104909aac1cea00e7a769a51ae2bc4c5a02412 100644 (file)
--- a/README.md
+++ b/README.md
@@ -12,18 +12,18 @@ These are the components it's comprised of.
 A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments
 can be made by setting environment variables, whose usage is as follows:
 
-| Name                                 | Default             | Description                                                                                                |
-|:-------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
-| RAPID_GOSSIP_SYNC_SERVER_DB_HOST     | localhost           | Domain of the Postgres database                                                                            |
-| RAPID_GOSSIP_SYNC_SERVER_DB_USER     | alice               | Username to access Postgres                                                                                |
-| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_              | Password to access Postgres                                                                                |
-| RAPID_GOSSIP_SYNC_SERVER_DB_NAME     | ln_graph_sync       | Name of the database to be used for gossip storage                                                         |
-| RAPID_GOSSIP_SYNC_SERVER_NETWORK     | mainnet             | Network to operate in. Possible values are mainnet, testnet, signet, regtest                               |
-| RAPID_GOSSIP_SYNC_CALC_INTERVAL      | 86400               | The interval in seconds that RGS creates a new snapshot                                                    |
-| BITCOIN_REST_DOMAIN                  | 127.0.0.1           | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
-| BITCOIN_REST_PORT                    | 8332                | HTTP port of the bitcoind REST server                                                                      |
-| BITCOIN_REST_PATH                    | /rest/              | Path infix to access the bitcoind REST endpoints                                                           |
-| LN_PEERS                             | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip                                              |
+| Name                                       | Default             | Description                                                                                                |
+|:-------------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
+| RAPID_GOSSIP_SYNC_SERVER_DB_HOST           | localhost           | Domain of the Postgres database                                                                            |
+| RAPID_GOSSIP_SYNC_SERVER_DB_USER           | alice               | Username to access Postgres                                                                                |
+| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD       | _None_              | Password to access Postgres                                                                                |
+| RAPID_GOSSIP_SYNC_SERVER_DB_NAME           | ln_graph_sync       | Name of the database to be used for gossip storage                                                         |
+| RAPID_GOSSIP_SYNC_SERVER_NETWORK           | mainnet             | Network to operate in. Possible values are mainnet, testnet, signet, regtest                               |
+| RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL | 10800               | The interval in seconds between snapshots                                                                  |
+| BITCOIN_REST_DOMAIN                        | 127.0.0.1           | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
+| BITCOIN_REST_PORT                          | 8332                | HTTP port of the bitcoind REST server                                                                      |
+| BITCOIN_REST_PATH                          | /rest/              | Path infix to access the bitcoind REST endpoints                                                           |
+| LN_PEERS                                   | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip                                              |
 
 ### downloader
 
@@ -37,7 +37,7 @@ The module responsible for persisting all the downloaded graph data to Postgres.
 
 The snapshotting module is responsible for calculating and storing snapshots. It's started up
 as soon as the first full graph sync completes, and then keeps updating the snapshots at a
-configurable interval with a 24 hour default.
+configurable interval with a 3-hour default.
 
 ### lookup
 
index 87a7f42c2d29af74c00d9805087a751e8f7fb5a8..b1c7ba283e9f73fe7ac732cd76680891fb20a14d 100644 (file)
--- a/src/config.rs
+++ b/src/config.rs
@@ -15,18 +15,21 @@ use lightning::util::ser::Readable;
 use lightning_block_sync::http::HttpEndpoint;
 use tokio_postgres::Config;
 
-pub(crate) const SCHEMA_VERSION: i32 = 13;
-pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
+pub(crate) const SCHEMA_VERSION: i32 = 12;
+pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3;
+// generate symlinks based on a 3-hour-granularity
 /// If the last update in either direction was more than six days ago, we send a reminder
 /// That reminder may be either in the form of a channel announcement, or in the form of empty
 /// updates in both directions.
 pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
 pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
 
-pub(crate) fn calculate_interval() -> u32 {
-       let interval = env::var("RAPID_GOSSIP_SYNC_CALC_INTERVAL").unwrap_or("86400".to_string())
+pub(crate) fn snapshot_generation_interval() -> u32 {
+       let interval = env::var("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL").unwrap_or(SYMLINK_GRANULARITY_INTERVAL.to_string())
                .parse::<u32>()
-               .expect("RAPID_GOSSIP_SYNC_CALC_INTERVAL env variable must be a u32.");
+               .expect("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL env variable must be a u32.");
+       assert!(interval > 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be positive");
+       assert_eq!(interval % SYMLINK_GRANULARITY_INTERVAL, 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be a multiple of {} (seconds)", SYMLINK_GRANULARITY_INTERVAL);
        interval
 }
 
@@ -168,7 +171,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                                        tx_ref.execute("UPDATE channel_updates SET short_channel_id = $1, direction = $2 WHERE id = $3", &[&scid, &direction, &id]).await.unwrap();
                                });
                        }
-                       while let Some(_) = updates.next().await { }
+                       while let Some(_) = updates.next().await {}
                }
                tx.execute("ALTER TABLE channel_updates ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap();
                tx.execute("ALTER TABLE channel_updates ALTER short_channel_id SET NOT NULL", &[]).await.unwrap();
@@ -195,7 +198,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                                        tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap();
                                });
                        }
-                       while let Some(_) = updates.next().await { }
+                       while let Some(_) = updates.next().await {}
                }
                tx.execute("ALTER TABLE channel_announcements ADD CONSTRAINT channel_announcements_short_channel_id_key UNIQUE (short_channel_id)", &[]).await.unwrap();
                tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap();
index 47f62f6228793445d1ebea318eec04e0d760a147..f56aca260f218c0798df4b6ef2300c8534c4dcf5 100644 (file)
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,6 +21,7 @@ use lightning::util::logger::Logger;
 use lightning::util::ser::{ReadableArgs, Writeable};
 use tokio::sync::mpsc;
 use tokio_postgres::{Client, NoTls};
+use crate::config::SYMLINK_GRANULARITY_INTERVAL;
 use crate::lookup::DeltaSet;
 
 use crate::persistence::GossipPersister;
@@ -85,6 +86,9 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
        }
 
        pub async fn start_sync(&self) {
+               log_info!(self.logger, "Starting Rapid Gossip Sync Server");
+               log_info!(self.logger, "Snapshot interval: {} seconds", config::snapshot_generation_interval());
+
                // means to indicate sync completion status within this module
                let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
 
@@ -142,12 +146,11 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
        let mut blob = GOSSIP_PREFIX.to_vec();
 
        let network = config::network();
-       let calc_interval = config::calculate_interval();
        let genesis_block = bitcoin::blockdata::constants::genesis_block(network);
        let chain_hash = genesis_block.block_hash();
        chain_hash.write(&mut blob).unwrap();
 
-       let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, calc_interval as u64) as u32;
+       let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, SYMLINK_GRANULARITY_INTERVAL as u64) as u32;
        blob_timestamp.write(&mut blob).unwrap();
 
        0u32.write(&mut blob).unwrap(); // node count
@@ -163,7 +166,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
        network_graph.remove_stale_channels_and_tracking();
 
        let mut output: Vec<u8> = vec![];
-       let calc_interval = config::calculate_interval();
+       let snapshot_interval = config::snapshot_generation_interval();
 
        // set a flag if the chain hash is prepended
        // chain hash only necessary if either channel announcements or non-incremental updates are present
@@ -249,7 +252,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
        serialization_details.chain_hash.write(&mut prefixed_output).unwrap();
        // always write the latest seen timestamp
        let latest_seen_timestamp = serialization_details.latest_seen;
-       let overflow_seconds = latest_seen_timestamp % calc_interval;
+       let overflow_seconds = latest_seen_timestamp % snapshot_interval;
        let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds);
        serialized_seen_timestamp.write(&mut prefixed_output).unwrap();
 
index 28d0768d65ede059651d16bcf6dc3421a681ce58..dd3bb7604ed04ee2ba371a9537d8feb47c3c6929 100644 (file)
--- a/src/snapshot.rs
+++ b/src/snapshot.rs
@@ -25,10 +25,8 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
        pub(crate) async fn snapshot_gossip(&self) {
                log_info!(self.logger, "Initiating snapshotting service");
 
-               let calc_interval = config::calculate_interval();
-               let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
-               const DAY_SECONDS: u64 = 60 * 60 * 24;
-               let round_day_seconds = calc_interval as u64;
+               let snapshot_interval = config::snapshot_generation_interval() as u64;
+               let snapshot_sync_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
 
                let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
                let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
@@ -40,7 +38,7 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                loop {
                        // 1. get the current timestamp
                        let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64);
+                       let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
                        log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
 
                        // 2. sleep until the next round interval
@@ -70,9 +68,9 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                        fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
 
                        let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
-                       for factor in &snapshot_sync_day_factors {
+                       for factor in &snapshot_sync_factors {
                                // basically timestamp - day_seconds * factor
-                               let timestamp = reference_timestamp.saturating_sub(DAY_SECONDS.saturating_mul(factor.clone()));
+                               let timestamp = reference_timestamp.saturating_sub(snapshot_interval.saturating_mul(factor.clone()));
                                snapshot_sync_timestamps.push((factor.clone(), timestamp));
                        };
 
@@ -108,7 +106,7 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                        }
 
                        // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
-                       let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64;
+                       let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64;
                        for i in 0..symlink_count {
                                // let's create non-dummy-symlinks
 
@@ -128,13 +126,13 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                                        return on the first iteration that is at least equal to the requested interval.
 
                                        Note, however, that the last value in the array is u64::max, which means that
-                                       multiplying it with DAY_SECONDS will overflow. To avoid that, we use
+                                       multiplying it with snapshot_interval will overflow. To avoid that, we use
                                        saturating_mul.
                                         */
 
                                        // find min(x) in snapshot_sync_day_factors where x >= i
-                                       snapshot_sync_day_factors.iter().find(|x| {
-                                               DAY_SECONDS.saturating_mul(**x) >= i * config::SNAPSHOT_CALCULATION_INTERVAL as u64
+                                       snapshot_sync_factors.iter().find(|x| {
+                                               snapshot_interval.saturating_mul(**x) >= i * config::SYMLINK_GRANULARITY_INTERVAL as u64
                                        }).unwrap().clone()
                                };
                                log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range);
@@ -146,7 +144,7 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                                        // special-case 0 to always refer to a full/initial sync
                                        0
                                } else {
-                                       reference_timestamp.saturating_sub((config::SNAPSHOT_CALCULATION_INTERVAL as u64).saturating_mul(i))
+                                       reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i))
                                };
                                let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
 
@@ -169,8 +167,13 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
 
                        // constructing the snapshots may have taken a while
                        let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let remainder = current_time % config::SNAPSHOT_CALCULATION_INTERVAL as u64;
-                       let time_until_next_day = config::SNAPSHOT_CALCULATION_INTERVAL as u64 - remainder;
+
+                       // NOTE: we're waiting until the next multiple of snapshot_interval.
+                       // However, if the symlink granularity interval is shorter, no intermediate
+                       // symlinks will be generated during that time. That should be ok, because any
+                       // timestamps previously returned would already have symlinks, but this does have bug potential.
+                       let remainder = current_time % snapshot_interval;
+                       let time_until_next_day = snapshot_interval - remainder;
 
                        log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
                        // add in an extra five seconds to assure the rounding down works correctly