Allow passing a timestamp override to snapshot calculation.
[rapid-gossip-sync-server] / src / lookup.rs
1 use std::collections::{BTreeMap, HashSet};
2 use std::io::Cursor;
3 use std::ops::Deref;
4 use std::sync::Arc;
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
11
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::util::logger::Logger;
15
16 use crate::config;
17 use crate::serialization::MutatedProperties;
18
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
22
23 pub(super) struct AnnouncementDelta {
24         pub(super) seen: u32,
25         pub(super) announcement: UnsignedChannelAnnouncement,
26 }
27
28 pub(super) struct UpdateDelta {
29         pub(super) seen: u32,
30         pub(super) update: UnsignedChannelUpdate,
31 }
32
33 pub(super) struct DirectedUpdateDelta {
34         /// the last update we saw prior to the user-provided timestamp
35         pub(super) last_update_before_seen: Option<UpdateDelta>,
36         /// the latest update we saw overall
37         pub(super) latest_update_after_seen: Option<UpdateDelta>,
38         /// the set of all mutated properties across all updates between the last seen by the user and
39         /// the latest one known to us
40         pub(super) mutated_properties: MutatedProperties,
41         /// Specifically for reminder updates, the flag-only value to send to the client
42         pub(super) serialization_update_flags: Option<u8>
43 }
44
45 pub(super) struct ChannelDelta {
46         pub(super) announcement: Option<AnnouncementDelta>,
47         pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48         pub(super) first_bidirectional_updates_seen: Option<u32>,
49         /// The seen timestamp of the older of the two latest directional updates
50         pub(super) requires_reminder: bool,
51 }
52
53 impl Default for ChannelDelta {
54         fn default() -> Self {
55                 Self {
56                         announcement: None,
57                         updates: (None, None),
58                         first_bidirectional_updates_seen: None,
59                         requires_reminder: false,
60                 }
61         }
62 }
63
64 impl Default for DirectedUpdateDelta {
65         fn default() -> Self {
66                 Self {
67                         last_update_before_seen: None,
68                         mutated_properties: MutatedProperties::default(),
69                         latest_update_after_seen: None,
70                         serialization_update_flags: None,
71                 }
72         }
73 }
74
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_calculation_time: Option<SystemTime>, logger: L) where L::Target: Logger {
80         log_info!(logger, "Obtaining channel ids from network graph");
81         let channel_ids = {
82                 let read_only_graph = network_graph.read_only();
83                 log_info!(logger, "Retrieved read-only network graph copy");
84                 let channel_iterator = read_only_graph.channels().unordered_iter();
85                 channel_iterator
86                         .filter(|c| c.1.announcement_message.is_some())
87                         .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
88                         .collect::<Vec<_>>()
89         };
90         #[cfg(test)]
91         log_info!(logger, "Channel IDs: {:?}", channel_ids);
92         log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93         let last_sync_timestamp_float = last_sync_timestamp as f64;
94
95         let current_time = snapshot_calculation_time.unwrap_or(SystemTime::now());
96         let current_timestamp = current_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
97         log_info!(logger, "Current timestamp: {}", current_timestamp);
98
99         let include_reminders = {
100                 let current_hour = current_timestamp / 3600;
101                 let current_day = current_timestamp / (24 * 3600);
102
103                 log_debug!(logger, "Current day index: {}", current_day);
104                 log_debug!(logger, "Current hour: {}", current_hour);
105
106                 // anytime between 11pm and 1am
107                 let is_reminder_hour = current_hour < 2 || current_hour > 22;
108                 let is_reminder_day = (current_day % 5) == 0;
109
110                 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
111                 let is_reminder_scope = snapshot_scope > (40 * 3600);
112                 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
113
114                 (is_reminder_hour && is_reminder_day) || is_reminder_scope
115         };
116
117         log_info!(logger, "Obtaining corresponding database entries");
118         // get all the channel announcements that are currently in the network graph
119         let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
120         let mut pinned_rows = Box::pin(announcement_rows);
121
122         let mut announcement_count = 0;
123         while let Some(row_res) = pinned_rows.next().await {
124                 let current_announcement_row = row_res.unwrap();
125                 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
126                 let mut readable = Cursor::new(blob);
127                 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
128
129                 let scid = unsigned_announcement.short_channel_id;
130                 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
131
132                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
133                 (*current_channel_delta).announcement = Some(AnnouncementDelta {
134                         announcement: unsigned_announcement,
135                         seen: current_seen_timestamp,
136                 });
137
138                 announcement_count += 1;
139         }
140         log_info!(logger, "Fetched {} announcement rows", announcement_count);
141
142         {
143                 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
144
145                 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
146                 // Steps:
147                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
148                 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
149                 // This will allow us to mark the first time updates in both directions were seen
150
151                 // here is where the channels whose first update in either direction occurred after
152                 // `last_seen_timestamp` are added to the selection
153                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
154                         [&channel_ids, &last_sync_timestamp_float];
155                 let newer_oldest_directional_updates = client.query_raw("
156                         SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
157                                 SELECT DISTINCT ON (short_channel_id) *
158                                 FROM (
159                                         SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
160                                         FROM channel_updates
161                                         WHERE short_channel_id = any($1)
162                                         ORDER BY short_channel_id ASC, direction ASC, seen ASC
163                                 ) AS directional_last_seens
164                                 ORDER BY short_channel_id ASC, seen DESC
165                         ) AS distinct_chans
166                         WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
167                         ", params).await.unwrap();
168                 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
169
170                 let mut newer_oldest_directional_update_count = 0;
171                 while let Some(row_res) = pinned_updates.next().await {
172                         let current_row = row_res.unwrap();
173
174                         let scid: i64 = current_row.get("short_channel_id");
175                         let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
176
177                         // the newer of the two oldest seen directional updates came after last sync timestamp
178                         let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
179                         // first time a channel was seen in both directions
180                         (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
181
182                         newer_oldest_directional_update_count += 1;
183                 }
184                 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
185         }
186
187         if include_reminders {
188                 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
189
190                 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
191                 // Steps:
192                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
193                 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
194                 let reminder_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
195
196                 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
197                 let reminder_lookup_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
198                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
199
200                 /*
201                 What exactly is the below query doing?
202
203                 First, the inner query groups all channel updates by their scid/direction combination,
204                 and then sorts those in reverse chronological order by the "seen" column.
205
206                 Then, each row is annotated based on whether its subsequent row for the same scid/direction
207                 combination has a different value for any one of these six fields:
208                 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
209                 Those are simply the properties we use to keep track of channel mutations.
210
211                 The outer query takes all of those results and selects the first value that has a distinct
212                 successor for each scid/direction combination. That yields the first instance at which
213                 a given channel configuration was received after any prior mutations.
214
215                 Knowing that, we can check whether or not there have been any mutations within the
216                 reminder requirement window. Because we only care about that window (and potentially the
217                 2-week-window), we pre-filter the scanned updates by only those that were received within
218                 3x the timeframe that we consider necessitates reminders.
219                 */
220
221                 let mutated_updates = client.query_raw("
222                 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
223                         SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
224                                 disable<>lead(disable) OVER w1
225                                         OR
226                                 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
227                                         OR
228                                 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
229                                         OR
230                                 fee_base_msat<>lead(fee_base_msat) OVER w1
231                                         OR
232                                 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
233                                         OR
234                                 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
235                                 TRUE
236                         ) has_distinct_successor
237                         FROM channel_updates
238                         WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
239                         WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
240                 ) _
241                 WHERE has_distinct_successor
242                 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
243                 ", params).await.unwrap();
244
245                 let mut pinned_updates = Box::pin(mutated_updates);
246                 let mut older_latest_directional_update_count = 0;
247                 while let Some(row_res) = pinned_updates.next().await {
248                         let current_row = row_res.unwrap();
249                         let seen = current_row.get::<_, i64>("seen") as u32;
250
251                         if seen < reminder_threshold_timestamp as u32 {
252                                 let blob: Vec<u8> = current_row.get("blob_signed");
253                                 let mut readable = Cursor::new(blob);
254                                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
255
256                                 let scid = unsigned_channel_update.short_channel_id;
257                                 let direction: bool = current_row.get("direction");
258
259                                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
260
261                                 // We might be able to get away with not using this
262                                 (*current_channel_delta).requires_reminder = true;
263                                 older_latest_directional_update_count += 1;
264
265                                 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
266                                         if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
267                                                 // we don't send reminders if we don't have bidirectional update data
268                                                 continue;
269                                         }
270
271                                         if let Some(info) = current_channel_info.one_to_two.as_ref() {
272                                                 let flags: u8 = if info.enabled { 0 } else { 2 };
273                                                 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
274                                                 current_update.serialization_update_flags = Some(flags);
275                                         }
276
277                                         if let Some(info) = current_channel_info.two_to_one.as_ref() {
278                                                 let flags: u8 = if info.enabled { 1 } else { 3 };
279                                                 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
280                                                 current_update.serialization_update_flags = Some(flags);
281                                         }
282                                 } else {
283                                         // we don't send reminders if we don't have the channel
284                                         continue;
285                                 }
286
287                                 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
288                         }
289                 }
290                 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
291         }
292 }
293
294 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
295         let start = Instant::now();
296         let last_sync_timestamp_float = last_sync_timestamp as f64;
297
298         // get the latest channel update in each direction prior to last_sync_timestamp, provided
299         // there was an update in either direction that happened after the last sync (to avoid
300         // collecting too many reference updates)
301         let reference_rows = client.query_raw("
302                 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
303                 WHERE id IN (
304                         SELECT DISTINCT ON (short_channel_id, direction) id
305                         FROM channel_updates
306                         WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
307                                 SELECT DISTINCT ON (short_channel_id) short_channel_id
308                                 FROM channel_updates
309                                 WHERE seen >= TO_TIMESTAMP($1)
310                         )
311                         ORDER BY short_channel_id ASC, direction ASC, seen DESC
312                 )
313                 ", [last_sync_timestamp_float]).await.unwrap();
314         let mut pinned_rows = Box::pin(reference_rows);
315
316         log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
317
318         let mut last_seen_update_ids: Vec<i32> = Vec::new();
319         let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
320         let mut reference_row_count = 0;
321
322         while let Some(row_res) = pinned_rows.next().await {
323                 let current_reference = row_res.unwrap();
324                 let update_id: i32 = current_reference.get("id");
325                 last_seen_update_ids.push(update_id);
326                 non_intermediate_ids.insert(update_id);
327
328                 let direction: bool = current_reference.get("direction");
329                 let seen = current_reference.get::<_, i64>("seen") as u32;
330                 let blob: Vec<u8> = current_reference.get("blob_signed");
331                 let mut readable = Cursor::new(blob);
332                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
333                 let scid = unsigned_channel_update.short_channel_id;
334
335                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
336                 let update_delta = if !direction {
337                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
338                 } else {
339                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
340                 };
341                 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
342                 update_delta.last_update_before_seen = Some(UpdateDelta {
343                         seen,
344                         update: unsigned_channel_update,
345                 });
346
347                 reference_row_count += 1;
348         }
349
350         log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
351                 reference_row_count, delta_set.len(), start.elapsed());
352
353         // get all the intermediate channel updates
354         // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
355         // have been omitted)
356
357         let intermediate_updates = client.query_raw("
358                 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
359                 FROM channel_updates
360                 WHERE seen >= TO_TIMESTAMP($1)
361                 ORDER BY short_channel_id ASC, timestamp DESC
362                 ", [last_sync_timestamp_float]).await.unwrap();
363         let mut pinned_updates = Box::pin(intermediate_updates);
364         log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
365
366         let mut previous_scid = u64::MAX;
367         let mut previously_seen_directions = (false, false);
368
369         // let mut previously_seen_directions = (false, false);
370         let mut intermediate_update_count = 0;
371         while let Some(row_res) = pinned_updates.next().await {
372                 let intermediate_update = row_res.unwrap();
373                 let update_id: i32 = intermediate_update.get("id");
374                 if non_intermediate_ids.contains(&update_id) {
375                         continue;
376                 }
377                 intermediate_update_count += 1;
378
379                 let direction: bool = intermediate_update.get("direction");
380                 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
381                 let blob: Vec<u8> = intermediate_update.get("blob_signed");
382                 let mut readable = Cursor::new(blob);
383                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
384
385                 let scid = unsigned_channel_update.short_channel_id;
386                 if scid != previous_scid {
387                         previous_scid = scid;
388                         previously_seen_directions = (false, false);
389                 }
390
391                 // get the write configuration for this particular channel's directional details
392                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
393                 let update_delta = if !direction {
394                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
395                 } else {
396                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
397                 };
398
399                 {
400                         // handle the latest deltas
401                         if !direction && !previously_seen_directions.0 {
402                                 previously_seen_directions.0 = true;
403                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
404                                         seen: current_seen_timestamp,
405                                         update: unsigned_channel_update.clone(),
406                                 });
407                         } else if direction && !previously_seen_directions.1 {
408                                 previously_seen_directions.1 = true;
409                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
410                                         seen: current_seen_timestamp,
411                                         update: unsigned_channel_update.clone(),
412                                 });
413                         }
414                 }
415
416                 // determine mutations
417                 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
418                         if unsigned_channel_update.flags != last_seen_update.update.flags {
419                                 update_delta.mutated_properties.flags = true;
420                         }
421                         if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
422                                 update_delta.mutated_properties.cltv_expiry_delta = true;
423                         }
424                         if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
425                                 update_delta.mutated_properties.htlc_minimum_msat = true;
426                         }
427                         if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
428                                 update_delta.mutated_properties.fee_base_msat = true;
429                         }
430                         if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
431                                 update_delta.mutated_properties.fee_proportional_millionths = true;
432                         }
433                         if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
434                                 update_delta.mutated_properties.htlc_maximum_msat = true;
435                         }
436                 }
437         }
438         log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
439 }
440
441 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
442         let original_length = delta_set.len();
443         let keys: Vec<u64> = delta_set.keys().cloned().collect();
444         for k in keys {
445                 let v = delta_set.get(&k).unwrap();
446                 if v.announcement.is_none() {
447                         // this channel is not currently in the network graph
448                         delta_set.remove(&k);
449                         continue;
450                 }
451
452                 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
453                         if update.is_none() {
454                                 return false;
455                         };
456                         let update_reference = update.as_ref().unwrap();
457                         // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
458                         // if there has been an update after the channel was first seen
459
460                         v.requires_reminder || update_reference.latest_update_after_seen.is_some()
461                 };
462
463                 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
464                 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
465
466                 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
467                         delta_set.remove(&k);
468                 }
469         }
470
471         let new_length = delta_set.len();
472         if original_length != new_length {
473                 log_info!(logger, "length modified!");
474         }
475 }