Correct v2 symlink paths
[rapid-gossip-sync-server] / src / snapshot.rs
1 use std::collections::HashMap;
2 use std::fs;
3 use std::ops::Deref;
4 use std::os::unix::fs::symlink;
5 use std::sync::Arc;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
8
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
11
12 use crate::config;
13 use crate::config::cache_path;
14
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16         network_graph: Arc<NetworkGraph<L>>,
17         logger: L,
18 }
19
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21         pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22                 Self { network_graph, logger }
23         }
24
25         pub(crate) async fn snapshot_gossip(&self) {
26                 log_info!(self.logger, "Initiating snapshotting service");
27
28                 let snapshot_interval = config::snapshot_generation_interval() as u64;
29                 let mut snapshot_scopes = vec![];
30                 { // double the coefficient until it reaches the maximum (limited) snapshot scope
31                         let mut current_scope = snapshot_interval;
32                         loop {
33                                 snapshot_scopes.push(current_scope);
34                                 if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
35                                         snapshot_scopes.push(u64::MAX);
36                                         break;
37                                 }
38
39                                 // double the current factor
40                                 current_scope <<= 1;
41                         }
42                 }
43
44                 // this is gonna be a never-ending background job
45                 loop {
46                         self.generate_snapshots(config::SYMLINK_GRANULARITY_INTERVAL as u64, snapshot_interval, &snapshot_scopes, &cache_path(), None).await;
47
48                         // constructing the snapshots may have taken a while
49                         let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
50
51                         // NOTE: we're waiting until the next multiple of snapshot_interval
52                         // however, if the symlink granularity is lower, then during that time, no intermediate
53                         // symlinks will be generated. That should be ok, because any timestamps previously
54                         // returned would already have generated symlinks, but this does have bug potential
55                         let remainder = current_time % snapshot_interval;
56                         let time_until_next_generation = snapshot_interval - remainder;
57
58                         log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
59                         // add in an extra five seconds to assure the rounding down works correctly
60                         let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
61                         sleep.await;
62                 }
63         }
64
65         pub(crate) async fn generate_snapshots(&self, granularity_interval: u64, snapshot_interval: u64, snapshot_scopes: &[u64], cache_path: &str, max_symlink_count: Option<u64>) {
66                 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path);
67                 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path);
68                 let finalized_snapshot_directory = format!("{}/snapshots", cache_path);
69                 let finalized_symlink_directory = format!("{}/symlinks", cache_path);
70                 let relative_symlink_to_snapshot_path = "../snapshots";
71
72                 // 1. get the current timestamp
73                 let snapshot_generation_time = SystemTime::now();
74                 let snapshot_generation_timestamp = snapshot_generation_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
75                 let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval);
76                 log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
77
78                 // 2. sleep until the next round interval
79                 // 3. refresh all snapshots
80
81                 // the stored snapshots should adhere to the following format
82                 // from one day ago
83                 // from two days ago
84                 // …
85                 // from a week ago
86                 // from two weeks ago
87                 // from three weeks ago
88                 // full
89                 // That means that at any given moment, there should only ever be
90                 // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
91                 // The snapshots, unlike dynamic updates, should account for all intermediate
92                 // channel updates
93
94                 // purge and recreate the pending directories
95                 let suffixes = [("", ""), ("/v2", "../")];
96                 for (suffix, _) in suffixes {
97                         let versioned_snapshot_directory = format!("{}{}", pending_snapshot_directory, suffix);
98                         let versioned_symlink_directory = format!("{}{}", pending_symlink_directory, suffix);
99
100                         if fs::metadata(&versioned_snapshot_directory).is_ok() {
101                                 fs::remove_dir_all(&versioned_snapshot_directory).expect("Failed to remove pending snapshot directory.");
102                         }
103                         if fs::metadata(&versioned_symlink_directory).is_ok() {
104                                 fs::remove_dir_all(&versioned_symlink_directory).expect("Failed to remove pending symlink directory.");
105                         }
106                         fs::create_dir_all(&versioned_snapshot_directory).expect("Failed to create pending snapshot directory");
107                         fs::create_dir_all(&versioned_symlink_directory).expect("Failed to create pending symlink directory");
108                 }
109
110                 let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
111                 for current_scope in snapshot_scopes {
112                         let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
113                         snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
114                 };
115
116                 let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
117
118                 for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
119                         let network_graph_clone = self.network_graph.clone();
120                         {
121                                 log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
122                                 // calculate the snapshot
123                                 let delta = super::calculate_delta(network_graph_clone.clone(), current_last_sync_timestamp.clone() as u32, Some(reference_timestamp), self.logger.clone()).await;
124                                 let snapshot_v1 = super::serialize_delta(&delta, 1, self.logger.clone());
125                                 let snapshot_v2 = super::serialize_delta(&delta, 2, self.logger.clone());
126
127                                 // persist the snapshot and update the symlink
128                                 let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
129                                 let snapshot_path_v1 = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
130                                 let snapshot_path_v2 = format!("{}/v2/{}", pending_snapshot_directory, snapshot_filename);
131                                 log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot_v1.message_count, snapshot_v1.channel_announcement_count, snapshot_v1.update_count, snapshot_v1.update_count_full, snapshot_v1.update_count_incremental);
132                                 fs::write(&snapshot_path_v1, snapshot_v1.data).unwrap();
133                                 fs::write(&snapshot_path_v2, snapshot_v2.data).unwrap();
134                                 snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
135                         }
136                 }
137
138                 {
139                         // create dummy symlink
140                         let dummy_filename = "empty_delta.lngossip";
141                         let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
142                         let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
143                         fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
144
145                         let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
146                         let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
147                         log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
148                         symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
149                 }
150
151                 // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
152                 let mut symlink_count = (reference_timestamp - 1640995200) / granularity_interval;
153                 if let Some(max_symlink_count) = max_symlink_count {
154                         // this is primarily useful for testing
155                         symlink_count = std::cmp::min(symlink_count, max_symlink_count);
156                 };
157
158                 for i in 0..symlink_count {
159                         // let's create non-dummy-symlinks
160
161                         // first, determine which snapshot range should be referenced
162                         let referenced_scope = if i == 0 {
163                                 // special-case 0 to always refer to a full/initial sync
164                                 u64::MAX
165                         } else {
166                                 /*
167                                 We have snapshots for 6-day- and 7-day-intervals, but the next interval is
168                                 14 days. So if somebody requests an update with a timestamp that is 10 days old,
169                                 there is no longer a snapshot for that specific interval.
170
171                                 The correct snapshot will be the next highest interval, i. e. for 14 days.
172
173                                 The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
174                                 return on the first iteration that is at least equal to the requested interval.
175
176                                 Note, however, that the last value in the array is u64::max, which means that
177                                 multiplying it with snapshot_interval will overflow. To avoid that, we use
178                                 saturating_mul.
179                                  */
180
181                                 // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
182                                 snapshot_scopes.iter().find(|current_scope| {
183                                         i * granularity_interval <= **current_scope
184                                 }).unwrap().clone()
185                         };
186                         log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
187
188                         for (suffix, path_to_root) in suffixes {
189                                 let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
190                                 let relative_snapshot_path = format!("{}{}{}/{}", path_to_root, relative_symlink_to_snapshot_path, suffix, snapshot_filename);
191
192                                 let canonical_last_sync_timestamp = if i == 0 {
193                                         // special-case 0 to always refer to a full/initial sync
194                                         0
195                                 } else {
196                                         reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i))
197                                 };
198                                 let symlink_path = format!("{}{}/{}.bin", pending_symlink_directory, suffix, canonical_last_sync_timestamp);
199
200                                 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
201                                 symlink(&relative_snapshot_path, &symlink_path).unwrap();
202                         }
203                 }
204
205                 let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
206                 let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
207                 fs::write(&update_time_path, format!("{}", update_time)).unwrap();
208
209                 if fs::metadata(&finalized_snapshot_directory).is_ok() {
210                         fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
211                 }
212                 if fs::metadata(&finalized_symlink_directory).is_ok() {
213                         fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
214                 }
215                 fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
216                 fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
217         }
218
219         pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
220                 let round_multiple_delta = number % multiple;
221                 number - round_multiple_delta
222         }
223 }