1 use std::collections::HashMap;
4 use std::os::unix::fs::symlink;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
13 use crate::config::cache_path;
/// Owns the state needed to generate gossip snapshots: the struct's methods
/// serialize deltas of `network_graph` and report progress through a logger.
///
/// `L` is a cloneable smart pointer whose target implements `Logger`.
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
// Shared handle to the network graph; `snapshot_gossip` clones this `Arc`
// for every delta it serializes.
16 network_graph: Arc<NetworkGraph<L>>,
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21 pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22 Self { network_graph, logger }
/// Generates gossip delta snapshots and the symlinks that map a client's
/// last-sync timestamp onto the snapshot it should be served.
///
/// For every entry of `snapshot_sync_day_factors` a delta is produced with
/// `super::serialize_delta` and written into the pending snapshot directory.
/// Symlinks named `<timestamp>.bin` are then created in the pending symlink
/// directory, and finally both pending directories are renamed over the
/// finalized ones. Afterwards a sleep lasting until shortly after the next
/// interval boundary is prepared.
///
/// The inline comments describe this as a never-ending background job.
///
/// # Panics
///
/// Panics if any filesystem operation (directory removal or creation, file
/// write, symlink creation, rename) fails, or if the system clock reports a
/// time before the Unix epoch.
25 pub(crate) async fn snapshot_gossip(&self) {
26 log_info!(self.logger, "Initiating snapshotting service");
// Day ranges for which a delta snapshot is produced. For u64::MAX the
// saturating arithmetic further down yields a last-sync timestamp of 0.
28 let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
29 const DAY_SECONDS: u64 = 60 * 60 * 24;
// Everything is first built inside the "*_pending" directories and only
// renamed over the finalized directories once a full batch is complete.
31 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
32 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
33 let finalized_snapshot_directory = format!("{}/snapshots", cache_path());
34 let finalized_symlink_directory = format!("{}/symlinks", cache_path());
// Symlink targets are relative: from inside the symlink directory they point
// into the sibling "snapshots" directory, i.e. the finalized location.
35 let relative_symlink_to_snapshot_path = "../snapshots";
37 // this is gonna be a never-ending background job
39 // 1. get the current timestamp
40 let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
// Round the capture time down to a multiple of the configured interval; all
// snapshot file names and symlink keys below derive from this value.
41 let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64);
42 log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
44 // 2. sleep until the next round 24 hours
45 // 3. refresh all snapshots
47 // the stored snapshots should adhere to the following format
53 // from three weeks ago
55 // That means that at any given moment, there should only ever be
56 // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
57 // The snapshots, unlike dynamic updates, should account for all intermediate
60 // purge and recreate the pending directories
// NOTE(review): the `fs` calls are presumably std::fs, i.e. synchronous I/O
// inside an async fn - confirm against the imports.
61 if fs::metadata(&pending_snapshot_directory).is_ok(){
62 fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
64 if fs::metadata(&pending_symlink_directory).is_ok(){
65 fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
67 fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
68 fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
// Pair every day factor with the last-sync timestamp it stands for.
70 let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
71 for factor in &snapshot_sync_day_factors {
72 // basically timestamp - day_seconds * factor
73 let timestamp = reference_timestamp.saturating_sub(DAY_SECONDS.saturating_mul(factor.clone()));
74 snapshot_sync_timestamps.push((factor.clone(), timestamp));
// Maps a day range to the file name of the snapshot written for it, so the
// symlink pass below can look up its target.
77 let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
79 for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
80 let network_graph_clone = self.network_graph.clone();
82 log_info!(self.logger, "Calculating {}-day snapshot", day_range);
83 // calculate the snapshot
// NOTE(review): the u64 timestamp is narrowed with `as u32`, which truncates
// silently for values above u32::MAX.
84 let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
86 // persist the snapshot and update the symlink
87 let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
88 let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
89 log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
// Panics if the snapshot file cannot be written.
90 fs::write(&snapshot_path, snapshot.data).unwrap();
91 snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
96 // create dummy symlink
// The symlink keyed by the reference timestamp itself targets the blob from
// `serialize_empty_blob`; presumably this serves clients that are already
// synced up to the reference timestamp - confirm.
97 let dummy_filename = "empty_delta.lngossip";
98 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
99 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
100 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
102 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
103 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
104 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
105 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
108 // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
// 1640995200 is 2022-01-01T00:00:00Z as a Unix timestamp.
// NOTE(review): this is a plain subtraction and would underflow if
// reference_timestamp were ever earlier than that constant.
109 let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64;
110 for i in 0..symlink_count {
111 // let's create non-dummy-symlinks
113 // first, determine which snapshot range should be referenced
114 let referenced_day_range = if i == 0 {
115 // special-case 0 to always refer to a full/initial sync
119 We have snapshots for 6-day- and 7-day-intervals, but the next interval is
120 14 days. So if somebody requests an update with a timestamp that is 10 days old,
121 there is no longer a snapshot for that specific interval.
123 The correct snapshot will be the next highest interval, i. e. for 14 days.
125 The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
126 return on the first iteration that is at least equal to the requested interval.
128 Note, however, that the last value in the array is u64::max, which means that
129 multiplying it with DAY_SECONDS will overflow. To avoid that, we use
133 // find min(x) in snapshot_sync_day_factors where x >= i
134 snapshot_sync_day_factors.iter().find(|x| {
135 DAY_SECONDS.saturating_mul(**x) >= i * config::SNAPSHOT_CALCULATION_INTERVAL as u64
138 log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range);
// Panics if no snapshot file was recorded for the chosen day range.
140 let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
141 let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
// The symlink is named after the last-sync timestamp that lies `i` intervals
// before the reference timestamp.
143 let canonical_last_sync_timestamp = if i == 0 {
144 // special-case 0 to always refer to a full/initial sync
147 reference_timestamp.saturating_sub((config::SNAPSHOT_CALCULATION_INTERVAL as u64).saturating_mul(i))
149 let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
// NOTE(review): the format string below opens a parenthesis that it never
// closes.
151 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
152 symlink(&relative_snapshot_path, &symlink_path).unwrap();
// Write the current Unix time to update_time.txt inside the pending symlink
// directory.
155 let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
156 let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
157 fs::write(&update_time_path, format!("{}", update_time)).unwrap();
// Swap step: delete the previously finalized directories, then rename the
// pending ones into place.
// NOTE(review): between the removal and the rename the finalized paths do
// not exist, so the swap is not atomic.
159 if fs::metadata(&finalized_snapshot_directory).is_ok(){
160 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
162 if fs::metadata(&finalized_symlink_directory).is_ok(){
// NOTE(review): the message below says "pending" although the directory being
// removed is the finalized symlink directory.
163 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
165 fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
166 fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
168 // constructing the snapshots may have taken a while
// Seconds remaining until the next multiple of the configured interval.
// NOTE(review): the variable is called "next_day", but the interval length
// comes from config and is not visible here.
169 let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
170 let remainder = current_time % config::SNAPSHOT_CALCULATION_INTERVAL as u64;
171 let time_until_next_day = config::SNAPSHOT_CALCULATION_INTERVAL as u64 - remainder;
173 log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
174 // add in an extra five seconds to assure the rounding down works correctly
// NOTE(review): this line only constructs the sleep future; the place where
// it is awaited is not visible in this view.
175 let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
180 pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
181 let round_multiple_delta = number % multiple;
182 number - round_multiple_delta