Distinguish between snapshot and symlink intervals.
[rapid-gossip-sync-server] / src / snapshot.rs
index bbe94a93618037a227dac3cad4aab0c815658b78..dd3bb7604ed04ee2ba371a9537d8feb47c3c6929 100644 (file)
@@ -1,28 +1,32 @@
 use std::collections::HashMap;
 use std::fs;
+use std::ops::Deref;
 use std::os::unix::fs::symlink;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use lightning::log_info;
 
 use lightning::routing::gossip::NetworkGraph;
+use lightning::util::logger::Logger;
 
-use crate::{config, TestLogger};
+use crate::config;
 use crate::config::cache_path;
 
-pub(crate) struct Snapshotter {
-       network_graph: Arc<NetworkGraph<TestLogger>>,
+pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
+       network_graph: Arc<NetworkGraph<L>>,
+       logger: L
 }
 
-impl Snapshotter {
-       pub fn new(network_graph: Arc<NetworkGraph<TestLogger>>) -> Self {
-               Self { network_graph }
+impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
+       pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
+               Self { network_graph, logger }
        }
 
        pub(crate) async fn snapshot_gossip(&self) {
-               println!("Initiating snapshotting service");
+               log_info!(self.logger, "Initiating snapshotting service");
 
-               let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
-               let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64;
+               let snapshot_interval = config::snapshot_generation_interval() as u64;
+               let snapshot_sync_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
 
                let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
                let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
@@ -34,10 +38,10 @@ impl Snapshotter {
                loop {
                        // 1. get the current timestamp
                        let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, round_day_seconds);
-                       println!("Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
+                       let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
+                       log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
 
-                       // 2. sleep until the next round 24 hours
+                       // 2. sleep until the next round interval
                        // 3. refresh all snapshots
 
                        // the stored snapshots should adhere to the following format
@@ -64,9 +68,9 @@ impl Snapshotter {
                        fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
 
                        let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
-                       for factor in &snapshot_sync_day_factors {
+                       for factor in &snapshot_sync_factors {
                                // basically timestamp - day_seconds * factor
-                               let timestamp = reference_timestamp.saturating_sub(round_day_seconds.saturating_mul(factor.clone()));
+                               let timestamp = reference_timestamp.saturating_sub(snapshot_interval.saturating_mul(factor.clone()));
                                snapshot_sync_timestamps.push((factor.clone(), timestamp));
                        };
 
@@ -75,14 +79,14 @@ impl Snapshotter {
                        for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
                                let network_graph_clone = self.network_graph.clone();
                                {
-                                       println!("Calculating {}-day snapshot", day_range);
+                                       log_info!(self.logger, "Calculating {}-day snapshot", day_range);
                                        // calculate the snapshot
-                                       let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, true).await;
+                                       let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
 
                                        // persist the snapshot and update the symlink
                                        let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
                                        let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
-                                       println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+                                       log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
                                        fs::write(&snapshot_path, snapshot.data).unwrap();
                                        snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
                                }
@@ -97,11 +101,13 @@ impl Snapshotter {
 
                                let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
                                let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
-                               println!("Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
+                               log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
                                symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
                        }
 
-                       for i in 0..10_001u64 {
+                       // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
+                       let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64;
+                       for i in 0..symlink_count {
                                // let's create non-dummy-symlinks
 
                                // first, determine which snapshot range should be referenced
@@ -109,11 +115,27 @@ impl Snapshotter {
                                        // special-case 0 to always refer to a full/initial sync
                                        u64::MAX
                                } else {
+                                       /*
+                                       We have snapshots for 6-day- and 7-day-intervals, but the next interval is
+                                       14 days. So if somebody requests an update with a timestamp that is 10 days old,
+                                       there is no longer a snapshot for that specific interval.
+
+                                       The correct snapshot will be the next highest interval, i. e. for 14 days.
+
+                                       The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
+                                       return on the first iteration that is at least equal to the requested interval.
+
+                                       Note, however, that the last value in the array is u64::max, which means that
+                                       multiplying it with snapshot_interval will overflow. To avoid that, we use
+                                       saturating_mul.
+                                        */
+
                                        // find min(x) in snapshot_sync_day_factors where x >= i
-                                       snapshot_sync_day_factors.iter().find(|x| {
-                                               x >= &&i
+                                       snapshot_sync_factors.iter().find(|x| {
+                                               snapshot_interval.saturating_mul(**x) >= i * config::SYMLINK_GRANULARITY_INTERVAL as u64
                                        }).unwrap().clone()
                                };
+                               log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range);
 
                                let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
                                let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
@@ -122,11 +144,11 @@ impl Snapshotter {
                                        // special-case 0 to always refer to a full/initial sync
                                        0
                                } else {
-                                       reference_timestamp.saturating_sub(round_day_seconds.saturating_mul(i))
+                                       reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i))
                                };
                                let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
 
-                               println!("Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
+                               log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
                                symlink(&relative_snapshot_path, &symlink_path).unwrap();
                        }
 
@@ -145,10 +167,15 @@ impl Snapshotter {
 
                        // constructing the snapshots may have taken a while
                        let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-                       let remainder = current_time % round_day_seconds;
-                       let time_until_next_day = round_day_seconds - remainder;
 
-                       println!("Sleeping until next snapshot capture: {}s", time_until_next_day);
+                       // NOTE: we're waiting until the next multiple of snapshot_interval
+                       // however, if the symlink granularity is lower, then during that time, no intermediate
+                       // symlinks will be generated. That should be ok, because any timestamps previously
+                       // returned would already have generated symlinks, but this does have bug potential
+                       let remainder = current_time % snapshot_interval;
+                       let time_until_next_day = snapshot_interval - remainder;
+
+                       log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
                        // add in an extra five seconds to assure the rounding down works correctly
                        let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
                        sleep.await;