4a86f9854eb8caa600c30b12a0fddede72311f52
[rapid-gossip-sync-server] / src / snapshot.rs
1 use std::collections::HashMap;
2 use std::fs;
3 use std::ops::Deref;
4 use std::os::unix::fs::symlink;
5 use std::sync::Arc;
6 use std::time::{Duration, SystemTime, UNIX_EPOCH};
7 use lightning::log_info;
8
9 use lightning::routing::gossip::NetworkGraph;
10 use lightning::util::logger::Logger;
11
12 use crate::config;
13 use crate::config::cache_path;
14
15 pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
16         network_graph: Arc<NetworkGraph<L>>,
17         logger: L
18 }
19
20 impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
21         pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> Self {
22                 Self { network_graph, logger }
23         }
24
25         pub(crate) async fn snapshot_gossip(&self) {
26                 log_info!(self.logger, "Initiating snapshotting service");
27
28                 let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
29                 let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64;
30
31                 let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
32                 let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
33                 let finalized_snapshot_directory = format!("{}/snapshots", cache_path());
34                 let finalized_symlink_directory = format!("{}/symlinks", cache_path());
35                 let relative_symlink_to_snapshot_path = "../snapshots";
36
37                 // this is gonna be a never-ending background job
38                 loop {
39                         // 1. get the current timestamp
40                         let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
41                         let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, round_day_seconds);
42                         log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
43
44                         // 2. sleep until the next round 24 hours
45                         // 3. refresh all snapshots
46
47                         // the stored snapshots should adhere to the following format
48                         // from one day ago
49                         // from two days ago
50                         // …
51                         // from a week ago
52                         // from two weeks ago
53                         // from three weeks ago
54                         // full
55                         // That means that at any given moment, there should only ever be
56                         // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
57                         // The snapshots, unlike dynamic updates, should account for all intermediate
58                         // channel updates
59
60                         // purge and recreate the pending directories
61                         if fs::metadata(&pending_snapshot_directory).is_ok(){
62                                 fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
63                         }
64                         if fs::metadata(&pending_symlink_directory).is_ok(){
65                                 fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
66                         }
67                         fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
68                         fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
69
70                         let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
71                         for factor in &snapshot_sync_day_factors {
72                                 // basically timestamp - day_seconds * factor
73                                 let timestamp = reference_timestamp.saturating_sub(round_day_seconds.saturating_mul(factor.clone()));
74                                 snapshot_sync_timestamps.push((factor.clone(), timestamp));
75                         };
76
77                         let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
78
79                         for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
80                                 let network_graph_clone = self.network_graph.clone();
81                                 {
82                                         log_info!(self.logger, "Calculating {}-day snapshot", day_range);
83                                         // calculate the snapshot
84                                         let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
85
86                                         // persist the snapshot and update the symlink
87                                         let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
88                                         let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
89                                         log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
90                                         fs::write(&snapshot_path, snapshot.data).unwrap();
91                                         snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
92                                 }
93                         }
94
95                         {
96                                 // create dummy symlink
97                                 let dummy_filename = "empty_delta.lngossip";
98                                 let dummy_snapshot = super::serialize_empty_blob(reference_timestamp);
99                                 let dummy_snapshot_path = format!("{}/{}", pending_snapshot_directory, dummy_filename);
100                                 fs::write(&dummy_snapshot_path, dummy_snapshot).unwrap();
101
102                                 let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp);
103                                 let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename);
104                                 log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path);
105                                 symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap();
106                         }
107
108                         // Number of intervals since Jan 1, 2022, a few months before RGS server was released.
109                         let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64;
110                         for i in 0..symlink_count {
111                                 // let's create non-dummy-symlinks
112
113                                 // first, determine which snapshot range should be referenced
114                                 let referenced_day_range = if i == 0 {
115                                         // special-case 0 to always refer to a full/initial sync
116                                         u64::MAX
117                                 } else {
118                                         // find min(x) in snapshot_sync_day_factors where x >= i
119                                         snapshot_sync_day_factors.iter().find(|x| {
120                                                 x >= &&i
121                                         }).unwrap().clone()
122                                 };
123
124                                 let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
125                                 let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
126
127                                 let canonical_last_sync_timestamp = if i == 0 {
128                                         // special-case 0 to always refer to a full/initial sync
129                                         0
130                                 } else {
131                                         reference_timestamp.saturating_sub(round_day_seconds.saturating_mul(i))
132                                 };
133                                 let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
134
135                                 log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
136                                 symlink(&relative_snapshot_path, &symlink_path).unwrap();
137                         }
138
139                         let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
140                         let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
141                         fs::write(&update_time_path, format!("{}", update_time)).unwrap();
142
143                         if fs::metadata(&finalized_snapshot_directory).is_ok(){
144                                 fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
145                         }
146                         if fs::metadata(&finalized_symlink_directory).is_ok(){
147                                 fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
148                         }
149                         fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
150                         fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
151
152                         // constructing the snapshots may have taken a while
153                         let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
154                         let remainder = current_time % round_day_seconds;
155                         let time_until_next_day = round_day_seconds - remainder;
156
157                         log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
158                         // add in an extra five seconds to assure the rounding down works correctly
159                         let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
160                         sleep.await;
161                 }
162         }
163
164         pub(super) fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
165                 let round_multiple_delta = number % multiple;
166                 number - round_multiple_delta
167         }
168 }