28d0768d65ede059651d16bcf6dc3421a681ce58
[rapid-gossip-sync-server] / src / snapshot.rs
1 use std::collections::HashMap;
2 use std::fs;
3 use std::ops::Deref;
4 use std::os::unix::fs::symlink;
5 use std::sync::Arc;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
8
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
11
12 use crate::config;
13 use crate::config::cache_path;
14
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16         network_graph: Arc<NetworkGraph<L>>,
17         logger: L
18 }
19
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21         pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22                 Self { network_graph, logger }
23         }
24
25         pub(crate) async fn snapshot_gossip(&self) {
26                 log_info!(self.logger, "Initiating snapshotting service");
27
28                 let calc_interval = config::calculate_interval();
29                 let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
30                 const DAY_SECONDS: u64 = 60 * 60 * 24;
31                 let round_day_seconds = calc_interval as u64;
32
33                 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
34                 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
35                 let finalized_snapshot_directory = format!("{}/snapshots", cache_path());
36                 let finalized_symlink_directory = format!("{}/symlinks", cache_path());
37                 let relative_symlink_to_snapshot_path = "../snapshots";
38
39                 // this is gonna be a never-ending background job
40                 loop {
41                         // 1. get the current timestamp
42                         let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
43                         let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64);
44                         log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
45
46                         // 2. sleep until the next round interval
47                         // 3. refresh all snapshots
48
49                         // the stored snapshots should adhere to the following format
50                         // from one day ago
51                         // from two days ago
52                         // …
53                         // from a week ago
54                         // from two weeks ago
55                         // from three weeks ago
56                         // full
57                         // That means that at any given moment, there should only ever be
58                         // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
59                         // The snapshots, unlike dynamic updates, should account for all intermediate
60                         // channel updates
61
62                         // purge and recreate the pending directories
63                         if fs::metadata(&pending_snapshot_directory).is_ok(){
64                                 fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
65                         }
66                         if fs::metadata(&pending_symlink_directory).is_ok(){
67                                 fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
68                         }
69                         fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
70                         fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
71
72                         let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
73                         for factor in &snapshot_sync_day_factors {
74                                 // basically timestamp - day_seconds * factor
75                                 let timestamp = reference_timestamp.saturating_sub(DAY_SECONDS.saturating_mul(factor.clone()));
76                                 snapshot_sync_timestamps.push((factor.clone(), timestamp));
77                         };
78
79                         let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
80
81                         for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
82                                 let network_graph_clone = self.network_graph.clone();
83                                 {
84                                         log_info!(self.logger, "Calculating {}-day snapshot", day_range);
85                                         // calculate the snapshot
86                                         let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
87
88                                         // persist the snapshot and update the symlink
89                                         let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
90                                         let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
91                                         log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
92                                         fs::write(&snapshot_path, snapshot.data).unwrap();
93                                         snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
94                                 }
95                         }
96
97                         {
98                                 // create dummy symlink
99                                 let dummy_filename = "empty_delta.lngossip";
100                                 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
101                                 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
102                                 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
103
104                                 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
105                                 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
106                                 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
107                                 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
108                         }
109
110                         // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
111                         let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64;
112                         for i in 0..symlink_count {
113                                 // let's create non-dummy-symlinks
114
115                                 // first, determine which snapshot range should be referenced
116                                 let referenced_day_range = if i == 0 {
117                                         // special-case 0 to always refer to a full/initial sync
118                                         u64::MAX
119                                 } else {
120                                         /*
121                                         We have snapshots for 6-day- and 7-day-intervals, but the next interval is
122                                         14 days. So if somebody requests an update with a timestamp that is 10 days old,
123                                         there is no longer a snapshot for that specific interval.
124
125                                         The correct snapshot will be the next highest interval, i. e. for 14 days.
126
127                                         The `snapshot_sync_day_factors` array is sorted ascendingly, so find() will
128                                         return on the first iteration that is at least equal to the requested interval.
129
130                                         Note, however, that the last value in the array is u64::max, which means that
131                                         multiplying it with DAY_SECONDS will overflow. To avoid that, we use
132                                         saturating_mul.
133                                          */
134
135                                         // find min(x) in snapshot_sync_day_factors where x >= i
136                                         snapshot_sync_day_factors.iter().find(|x| {
137                                                 DAY_SECONDS.saturating_mul(**x) >= i * config::SNAPSHOT_CALCULATION_INTERVAL as u64
138                                         }).unwrap().clone()
139                                 };
140                                 log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range);
141
142                                 let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
143                                 let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
144
145                                 let canonical_last_sync_timestamp = if i == 0 {
146                                         // special-case 0 to always refer to a full/initial sync
147                                         0
148                                 } else {
149                                         reference_timestamp.saturating_sub((config::SNAPSHOT_CALCULATION_INTERVAL as u64).saturating_mul(i))
150                                 };
151                                 let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
152
153                                 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
154                                 symlink(&relative_snapshot_path, &symlink_path).unwrap();
155                         }
156
157                         let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
158                         let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
159                         fs::write(&update_time_path, format!("{}", update_time)).unwrap();
160
161                         if fs::metadata(&finalized_snapshot_directory).is_ok(){
162                                 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
163                         }
164                         if fs::metadata(&finalized_symlink_directory).is_ok(){
165                                 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
166                         }
167                         fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
168                         fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
169
170                         // constructing the snapshots may have taken a while
171                         let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
172                         let remainder = current_time % config::SNAPSHOT_CALCULATION_INTERVAL as u64;
173                         let time_until_next_day = config::SNAPSHOT_CALCULATION_INTERVAL as u64 - remainder;
174
175                         log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
176                         // add in an extra five seconds to assure the rounding down works correctly
177                         let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
178                         sleep.await;
179                 }
180         }
181
182         pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
183                 let round_multiple_delta = number % multiple;
184                 number - round_multiple_delta
185         }
186 }