218285652868b56dac11dacedf3cf3c843d10510
[rapid-gossip-sync-server] / src / snapshot.rs
1 use std::collections::HashMap;
2 use std::fs;
3 use std::ops::Deref;
4 use std::os::unix::fs::symlink;
5 use std::sync::Arc;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
8
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
11
12 use crate::config;
13 use crate::config::cache_path;
14
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16         network_graph: Arc<NetworkGraph<L>>,
17         logger: L,
18 }
19
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21         pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22                 Self { network_graph, logger }
23         }
24
25         pub(crate) async fn snapshot_gossip(&self) {
26                 log_info!(self.logger, "Initiating snapshotting service");
27
28                 let snapshot_interval = config::snapshot_generation_interval() as u64;
29                 let mut snapshot_scopes = vec![];
30                 { // double the coefficient until it reaches the maximum (limited) snapshot scope
31                         let mut current_scope = snapshot_interval;
32                         loop {
33                                 snapshot_scopes.push(current_scope);
34                                 if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
35                                         snapshot_scopes.push(u64::MAX);
36                                         break;
37                                 }
38
39                                 // double the current factor
40                                 current_scope <<= 1;
41                         }
42                 }
43
44                 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
45                 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
46                 let finalized_snapshot_directory = format!("{}/snapshots", cache_path());
47                 let finalized_symlink_directory = format!("{}/symlinks", cache_path());
48                 let relative_symlink_to_snapshot_path = "../snapshots";
49
50                 // this is gonna be a never-ending background job
51                 loop {
52                         // 1. get the current timestamp
53                         let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
54                         let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
55                         log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
56
57                         // 2. sleep until the next round interval
58                         // 3. refresh all snapshots
59
60                         // the stored snapshots should adhere to the following format
61                         // from one day ago
62                         // from two days ago
63                         // …
64                         // from a week ago
65                         // from two weeks ago
66                         // from three weeks ago
67                         // full
68                         // That means that at any given moment, there should only ever be
69                         // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
70                         // The snapshots, unlike dynamic updates, should account for all intermediate
71                         // channel updates
72
73                         // purge and recreate the pending directories
74                         if fs::metadata(&pending_snapshot_directory).is_ok() {
75                                 fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
76                         }
77                         if fs::metadata(&pending_symlink_directory).is_ok() {
78                                 fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
79                         }
80                         fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
81                         fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
82
83                         let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
84                         for current_scope in &snapshot_scopes {
85                                 let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
86                                 snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
87                         };
88
89                         let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
90
91                         for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
92                                 let network_graph_clone = self.network_graph.clone();
93                                 {
94                                         log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
95                                         // calculate the snapshot
96                                         let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
97
98                                         // persist the snapshot and update the symlink
99                                         let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
100                                         let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
101                                         log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
102                                         fs::write(&snapshot_path, snapshot.data).unwrap();
103                                         snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
104                                 }
105                         }
106
107                         {
108                                 // create dummy symlink
109                                 let dummy_filename = "empty_delta.lngossip";
110                                 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
111                                 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
112                                 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
113
114                                 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
115                                 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
116                                 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
117                                 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
118                         }
119
120                         // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
121                         let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64;
122                         for i in 0..symlink_count {
123                                 // let's create non-dummy-symlinks
124
125                                 // first, determine which snapshot range should be referenced
126                                 let referenced_scope = if i == 0 {
127                                         // special-case 0 to always refer to a full/initial sync
128                                         u64::MAX
129                                 } else {
130                                         /*
131                                         We have snapshots for 6-day- and 7-day-intervals, but the next interval is
132                                         14 days. So if somebody requests an update with a timestamp that is 10 days old,
133                                         there is no longer a snapshot for that specific interval.
134
135                                         The correct snapshot will be the next highest interval, i. e. for 14 days.
136
137                                         The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
138                                         return on the first iteration that is at least equal to the requested interval.
139
140                                         Note, however, that the last value in the array is u64::max, which means that
141                                         multiplying it with snapshot_interval will overflow. To avoid that, we use
142                                         saturating_mul.
143                                          */
144
145                                         // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
146                                         snapshot_scopes.iter().find(|current_scope| {
147                                                 i * config::SYMLINK_GRANULARITY_INTERVAL as u64 <= **current_scope
148                                         }).unwrap().clone()
149                                 };
150                                 log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
151
152                                 let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
153                                 let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
154
155                                 let canonical_last_sync_timestamp = if i == 0 {
156                                         // special-case 0 to always refer to a full/initial sync
157                                         0
158                                 } else {
159                                         reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i))
160                                 };
161                                 let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
162
163                                 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_scope, symlink_path, relative_snapshot_path);
164                                 symlink(&relative_snapshot_path, &symlink_path).unwrap();
165                         }
166
167                         let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
168                         let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
169                         fs::write(&update_time_path, format!("{}", update_time)).unwrap();
170
171                         if fs::metadata(&finalized_snapshot_directory).is_ok() {
172                                 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
173                         }
174                         if fs::metadata(&finalized_symlink_directory).is_ok() {
175                                 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
176                         }
177                         fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
178                         fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
179
180                         // constructing the snapshots may have taken a while
181                         let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
182
183                         // NOTE: we're waiting until the next multiple of snapshot_interval
184                         // however, if the symlink granularity is lower, then during that time, no intermediate
185                         // symlinks will be generated. That should be ok, because any timestamps previously
186                         // returned would already have generated symlinks, but this does have bug potential
187                         let remainder = current_time % snapshot_interval;
188                         let time_until_next_generation = snapshot_interval - remainder;
189
190                         log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
191                         // add in an extra five seconds to assure the rounding down works correctly
192                         let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
193                         sleep.await;
194                 }
195         }
196
197         pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
198                 let round_multiple_delta = number % multiple;
199                 number - round_multiple_delta
200         }
201 }