1 use std::collections::HashMap;
4 use std::os::unix::fs::symlink;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
13 use crate::config::cache_path;
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16 network_graph: Arc<NetworkGraph<L>>,
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21 pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22 Self { network_graph, logger }
25 pub(crate) async fn snapshot_gossip(&self) {
26 log_info!(self.logger, "Initiating snapshotting service");
28 let snapshot_interval = config::snapshot_generation_interval() as u64;
29 let mut snapshot_scopes = vec![];
30 { // double the coefficient until it reaches the maximum (limited) snapshot scope
31 let mut current_scope = snapshot_interval;
33 snapshot_scopes.push(current_scope);
34 if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
35 snapshot_scopes.push(u64::MAX);
39 // double the current factor
44 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
45 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
46 let finalized_snapshot_directory = format!("{}/snapshots", cache_path());
47 let finalized_symlink_directory = format!("{}/symlinks", cache_path());
48 let relative_symlink_to_snapshot_path = "../snapshots";
50 // this is gonna be a never-ending background job
52 // 1. get the current timestamp
53 let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
54 let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
55 log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
57 // 2. sleep until the next round interval
58 // 3. refresh all snapshots
60 // the stored snapshots should adhere to the following format
66 // from three weeks ago
68 // That means that at any given moment, there should only ever be
69 // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
70 // The snapshots, unlike dynamic updates, should account for all intermediate
73 // purge and recreate the pending directories
74 if fs::metadata(&pending_snapshot_directory).is_ok() {
75 fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
77 if fs::metadata(&pending_symlink_directory).is_ok() {
78 fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
80 fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
81 fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
83 let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
84 for current_scope in &snapshot_scopes {
85 let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
86 snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
89 let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
91 for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
92 let network_graph_clone = self.network_graph.clone();
94 log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
95 // calculate the snapshot
96 let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
98 // persist the snapshot and update the symlink
99 let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
100 let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
101 log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
102 fs::write(&snapshot_path, snapshot.data).unwrap();
103 snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
108 // create dummy symlink
109 let dummy_filename = "empty_delta.lngossip";
110 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
111 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
112 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
114 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
115 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
116 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
117 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
120 // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
121 let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64;
122 for i in 0..symlink_count {
123 // let's create non-dummy-symlinks
125 // first, determine which snapshot range should be referenced
126 let referenced_scope = if i == 0 {
127 // special-case 0 to always refer to a full/initial sync
131 We have snapshots for 6-day- and 7-day-intervals, but the next interval is
132 14 days. So if somebody requests an update with a timestamp that is 10 days old,
133 there is no longer a snapshot for that specific interval.
135 The correct snapshot will be the next highest interval, i. e. for 14 days.
137 The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
138 return on the first iteration that is at least equal to the requested interval.
140 Note, however, that the last value in the array is u64::max, which means that
141 multiplying it with snapshot_interval will overflow. To avoid that, we use
145 // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
146 snapshot_scopes.iter().find(|current_scope| {
147 i * config::SYMLINK_GRANULARITY_INTERVAL as u64 <= **current_scope
150 log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
152 let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
153 let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
155 let canonical_last_sync_timestamp = if i == 0 {
156 // special-case 0 to always refer to a full/initial sync
159 reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i))
161 let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
163 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
164 symlink(&relative_snapshot_path, &symlink_path).unwrap();
167 let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
168 let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
169 fs::write(&update_time_path, format!("{}", update_time)).unwrap();
171 if fs::metadata(&finalized_snapshot_directory).is_ok() {
172 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
174 if fs::metadata(&finalized_symlink_directory).is_ok() {
175 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
177 fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
178 fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
180 // constructing the snapshots may have taken a while
181 let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
183 // NOTE: we're waiting until the next multiple of snapshot_interval
184 // however, if the symlink granularity is lower, then during that time, no intermediate
185 // symlinks will be generated. That should be ok, because any timestamps previously
186 // returned would already have generated symlinks, but this does have bug potential
187 let remainder = current_time % snapshot_interval;
188 let time_until_next_generation = snapshot_interval - remainder;
190 log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
191 // add in an extra five seconds to assure the rounding down works correctly
192 let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
197 pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
198 let round_multiple_delta = number % multiple;
199 number - round_multiple_delta