Merge pull request #57 from arik-so/2023/08/old-incremental-update-fix
author    Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Wed, 6 Sep 2023 22:16:26 +0000 (22:16 +0000)
committer GitHub <noreply@github.com>
Wed, 6 Sep 2023 22:16:26 +0000 (22:16 +0000)
Send full updates after old last seen updates.

README.md
src/config.rs
src/lib.rs
src/snapshot.rs

index 13f46c6a561dbad062c9f0496ebeedaa8c45899f..12104909aac1cea00e7a769a51ae2bc4c5a02412 100644
--- a/README.md
+++ b/README.md
@@ -12,17 +12,18 @@ These are the components it's comprised of.
 A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments
 can be made by setting environment variables, whose usage is as follows:
 
-| Name                                 | Default             | Description                                                                                                |
-|:-------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
-| RAPID_GOSSIP_SYNC_SERVER_DB_HOST     | localhost           | Domain of the Postgres database                                                                            |
-| RAPID_GOSSIP_SYNC_SERVER_DB_USER     | alice               | Username to access Postgres                                                                                |
-| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_              | Password to access Postgres                                                                                |
-| RAPID_GOSSIP_SYNC_SERVER_DB_NAME     | ln_graph_sync       | Name of the database to be used for gossip storage                                                         |
-| RAPID_GOSSIP_SYNC_SERVER_NETWORK     | mainnet             | Network to operate in. Possible values are mainnet, testnet, signet, regtest                               |
-| BITCOIN_REST_DOMAIN                  | 127.0.0.1           | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
-| BITCOIN_REST_PORT                    | 8332                | HTTP port of the bitcoind REST server                                                                      |
-| BITCOIN_REST_PATH                    | /rest/              | Path infix to access the bitcoind REST endpoints                                                           |
-| LN_PEERS                             | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip                                              |
+| Name                                       | Default             | Description                                                                                                |
+|:-------------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
+| RAPID_GOSSIP_SYNC_SERVER_DB_HOST           | localhost           | Domain of the Postgres database                                                                            |
+| RAPID_GOSSIP_SYNC_SERVER_DB_USER           | alice               | Username to access Postgres                                                                                |
+| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD       | _None_              | Password to access Postgres                                                                                |
+| RAPID_GOSSIP_SYNC_SERVER_DB_NAME           | ln_graph_sync       | Name of the database to be used for gossip storage                                                         |
+| RAPID_GOSSIP_SYNC_SERVER_NETWORK           | mainnet             | Network to operate in. Possible values are mainnet, testnet, signet, regtest                               |
+| RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL | 10800               | The interval in seconds between snapshots. Must be a positive multiple of 10800 (3 hours)                  |
+| BITCOIN_REST_DOMAIN                        | 127.0.0.1           | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
+| BITCOIN_REST_PORT                          | 8332                | HTTP port of the bitcoind REST server                                                                      |
+| BITCOIN_REST_PATH                          | /rest/              | Path infix to access the bitcoind REST endpoints                                                           |
+| LN_PEERS                                   | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip                                              |
 
 ### downloader
 
@@ -36,7 +37,7 @@ The module responsible for persisting all the downloaded graph data to Postgres.
 
 The snapshotting module is responsible for calculating and storing snapshots. It's started up
 as soon as the first full graph sync completes, and then keeps updating the snapshots at a
-24-hour-interval.
+configurable interval that defaults to 3 hours.
 
 ### lookup
 
index db6d2ef7a243d69de87d41963d3087f3543cb88c..b905c53f3b877f27b12e21d8fd816f9864a6a1a0 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -16,13 +16,24 @@ use lightning_block_sync::http::HttpEndpoint;
 use tokio_postgres::Config;
 
 pub(crate) const SCHEMA_VERSION: i32 = 12;
-pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
+pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
+pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
+// symlink timestamps are generated at a 3-hour granularity, regardless of the configured snapshot interval
 /// If the last update in either direction was more than six days ago, we send a reminder
 /// That reminder may be either in the form of a channel announcement, or in the form of empty
 /// updates in both directions.
 pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
 pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
 
+pub(crate) fn snapshot_generation_interval() -> u32 {
+       let interval = env::var("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL").unwrap_or(SYMLINK_GRANULARITY_INTERVAL.to_string())
+               .parse::<u32>()
+               .expect("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL env variable must be a u32.");
+       assert!(interval > 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be positive");
+       assert_eq!(interval % SYMLINK_GRANULARITY_INTERVAL, 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be a multiple of {} (seconds)", SYMLINK_GRANULARITY_INTERVAL);
+       interval
+}
+
 pub(crate) fn network() -> Network {
        let network = env::var("RAPID_GOSSIP_SYNC_SERVER_NETWORK").unwrap_or("bitcoin".to_string()).to_lowercase();
        match network.as_str() {
@@ -161,7 +172,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                                        tx_ref.execute("UPDATE channel_updates SET short_channel_id = $1, direction = $2 WHERE id = $3", &[&scid, &direction, &id]).await.unwrap();
                                });
                        }
-                       while let Some(_) = updates.next().await { }
+                       while let Some(_) = updates.next().await {}
                }
                tx.execute("ALTER TABLE channel_updates ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap();
                tx.execute("ALTER TABLE channel_updates ALTER short_channel_id SET NOT NULL", &[]).await.unwrap();
@@ -188,7 +199,7 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                                        tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap();
                                });
                        }
-                       while let Some(_) = updates.next().await { }
+                       while let Some(_) = updates.next().await {}
                }
                tx.execute("ALTER TABLE channel_announcements ADD CONSTRAINT channel_announcements_short_channel_id_key UNIQUE (short_channel_id)", &[]).await.unwrap();
                tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap();
index 550ed79ddcdac4d56f65e93e5ac94f7fa030bfa7..f56aca260f218c0798df4b6ef2300c8534c4dcf5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,6 +21,7 @@ use lightning::util::logger::Logger;
 use lightning::util::ser::{ReadableArgs, Writeable};
 use tokio::sync::mpsc;
 use tokio_postgres::{Client, NoTls};
+use crate::config::SYMLINK_GRANULARITY_INTERVAL;
 use crate::lookup::DeltaSet;
 
 use crate::persistence::GossipPersister;
@@ -85,6 +86,9 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
        }
 
        pub async fn start_sync(&self) {
+               log_info!(self.logger, "Starting Rapid Gossip Sync Server");
+               log_info!(self.logger, "Snapshot interval: {} seconds", config::snapshot_generation_interval());
+
                // means to indicate sync completion status within this module
                let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
 
@@ -146,7 +150,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
        let chain_hash = genesis_block.block_hash();
        chain_hash.write(&mut blob).unwrap();
 
-       let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
+       let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, SYMLINK_GRANULARITY_INTERVAL as u64) as u32;
        blob_timestamp.write(&mut blob).unwrap();
 
        0u32.write(&mut blob).unwrap(); // node count
@@ -162,6 +166,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
        network_graph.remove_stale_channels_and_tracking();
 
        let mut output: Vec<u8> = vec![];
+       let snapshot_interval = config::snapshot_generation_interval();
 
        // set a flag if the chain hash is prepended
        // chain hash only necessary if either channel announcements or non-incremental updates are present
@@ -247,7 +252,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
        serialization_details.chain_hash.write(&mut prefixed_output).unwrap();
        // always write the latest seen timestamp
        let latest_seen_timestamp = serialization_details.latest_seen;
-       let overflow_seconds = latest_seen_timestamp % config::SNAPSHOT_CALCULATION_INTERVAL;
+       let overflow_seconds = latest_seen_timestamp % snapshot_interval;
        let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds);
        serialized_seen_timestamp.write(&mut prefixed_output).unwrap();
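Both paths above snap timestamps down to a multiple of an interval before writing them into the blob:
serialize_empty_blob via Snapshotter::round_down_to_nearest_multiple with the fixed symlink granularity,
and serialize_delta via the overflow_seconds subtraction with the configured snapshot interval. A small
sketch of that arithmetic (the helper below assumes plain integer rounding, which is what the subtraction
form amounts to):

fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
    number - (number % multiple)
}

fn main() {
    // with the 3-hour (10_800 s) granularity, a timestamp snaps to the previous boundary
    assert_eq!(round_down_to_nearest_multiple(1_700_003_000, 10_800), 1_699_995_600);

    // the equivalent subtraction used in serialize_delta
    let latest_seen_timestamp: u64 = 1_700_003_000;
    let snapshot_interval: u64 = 10_800;
    let overflow_seconds = latest_seen_timestamp % snapshot_interval;
    assert_eq!(latest_seen_timestamp.saturating_sub(overflow_seconds), 1_699_995_600);
}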
 
index 65f6fad03823b56cedd249c2eff87e137dcc56b9..218285652868b56dac11dacedf3cf3c843d10510 100644
--- a/src/snapshot.rs
+++ b/src/snapshot.rs
@@ -14,7 +14,7 @@ use crate::config::cache_path;
 
 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
        network_graph: Arc<NetworkGraph<L>>,
-       logger: L
+       logger: L,
 }
 
 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
@@ -25,8 +25,21 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
        pub(crate) async fn snapshot_gossip(&self) {
                log_info!(self.logger, "Initiating snapshotting service");
 
-               let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
-               const DAY_SECONDS: u64 = 60 * 60 * 24;
+               let snapshot_interval = config::snapshot_generation_interval() as u64;
+               let mut snapshot_scopes = vec![];
+               { // double the scope until it reaches the maximum (limited) snapshot scope
+                       let mut current_scope = snapshot_interval;
+                       loop {
+                               snapshot_scopes.push(current_scope);
+                               if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
+                                       snapshot_scopes.push(u64::MAX);
+                                       break;
+                               }
+
+                               // double the current scope
+                               current_scope <<= 1;
+                       }
+               }
 
                let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
                let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
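To make the doubling loop above concrete: with the default 10_800-second interval and the three-week
MAX_SNAPSHOT_SCOPE, it produces ten scopes, doubling from three hours to just past three weeks and
terminating with u64::MAX for the full-sync case. A standalone sketch with the defaults hard-coded:

fn main() {
    let snapshot_interval: u64 = 3600 * 3;        // default interval: 3 hours
    let max_snapshot_scope: u64 = 3600 * 24 * 21; // 3 weeks
    let mut snapshot_scopes = vec![];
    let mut current_scope = snapshot_interval;
    loop {
        snapshot_scopes.push(current_scope);
        if current_scope >= max_snapshot_scope {
            snapshot_scopes.push(u64::MAX);
            break;
        }
        current_scope <<= 1; // double the current scope
    }
    // 10800, 21600, 43200, 86400, 172800, 345600, 691200, 1382400, 2764800, u64::MAX
    assert_eq!(snapshot_scopes.len(), 10);
}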
@@ -38,10 +51,10 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                loop {
                        // 1. get the current timestamp
                        let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64);
+                       let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
                        log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
 
-                       // 2. sleep until the next round 24 hours
+                       // 2. sleep until the next round interval
                        // 3. refresh all snapshots
 
                        // the stored snapshots should adhere to the following format
@@ -58,37 +71,36 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                        // channel updates
 
                        // purge and recreate the pending directories
-                       if fs::metadata(&pending_snapshot_directory).is_ok(){
+                       if fs::metadata(&pending_snapshot_directory).is_ok() {
                                fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
                        }
-                       if fs::metadata(&pending_symlink_directory).is_ok(){
+                       if fs::metadata(&pending_symlink_directory).is_ok() {
                                fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
                        }
                        fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
                        fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
 
                        let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
-                       for factor in &snapshot_sync_day_factors {
-                               // basically timestamp - day_seconds * factor
-                               let timestamp = reference_timestamp.saturating_sub(DAY_SECONDS.saturating_mul(factor.clone()));
-                               snapshot_sync_timestamps.push((factor.clone(), timestamp));
+                       for current_scope in &snapshot_scopes {
+                               let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
+                               snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
                        };
 
-                       let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
+                       let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
 
-                       for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
+                       for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
                                let network_graph_clone = self.network_graph.clone();
                                {
-                                       log_info!(self.logger, "Calculating {}-day snapshot", day_range);
+                                       log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
                                        // calculate the snapshot
                                        let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
 
                                        // persist the snapshot and update the symlink
-                                       let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
+                                       let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
                                        let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
-                                       log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+                                       log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
                                        fs::write(&snapshot_path, snapshot.data).unwrap();
-                                       snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
+                                       snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
                                }
                        }
 
@@ -106,12 +118,12 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                        }
 
                        // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
-                       let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64;
+                       let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64;
                        for i in 0..symlink_count {
                                // let's create non-dummy-symlinks
 
                                // first, determine which snapshot range should be referenced
-                               let referenced_day_range = if i == 0 {
+                               let referenced_scope = if i == 0 {
                                        // special-case 0 to always refer to a full/initial sync
                                        u64::MAX
                                } else {
@@ -126,29 +138,29 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                                        return on the first iteration that is at least equal to the requested interval.
 
                                        Note, however, that the last value in the array is u64::max, which means that
-                                       multiplying it with DAY_SECONDS will overflow. To avoid that, we use
+                                       multiplying it with snapshot_interval will overflow. To avoid that, we use
                                        saturating_mul.
                                         */
 
-                                       // find min(x) in snapshot_sync_day_factors where x >= i
-                                       snapshot_sync_day_factors.iter().find(|x| {
-                                               DAY_SECONDS.saturating_mul(**x) >= i * config::SNAPSHOT_CALCULATION_INTERVAL as u64
+                                       // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
+                                       snapshot_scopes.iter().find(|current_scope| {
+                                               i * config::SYMLINK_GRANULARITY_INTERVAL as u64 <= **current_scope
                                        }).unwrap().clone()
                                };
-                               log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range);
+                               log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
 
-                               let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
+                               let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
                                let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
 
                                let canonical_last_sync_timestamp = if i == 0 {
                                        // special-case 0 to always refer to a full/initial sync
                                        0
                                } else {
-                                       reference_timestamp.saturating_sub((config::SNAPSHOT_CALCULATION_INTERVAL as u64).saturating_mul(i))
+                                       reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i))
                                };
                                let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
 
-                               log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
+                               log_info!(self.logger, "Symlinking: {} -> {} ({} -> {})", i, referenced_scope, symlink_path, relative_snapshot_path);
                                symlink(&relative_snapshot_path, &symlink_path).unwrap();
                        }
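For intuition about the find() above: with the default three-hour granularity, symlink index i (a client
whose last sync was roughly i * 10_800 seconds ago) is pointed at the smallest snapshot scope covering that
age, and anything older than the largest doubled scope falls through to the u64::MAX full sync. A hedged
sketch of that selection (the scope list and helper below are illustrative):

fn referenced_scope(i: u64, snapshot_scopes: &[u64], granularity: u64) -> u64 {
    if i == 0 {
        return u64::MAX; // special-case 0 to always refer to a full/initial sync
    }
    *snapshot_scopes.iter().find(|scope| i * granularity <= **scope).unwrap()
}

fn main() {
    let scopes = [10_800, 21_600, 43_200, 86_400, 172_800, 345_600, 691_200, 1_382_400, 2_764_800, u64::MAX];
    assert_eq!(referenced_scope(1, &scopes, 10_800), 10_800);     // 3 hours behind
    assert_eq!(referenced_scope(3, &scopes, 10_800), 43_200);     // 9 hours behind -> 12-hour snapshot
    assert_eq!(referenced_scope(5, &scopes, 10_800), 86_400);     // 15 hours behind -> 24-hour snapshot
    assert_eq!(referenced_scope(300, &scopes, 10_800), u64::MAX); // far behind -> full sync
}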
 
@@ -156,10 +168,10 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
                        let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
                        fs::write(&update_time_path, format!("{}", update_time)).unwrap();
 
-                       if fs::metadata(&finalized_snapshot_directory).is_ok(){
+                       if fs::metadata(&finalized_snapshot_directory).is_ok() {
                                fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
                        }
-                       if fs::metadata(&finalized_symlink_directory).is_ok(){
+                       if fs::metadata(&finalized_symlink_directory).is_ok() {
                                fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
                        }
                        fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
@@ -167,12 +179,17 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
 
                        // constructing the snapshots may have taken a while
                        let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let remainder = current_time % config::SNAPSHOT_CALCULATION_INTERVAL as u64;
-                       let time_until_next_day = config::SNAPSHOT_CALCULATION_INTERVAL as u64 - remainder;
 
-                       log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
+                       // NOTE: we're waiting until the next multiple of snapshot_interval.
+                       // If the symlink granularity is finer than that interval, no intermediate symlinks
+                       // are generated while we sleep. That should be fine, because any timestamp previously
+                       // returned to clients already has a symlink, but this does have bug potential.
+                       let remainder = current_time % snapshot_interval;
+                       let time_until_next_generation = snapshot_interval - remainder;
+
+                       log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
                        // add in an extra five seconds to assure the rounding down works correctly
-                       let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
+                       let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
                        sleep.await;
                }
        }
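As a worked example of the sleep arithmetic above (values assumed): with snapshot_interval = 10_800 and
current_time = 1_700_003_000, remainder is 7_400, so time_until_next_generation is 3_400 and the task
sleeps 3_405 seconds, waking just past the next three-hour boundary so the round-down lands on the new
reference timestamp.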