1 use std::collections::HashMap;
4 use std::os::unix::fs::symlink;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
13 use crate::config::cache_path;
/// Service object that captures gossip snapshots of a shared network graph and writes
/// them, plus timestamp-named symlinks, beneath the cache directory.
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
// Shared handle to the graph; cloned for every delta calculation in `generate_snapshots`.
16 network_graph: Arc<NetworkGraph<L>>,
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
/// Builds a `Snapshotter` from a shared network graph handle and the logger that is
/// used for all progress messages.
21 pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22 Self { network_graph, logger }
/// Entry point of the snapshotting service.
///
/// Logs the start of the service, derives the list of snapshot scopes from the
/// configured generation interval, generates the snapshots via `generate_snapshots`,
/// and then computes how long to wait until just after the next multiple of the
/// interval.
///
/// # Panics
/// Panics if the system clock is earlier than the Unix epoch, or if the configured
/// snapshot interval is zero (remainder by zero).
25 pub(crate) async fn snapshot_gossip(&self) {
26 log_info!(self.logger, "Initiating snapshotting service");
// The generation interval also serves as the smallest snapshot scope.
28 let snapshot_interval = config::snapshot_generation_interval() as u64;
29 let mut snapshot_scopes = vec![];
30 { // double the coefficient until it reaches the maximum (limited) snapshot scope
31 let mut current_scope = snapshot_interval;
33 snapshot_scopes.push(current_scope);
34 if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
// Final scope: in `generate_snapshots` its look-back saturates to timestamp 0.
35 snapshot_scopes.push(u64::MAX);
39 // double the current factor
44 // this is meant to be a never-ending background job
// No cap on the number of symlinks is passed here (`None`).
46 self.generate_snapshots(config::SYMLINK_GRANULARITY_INTERVAL as u64, snapshot_interval, &snapshot_scopes, &cache_path(), None).await;
48 // constructing the snapshots may have taken a while
49 let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
51 // NOTE: we're waiting until the next multiple of snapshot_interval
52 // however, if the symlink granularity is lower, then during that time, no intermediate
53 // symlinks will be generated. That should be ok, because any timestamps previously
54 // returned would already have generated symlinks, but this does have bug potential
// NOTE(review): `%` panics if snapshot_interval is 0.
55 let remainder = current_time % snapshot_interval;
56 let time_until_next_generation = snapshot_interval - remainder;
58 log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
59 // add in an extra five seconds to ensure the rounding down works correctly
60 let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
/// Builds one complete set of snapshots and symlinks in `*_pending` directories under
/// `cache_path`, then replaces the finalized `snapshots` and `symlinks` directories
/// with them.
///
/// * `granularity_interval` - spacing, in seconds, between the timestamps that
///   consecutive symlinks are named after; also the divisor for the symlink count.
/// * `snapshot_interval` - the current time is rounded down to a multiple of this value
///   to obtain the reference timestamp all snapshots are anchored to.
/// * `snapshot_scopes` - look-back durations in seconds; one snapshot is written per
///   scope, covering changes since `reference_timestamp - scope` (saturating at 0).
/// * `cache_path` - directory beneath which all four working directories live.
/// * `max_symlink_count` - optional upper bound on the number of symlinks created;
///   `None` leaves the count uncapped.
///
/// # Panics
/// Panics if any filesystem operation fails, if the system clock is earlier than the
/// Unix epoch, if `snapshot_interval` or `granularity_interval` is zero, or if a symlink
/// resolves to a scope for which no snapshot file was recorded.
65 pub(crate) async fn generate_snapshots(&self, granularity_interval: u64, snapshot_interval: u64, snapshot_scopes: &[u64], cache_path: &str, max_symlink_count: Option<u64>) {
66 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path);
67 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path);
68 let finalized_snapshot_directory = format!("{}/snapshots", cache_path);
69 let finalized_symlink_directory = format!("{}/symlinks", cache_path);
// Symlink targets use the finalized directory name, so the links created in the
// pending directory point at the new files once both directories are renamed.
70 let relative_symlink_to_snapshot_path = "../snapshots";
72 // 1. get the current timestamp
73 let snapshot_generation_time = SystemTime::now();
74 let snapshot_generation_timestamp = snapshot_generation_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
// Everything below is anchored to the current time rounded down to the interval.
75 let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval);
76 log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
78 // 2. sleep until the next round interval
79 // 3. refresh all snapshots
81 // the stored snapshots should adhere to the following format
87 // from three weeks ago
89 // That means that at any given moment, there should only ever be
90 // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
91 // The snapshots, unlike dynamic updates, should account for all intermediate
94 // purge and recreate the pending directories
95 if fs::metadata(&pending_snapshot_directory).is_ok() {
96 fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
98 if fs::metadata(&pending_symlink_directory).is_ok() {
99 fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
101 fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
102 fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
// Pair every scope with the "last sync" timestamp its snapshot starts from.
104 let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
105 for current_scope in snapshot_scopes {
// saturating_sub clamps at 0, so a scope larger than the reference timestamp
// (such as u64::MAX) produces a snapshot starting from timestamp 0.
106 let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
107 snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
// Maps each scope to the file name of the snapshot written for it.
110 let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
112 for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
113 let network_graph_clone = self.network_graph.clone();
115 log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
116 // calculate the snapshot
// NOTE(review): the u64 -> u32 `as` cast truncates silently; presumably the
// timestamps are expected to fit in u32 - confirm.
117 let delta = super::calculate_delta(network_graph_clone.clone(), current_last_sync_timestamp.clone() as u32, Some(reference_timestamp), self.logger.clone()).await;
// NOTE(review): the meaning of the positional literal `1` is not visible here.
118 let snapshot = super::serialize_delta(&delta, 1, self.logger.clone());
120 // persist the snapshot and update the symlink
121 let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
122 let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
123 log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.channel_announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
124 fs::write(&snapshot_path, snapshot.data).unwrap();
125 snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
130 // create dummy symlink
// An empty blob is stored under a fixed name and linked from `<reference_timestamp>.bin`.
131 let dummy_filename = "empty_delta.lngossip";
132 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
133 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
134 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
136 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
137 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
138 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
139 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
142 // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
// NOTE(review): the subtraction underflows if reference_timestamp < 1640995200 (a clock
// set before 2022), and the division panics if granularity_interval is 0.
143 let mut symlink_count = (reference_timestamp - 1640995200) / granularity_interval;
144 if let Some(max_symlink_count) = max_symlink_count {
145 // this is primarily useful for testing
146 symlink_count = std::cmp::min(symlink_count, max_symlink_count);
149 for i in 0..symlink_count {
150 // let's create non-dummy-symlinks
152 // first, determine which snapshot range should be referenced
153 let referenced_scope = if i == 0 {
154 // special-case 0 to always refer to a full/initial sync
158 We have snapshots for 6-day- and 7-day-intervals, but the next interval is
159 14 days. So if somebody requests an update with a timestamp that is 10 days old,
160 there is no longer a snapshot for that specific interval.
162 The correct snapshot will be the next highest interval, i. e. for 14 days.
164 The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
165 return on the first iteration that is at least equal to the requested interval.
167 Note, however, that the last value in the array is u64::max, which means that
168 multiplying it with snapshot_interval will overflow. To avoid that, we use
172 // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
// NOTE(review): the explanation above still names `snapshot_sync_day_factors`; the
// lookup actually iterates `snapshot_scopes`. The plain `*` below stays in range
// because i < symlink_count <= (reference_timestamp - 1640995200) / granularity_interval.
173 snapshot_scopes.iter().find(|current_scope| {
174 i * granularity_interval <= **current_scope
177 log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
// Panics if no snapshot file name was recorded for the chosen scope.
179 let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
180 let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
182 let canonical_last_sync_timestamp = if i == 0 {
183 // special-case 0 to always refer to a full/initial sync
// Otherwise the symlink is named after the timestamp i granularity steps before
// the reference timestamp.
186 reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i))
188 let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
// NOTE(review): the log message below opens a parenthesis that is never closed.
190 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
191 symlink(&relative_snapshot_path, &symlink_path).unwrap();
// Record the time at which this set of snapshots and symlinks was completed.
194 let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
195 let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
196 fs::write(&update_time_path, format!("{}", update_time)).unwrap();
// Swap the freshly built directories into place.
// NOTE(review): removal followed by rename is not atomic; the finalized directories
// are absent between the two steps.
198 if fs::metadata(&finalized_snapshot_directory).is_ok() {
199 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
201 if fs::metadata(&finalized_symlink_directory).is_ok() {
// NOTE(review): the panic message says "pending" although the finalized symlink
// directory is the one being removed.
202 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
204 fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
205 fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
/// Returns the largest multiple of `multiple` that does not exceed `number`.
///
/// # Panics
/// Panics if `multiple` is zero (remainder by zero).
208 pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
// Subtracting the remainder lands exactly on the multiple at or below `number`.
209 let round_multiple_delta = number % multiple;
210 number - round_multiple_delta