Extract snapshot generation method.
[rapid-gossip-sync-server] / src / snapshot.rs
1 use std::collections::HashMap;
2 use std::fs;
3 use std::ops::Deref;
4 use std::os::unix::fs::symlink;
5 use std::sync::Arc;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
8
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
11
12 use crate::config;
13 use crate::config::cache_path;
14
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16         network_graph: Arc<NetworkGraph<L>>,
17         logger: L,
18 }
19
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21         pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22                 Self { network_graph, logger }
23         }
24
25         pub(crate) async fn snapshot_gossip(&self) {
26                 log_info!(self.logger, "Initiating snapshotting service");
27
28                 let snapshot_interval = config::snapshot_generation_interval() as u64;
29                 let mut snapshot_scopes = vec![];
30                 { // double the coefficient until it reaches the maximum (limited) snapshot scope
31                         let mut current_scope = snapshot_interval;
32                         loop {
33                                 snapshot_scopes.push(current_scope);
34                                 if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
35                                         snapshot_scopes.push(u64::MAX);
36                                         break;
37                                 }
38
39                                 // double the current factor
40                                 current_scope <<= 1;
41                         }
42                 }
43
44                 // this is gonna be a never-ending background job
45                 loop {
46                         self.generate_snapshots(config::SYMLINK_GRANULARITY_INTERVAL as u64, snapshot_interval, &snapshot_scopes, &cache_path(), None).await;
47
48                         // constructing the snapshots may have taken a while
49                         let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
50
51                         // NOTE: we're waiting until the next multiple of snapshot_interval
52                         // however, if the symlink granularity is lower, then during that time, no intermediate
53                         // symlinks will be generated. That should be ok, because any timestamps previously
54                         // returned would already have generated symlinks, but this does have bug potential
55                         let remainder = current_time % snapshot_interval;
56                         let time_until_next_generation = snapshot_interval - remainder;
57
58                         log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
59                         // add in an extra five seconds to assure the rounding down works correctly
60                         let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
61                         sleep.await;
62                 }
63         }
64
65         pub(crate) async fn generate_snapshots(&self, granularity_interval: u64, snapshot_interval: u64, snapshot_scopes: &[u64], cache_path: &str, max_symlink_count: Option<u64>) {
66                 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path);
67                 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path);
68                 let finalized_snapshot_directory = format!("{}/snapshots", cache_path);
69                 let finalized_symlink_directory = format!("{}/symlinks", cache_path);
70                 let relative_symlink_to_snapshot_path = "../snapshots";
71
72                 // 1. get the current timestamp
73                 let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
74                 let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
75                 log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
76
77                 // 2. sleep until the next round interval
78                 // 3. refresh all snapshots
79
80                 // the stored snapshots should adhere to the following format
81                 // from one day ago
82                 // from two days ago
83                 // …
84                 // from a week ago
85                 // from two weeks ago
86                 // from three weeks ago
87                 // full
88                 // That means that at any given moment, there should only ever be
89                 // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
90                 // The snapshots, unlike dynamic updates, should account for all intermediate
91                 // channel updates
92
93                 // purge and recreate the pending directories
94                 if fs::metadata(&pending_snapshot_directory).is_ok() {
95                         fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
96                 }
97                 if fs::metadata(&pending_symlink_directory).is_ok() {
98                         fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
99                 }
100                 fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
101                 fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
102
103                 let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
104                 for current_scope in snapshot_scopes {
105                         let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
106                         snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
107                 };
108
109                 let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
110
111                 for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
112                         let network_graph_clone = self.network_graph.clone();
113                         {
114                                 log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
115                                 // calculate the snapshot
116                                 let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
117
118                                 // persist the snapshot and update the symlink
119                                 let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
120                                 let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
121                                 log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
122                                 fs::write(&snapshot_path, snapshot.data).unwrap();
123                                 snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
124                         }
125                 }
126
127                 {
128                         // create dummy symlink
129                         let dummy_filename = "empty_delta.lngossip";
130                         let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
131                         let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
132                         fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
133
134                         let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
135                         let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
136                         log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
137                         symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
138                 }
139
140                 // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
141                 let mut symlink_count = (reference_timestamp - 1640995200) / granularity_interval;
142                 if let Some(max_symlink_count) = max_symlink_count {
143                         // this is primarily useful for testing
144                         symlink_count = std::cmp::min(symlink_count, max_symlink_count);
145                 };
146
147                 for i in 0..symlink_count {
148                         // let's create non-dummy-symlinks
149
150                         // first, determine which snapshot range should be referenced
151                         let referenced_scope = if i == 0 {
152                                 // special-case 0 to always refer to a full/initial sync
153                                 u64::MAX
154                         } else {
155                                 /*
156                                 We have snapshots for 6-day- and 7-day-intervals, but the next interval is
157                                 14 days. So if somebody requests an update with a timestamp that is 10 days old,
158                                 there is no longer a snapshot for that specific interval.
159
160                                 The correct snapshot will be the next highest interval, i. e. for 14 days.
161
162                                 The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
163                                 return on the first iteration that is at least equal to the requested interval.
164
165                                 Note, however, that the last value in the array is u64::max, which means that
166                                 multiplying it with snapshot_interval will overflow. To avoid that, we use
167                                 saturating_mul.
168                                  */
169
170                                 // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
171                                 snapshot_scopes.iter().find(|current_scope| {
172                                         i * granularity_interval <= **current_scope
173                                 }).unwrap().clone()
174                         };
175                         log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
176
177                         let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
178                         let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
179
180                         let canonical_last_sync_timestamp = if i == 0 {
181                                 // special-case 0 to always refer to a full/initial sync
182                                 0
183                         } else {
184                                 reference_timestamp.saturating_sub(granularity_interval.saturating_mul(i))
185                         };
186                         let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
187
188                         log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
189                         symlink(&relative_snapshot_path, &symlink_path).unwrap();
190                 }
191
192                 let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
193                 let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
194                 fs::write(&update_time_path, format!("{}", update_time)).unwrap();
195
196                 if fs::metadata(&finalized_snapshot_directory).is_ok() {
197                         fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
198                 }
199                 if fs::metadata(&finalized_symlink_directory).is_ok() {
200                         fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
201                 }
202                 fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
203                 fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
204         }
205
206         pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
207                 let round_multiple_delta = number % multiple;
208                 number - round_multiple_delta
209         }
210 }