1 use std::collections::HashMap;
4 use std::os::unix::fs::symlink;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
13 use crate::config::cache_path;
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16 network_graph: Arc<NetworkGraph<L>>,
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21 pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22 Self { network_graph, logger }
25 pub(crate) async fn snapshot_gossip(&self) {
26 log_info!(self.logger, "Initiating snapshotting service");
28 let snapshot_interval = config::snapshot_generation_interval() as u64;
29 let mut snapshot_scopes = vec![];
30 { // double the coefficient until it reaches the maximum (limited) snapshot scope
31 let mut current_scope = snapshot_interval;
33 snapshot_scopes.push(current_scope);
34 if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
35 snapshot_scopes.push(u64::MAX);
39 // double the current factor
44 // this is gonna be a never-ending background job
46 self.generate_snapshots(config::SYMLINK_GRANULARITY_INTERVAL as u64, snapshot_interval, &snapshot_scopes, &cache_path(), None).await;
48 // constructing the snapshots may have taken a while
49 let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
51 // NOTE: we're waiting until the next multiple of snapshot_interval
52 // however, if the symlink granularity is lower, then during that time, no intermediate
53 // symlinks will be generated. That should be ok, because any timestamps previously
54 // returned would already have generated symlinks, but this does have bug potential
55 let remainder = current_time % snapshot_interval;
56 let time_until_next_generation = snapshot_interval - remainder;
58 log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
59 // add in an extra five seconds to assure the rounding down works correctly
60 let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
65 pub(crate) async fn generate_snapshots(&self, granularity_interval: u64, snapshot_interval: u64, snapshot_scopes: &[u64], cache_path: &str, max_symlink_count: Option<u64>) {
66 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path);
67 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path);
68 let finalized_snapshot_directory = format!("{}/snapshots", cache_path);
69 let finalized_symlink_directory = format!("{}/symlinks", cache_path);
70 let relative_symlink_to_snapshot_path = "../snapshots";
72 // 1. get the current timestamp
73 let snapshot_generation_time = SystemTime::now();
74 let snapshot_generation_timestamp = snapshot_generation_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
75 let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval);
76 log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
78 // 2. sleep until the next round interval
79 // 3. refresh all snapshots
81 // the stored snapshots should adhere to the following format
87 // from three weeks ago
89 // That means that at any given moment, there should only ever be
90 // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
91 // The snapshots, unlike dynamic updates, should account for all intermediate
94 // purge and recreate the pending directories
95 let suffixes = [("", ""), ("/v2", "../")];
96 for (suffix, _) in suffixes {
97 let versioned_snapshot_directory = format!("{}{}", pending_snapshot_directory, suffix);
98 let versioned_symlink_directory = format!("{}{}", pending_symlink_directory, suffix);
100 if fs::metadata(&versioned_snapshot_directory).is_ok() {
101 fs::remove_dir_all(&versioned_snapshot_directory).expect("Failed to remove pending snapshot directory.");
103 if fs::metadata(&versioned_symlink_directory).is_ok() {
104 fs::remove_dir_all(&versioned_symlink_directory).expect("Failed to remove pending symlink directory.");
106 fs::create_dir_all(&versioned_snapshot_directory).expect("Failed to create pending snapshot directory");
107 fs::create_dir_all(&versioned_symlink_directory).expect("Failed to create pending symlink directory");
110 let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
111 for current_scope in snapshot_scopes {
112 let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
113 snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
116 let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
118 for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
119 let network_graph_clone = self.network_graph.clone();
121 log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
122 // calculate the snapshot
123 let delta = super::calculate_delta(network_graph_clone.clone(), current_last_sync_timestamp.clone() as u32, Some(reference_timestamp), self.logger.clone()).await;
124 let snapshot_v1 = super::serialize_delta(&delta, 1, self.logger.clone());
125 let snapshot_v2 = super::serialize_delta(&delta, 2, self.logger.clone());
127 // persist the snapshot and update the symlink
128 let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
129 let snapshot_path_v1 = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
130 let snapshot_path_v2 = format!("{}/v2/{}", pending_snapshot_directory, snapshot_filename);
131 log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot_v1.message_count, snapshot_v1.channel_announcement_count, snapshot_v1.update_count, snapshot_v1.update_count_full, snapshot_v1.update_count_incremental);
132 fs::write(&snapshot_path_v1, snapshot_v1.data).unwrap();
133 fs::write(&snapshot_path_v2, snapshot_v2.data).unwrap();
134 snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
139 // create dummy symlink
140 let dummy_filename = "empty_delta.lngossip";
141 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
142 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
143 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
145 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
146 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
147 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
148 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
151 // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
152 let mut symlink_count = (reference_timestamp - 1640995200) / granularity_interval;
153 if let Some(max_symlink_count) = max_symlink_count {
154 // this is primarily useful for testing
155 symlink_count = std::cmp::min(symlink_count, max_symlink_count);
158 for i in 0..symlink_count {
159 // let's create non-dummy-symlinks
161 // first, determine which snapshot range should be referenced
162 let referenced_scope = if i == 0 {
163 // special-case 0 to always refer to a full/initial sync
167 We have snapshots for 6-day- and 7-day-intervals, but the next interval is
168 14 days. So if somebody requests an update with a timestamp that is 10 days old,
169 there is no longer a snapshot for that specific interval.
171 The correct snapshot will be the next highest interval, i. e. for 14 days.
173 The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
174 return on the first iteration that is at least equal to the requested interval.
176 Note, however, that the last value in the array is u64::max, which means that
177 multiplying it with snapshot_interval will overflow. To avoid that, we use
181 // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
182 snapshot_scopes.iter().find(|current_scope| {
183 i * granularity_interval <= **current_scope
186 log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
188 for (suffix, path_to_root) in suffixes {
189 let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
190 let relative_snapshot_path = format!("{}{}{}/{}", path_to_root, relative_symlink_to_snapshot_path, suffix, snapshot_filename);
192 let canonical_last_sync_timestamp = if i == 0 {
193 // special-case 0 to always refer to a full/initial sync
196 reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i))
198 let symlink_path = format!("{}{}/{}.bin", pending_symlink_directory, suffix, canonical_last_sync_timestamp);
200 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
201 symlink(&relative_snapshot_path, &symlink_path).unwrap();
205 let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
206 let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
207 fs::write(&update_time_path, format!("{}", update_time)).unwrap();
209 if fs::metadata(&finalized_snapshot_directory).is_ok() {
210 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
212 if fs::metadata(&finalized_symlink_directory).is_ok() {
213 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
215 fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
216 fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
219 pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
220 let round_multiple_delta = number % multiple;
221 number - round_multiple_delta