Fix multiplication overflow bug.
[rapid-gossip-sync-server] / src / snapshot.rs
1 use std::collections::HashMap;
2 use std::fs;
3 use std::ops::Deref;
4 use std::os::unix::fs::symlink;
5 use std::sync::Arc;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
8
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
11
12 use crate::config;
13 use crate::config::cache_path;
14
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16         network_graph: Arc<NetworkGraph<L>>,
17         logger: L
18 }
19
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21         pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22                 Self { network_graph, logger }
23         }
24
25         pub(crate) async fn snapshot_gossip(&self) {
26                 log_info!(self.logger, "Initiating snapshotting service");
27
28                 let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
29                 const DAY_SECONDS: u64 = 60 * 60 * 24;
30
31                 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
32                 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
33                 let finalized_snapshot_directory = format!("{}/snapshots", cache_path());
34                 let finalized_symlink_directory = format!("{}/symlinks", cache_path());
35                 let relative_symlink_to_snapshot_path = "../snapshots";
36
37                 // this is gonna be a never-ending background job
38                 loop {
39                         // 1. get the current timestamp
40                         let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
41                         let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64);
42                         log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
43
44                         // 2. sleep until the next round 24 hours
45                         // 3. refresh all snapshots
46
47                         // the stored snapshots should adhere to the following format
48                         // from one day ago
49                         // from two days ago
50                         // …
51                         // from a week ago
52                         // from two weeks ago
53                         // from three weeks ago
54                         // full
55                         // That means that at any given moment, there should only ever be
56                         // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
57                         // The snapshots, unlike dynamic updates, should account for all intermediate
58                         // channel updates
59
60                         // purge and recreate the pending directories
61                         if fs::metadata(&pending_snapshot_directory).is_ok(){
62                                 fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
63                         }
64                         if fs::metadata(&pending_symlink_directory).is_ok(){
65                                 fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
66                         }
67                         fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
68                         fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
69
70                         let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
71                         for factor in &snapshot_sync_day_factors {
72                                 // basically timestamp - day_seconds * factor
73                                 let timestamp = reference_timestamp.saturating_sub(DAY_SECONDS.saturating_mul(factor.clone()));
74                                 snapshot_sync_timestamps.push((factor.clone(), timestamp));
75                         };
76
77                         let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
78
79                         for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
80                                 let network_graph_clone = self.network_graph.clone();
81                                 {
82                                         log_info!(self.logger, "Calculating {}-day snapshot", day_range);
83                                         // calculate the snapshot
84                                         let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
85
86                                         // persist the snapshot and update the symlink
87                                         let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
88                                         let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
89                                         log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
90                                         fs::write(&snapshot_path, snapshot.data).unwrap();
91                                         snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
92                                 }
93                         }
94
95                         {
96                                 // create dummy symlink
97                                 let dummy_filename = "empty_delta.lngossip";
98                                 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
99                                 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
100                                 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
101
102                                 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
103                                 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
104                                 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
105                                 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
106                         }
107
108                         // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
109                         let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64;
110                         for i in 0..symlink_count {
111                                 // let's create non-dummy-symlinks
112
113                                 // first, determine which snapshot range should be referenced
114                                 let referenced_day_range = if i == 0 {
115                                         // special-case 0 to always refer to a full/initial sync
116                                         u64::MAX
117                                 } else {
118                                         /*
119                                         We have snapshots for 6-day- and 7-day-intervals, but the next interval is
120                                         14 days. So if somebody requests an update with a timestamp that is 10 days old,
121                                         there is no longer a snapshot for that specific interval.
122
123                                         The correct snapshot will be the next highest interval, i. e. for 14 days.
124
125                                         The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
126                                         return on the first iteration that is at least equal to the requested interval.
127
128                                         Note, however, that the last value in the array is u64::max, which means that
129                                         multiplying it with DAY_SECONDS will overflow. To avoid that, we use
130                                         saturating_mul.
131                                          */
132
133                                         // find min(x) in snapshot_sync_day_factors where x >= i
134                                         snapshot_sync_day_factors.iter().find(|x| {
135                                                 DAY_SECONDS.saturating_mul(**x) >= i * config::SNAPSHOT_CALCULATION_INTERVAL as u64
136                                         }).unwrap().clone()
137                                 };
138                                 log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range);
139
140                                 let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
141                                 let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
142
143                                 let canonical_last_sync_timestamp = if i == 0 {
144                                         // special-case 0 to always refer to a full/initial sync
145                                         0
146                                 } else {
147                                         reference_timestamp.saturating_sub((config::SNAPSHOT_CALCULATION_INTERVAL as u64).saturating_mul(i))
148                                 };
149                                 let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
150
151                                 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
152                                 symlink(&relative_snapshot_path, &symlink_path).unwrap();
153                         }
154
155                         let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
156                         let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
157                         fs::write(&update_time_path, format!("{}", update_time)).unwrap();
158
159                         if fs::metadata(&finalized_snapshot_directory).is_ok(){
160                                 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
161                         }
162                         if fs::metadata(&finalized_symlink_directory).is_ok(){
163                                 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
164                         }
165                         fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
166                         fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
167
168                         // constructing the snapshots may have taken a while
169                         let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
170                         let remainder = current_time % config::SNAPSHOT_CALCULATION_INTERVAL as u64;
171                         let time_until_next_day = config::SNAPSHOT_CALCULATION_INTERVAL as u64 - remainder;
172
173                         log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
174                         // add in an extra five seconds to assure the rounding down works correctly
175                         let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
176                         sleep.await;
177                 }
178         }
179
180         pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
181                 let round_multiple_delta = number % multiple;
182                 number - round_multiple_delta
183         }
184 }