Use reference timestamp for reminder calculation.
[rapid-gossip-sync-server] / src / lookup.rs
1 use std::collections::{BTreeMap, HashSet};
2 use std::io::Cursor;
3 use std::ops::Deref;
4 use std::sync::Arc;
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
11
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::util::logger::Logger;
15
16 use crate::config;
17 use crate::serialization::MutatedProperties;
18
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
22
23 pub(super) struct AnnouncementDelta {
24         pub(super) seen: u32,
25         pub(super) announcement: UnsignedChannelAnnouncement,
26 }
27
28 pub(super) struct UpdateDelta {
29         pub(super) seen: u32,
30         pub(super) update: UnsignedChannelUpdate,
31 }
32
33 pub(super) struct DirectedUpdateDelta {
34         /// the last update we saw prior to the user-provided timestamp
35         pub(super) last_update_before_seen: Option<UpdateDelta>,
36         /// the latest update we saw overall
37         pub(super) latest_update_after_seen: Option<UpdateDelta>,
38         /// the set of all mutated properties across all updates between the last seen by the user and
39         /// the latest one known to us
40         pub(super) mutated_properties: MutatedProperties,
41         /// Specifically for reminder updates, the flag-only value to send to the client
42         pub(super) serialization_update_flags: Option<u8>
43 }
44
45 pub(super) struct ChannelDelta {
46         pub(super) announcement: Option<AnnouncementDelta>,
47         pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48         pub(super) first_bidirectional_updates_seen: Option<u32>,
49         /// The seen timestamp of the older of the two latest directional updates
50         pub(super) requires_reminder: bool,
51 }
52
53 impl Default for ChannelDelta {
54         fn default() -> Self {
55                 Self {
56                         announcement: None,
57                         updates: (None, None),
58                         first_bidirectional_updates_seen: None,
59                         requires_reminder: false,
60                 }
61         }
62 }
63
64 impl Default for DirectedUpdateDelta {
65         fn default() -> Self {
66                 Self {
67                         last_update_before_seen: None,
68                         mutated_properties: MutatedProperties::default(),
69                         latest_update_after_seen: None,
70                         serialization_update_flags: None,
71                 }
72         }
73 }
74
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) where L::Target: Logger {
80         log_info!(logger, "Obtaining channel ids from network graph");
81         let channel_ids = {
82                 let read_only_graph = network_graph.read_only();
83                 log_info!(logger, "Retrieved read-only network graph copy");
84                 let channel_iterator = read_only_graph.channels().unordered_iter();
85                 channel_iterator
86                         .filter(|c| c.1.announcement_message.is_some())
87                         .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
88                         .collect::<Vec<_>>()
89         };
90         #[cfg(test)]
91         log_info!(logger, "Channel IDs: {:?}", channel_ids);
92         log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93         let last_sync_timestamp_float = last_sync_timestamp as f64;
94
95         let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
96         log_info!(logger, "Current timestamp: {}", current_timestamp);
97
98         let include_reminders = {
99                 let current_hour = current_timestamp / 3600;
100                 let current_day = current_timestamp / (24 * 3600);
101
102                 log_debug!(logger, "Current day index: {}", current_day);
103                 log_debug!(logger, "Current hour: {}", current_hour);
104
105                 // every 5th day at midnight
106                 let is_reminder_hour = (current_hour % 24) == 0;
107                 let is_reminder_day = (current_day % 5) == 0;
108
109                 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
110                 let is_reminder_scope = snapshot_scope > (40 * 3600);
111                 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
112
113                 (is_reminder_hour && is_reminder_day) || is_reminder_scope
114         };
115
116         log_info!(logger, "Obtaining corresponding database entries");
117         // get all the channel announcements that are currently in the network graph
118         let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
119         let mut pinned_rows = Box::pin(announcement_rows);
120
121         let mut announcement_count = 0;
122         while let Some(row_res) = pinned_rows.next().await {
123                 let current_announcement_row = row_res.unwrap();
124                 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
125                 let mut readable = Cursor::new(blob);
126                 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
127
128                 let scid = unsigned_announcement.short_channel_id;
129                 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
130
131                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
132                 (*current_channel_delta).announcement = Some(AnnouncementDelta {
133                         announcement: unsigned_announcement,
134                         seen: current_seen_timestamp,
135                 });
136
137                 announcement_count += 1;
138         }
139         log_info!(logger, "Fetched {} announcement rows", announcement_count);
140
141         {
142                 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
143
144                 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
145                 // Steps:
146                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
147                 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
148                 // This will allow us to mark the first time updates in both directions were seen
149
150                 // here is where the channels whose first update in either direction occurred after
151                 // `last_seen_timestamp` are added to the selection
152                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
153                         [&channel_ids, &last_sync_timestamp_float];
154                 let newer_oldest_directional_updates = client.query_raw("
155                         SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
156                                 SELECT DISTINCT ON (short_channel_id) *
157                                 FROM (
158                                         SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
159                                         FROM channel_updates
160                                         WHERE short_channel_id = any($1)
161                                         ORDER BY short_channel_id ASC, direction ASC, seen ASC
162                                 ) AS directional_last_seens
163                                 ORDER BY short_channel_id ASC, seen DESC
164                         ) AS distinct_chans
165                         WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
166                         ", params).await.unwrap();
167                 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
168
169                 let mut newer_oldest_directional_update_count = 0;
170                 while let Some(row_res) = pinned_updates.next().await {
171                         let current_row = row_res.unwrap();
172
173                         let scid: i64 = current_row.get("short_channel_id");
174                         let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
175
176                         // the newer of the two oldest seen directional updates came after last sync timestamp
177                         let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
178                         // first time a channel was seen in both directions
179                         (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
180
181                         newer_oldest_directional_update_count += 1;
182                 }
183                 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
184         }
185
186         if include_reminders {
187                 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
188
189                 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
190                 // Steps:
191                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
192                 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
193                 let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64;
194
195                 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
196                 let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64;
197                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
198
199                 /*
200                 What exactly is the below query doing?
201
202                 First, the inner query groups all channel updates by their scid/direction combination,
203                 and then sorts those in reverse chronological order by the "seen" column.
204
205                 Then, each row is annotated based on whether its subsequent row for the same scid/direction
206                 combination has a different value for any one of these six fields:
207                 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
208                 Those are simply the properties we use to keep track of channel mutations.
209
210                 The outer query takes all of those results and selects the first value that has a distinct
211                 successor for each scid/direction combination. That yields the first instance at which
212                 a given channel configuration was received after any prior mutations.
213
214                 Knowing that, we can check whether or not there have been any mutations within the
215                 reminder requirement window. Because we only care about that window (and potentially the
216                 2-week-window), we pre-filter the scanned updates by only those that were received within
217                 3x the timeframe that we consider necessitates reminders.
218                 */
219
220                 let mutated_updates = client.query_raw("
221                 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
222                         SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
223                                 disable<>lead(disable) OVER w1
224                                         OR
225                                 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
226                                         OR
227                                 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
228                                         OR
229                                 fee_base_msat<>lead(fee_base_msat) OVER w1
230                                         OR
231                                 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
232                                         OR
233                                 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
234                                 TRUE
235                         ) has_distinct_successor
236                         FROM channel_updates
237                         WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
238                         WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
239                 ) _
240                 WHERE has_distinct_successor
241                 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
242                 ", params).await.unwrap();
243
244                 let mut pinned_updates = Box::pin(mutated_updates);
245                 let mut older_latest_directional_update_count = 0;
246                 while let Some(row_res) = pinned_updates.next().await {
247                         let current_row = row_res.unwrap();
248                         let seen = current_row.get::<_, i64>("seen") as u32;
249
250                         if seen < reminder_threshold_timestamp as u32 {
251                                 let blob: Vec<u8> = current_row.get("blob_signed");
252                                 let mut readable = Cursor::new(blob);
253                                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
254
255                                 let scid = unsigned_channel_update.short_channel_id;
256                                 let direction: bool = current_row.get("direction");
257
258                                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
259
260                                 // We might be able to get away with not using this
261                                 (*current_channel_delta).requires_reminder = true;
262                                 older_latest_directional_update_count += 1;
263
264                                 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
265                                         if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
266                                                 // we don't send reminders if we don't have bidirectional update data
267                                                 continue;
268                                         }
269
270                                         if let Some(info) = current_channel_info.one_to_two.as_ref() {
271                                                 let flags: u8 = if info.enabled { 0 } else { 2 };
272                                                 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
273                                                 current_update.serialization_update_flags = Some(flags);
274                                         }
275
276                                         if let Some(info) = current_channel_info.two_to_one.as_ref() {
277                                                 let flags: u8 = if info.enabled { 1 } else { 3 };
278                                                 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
279                                                 current_update.serialization_update_flags = Some(flags);
280                                         }
281                                 } else {
282                                         // we don't send reminders if we don't have the channel
283                                         continue;
284                                 }
285
286                                 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
287                         }
288                 }
289                 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
290         }
291 }
292
293 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
294         let start = Instant::now();
295         let last_sync_timestamp_float = last_sync_timestamp as f64;
296
297         // get the latest channel update in each direction prior to last_sync_timestamp, provided
298         // there was an update in either direction that happened after the last sync (to avoid
299         // collecting too many reference updates)
300         let reference_rows = client.query_raw("
301                 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
302                 WHERE id IN (
303                         SELECT DISTINCT ON (short_channel_id, direction) id
304                         FROM channel_updates
305                         WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
306                                 SELECT DISTINCT ON (short_channel_id) short_channel_id
307                                 FROM channel_updates
308                                 WHERE seen >= TO_TIMESTAMP($1)
309                         )
310                         ORDER BY short_channel_id ASC, direction ASC, seen DESC
311                 )
312                 ", [last_sync_timestamp_float]).await.unwrap();
313         let mut pinned_rows = Box::pin(reference_rows);
314
315         log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
316
317         let mut last_seen_update_ids: Vec<i32> = Vec::new();
318         let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
319         let mut reference_row_count = 0;
320
321         while let Some(row_res) = pinned_rows.next().await {
322                 let current_reference = row_res.unwrap();
323                 let update_id: i32 = current_reference.get("id");
324                 last_seen_update_ids.push(update_id);
325                 non_intermediate_ids.insert(update_id);
326
327                 let direction: bool = current_reference.get("direction");
328                 let seen = current_reference.get::<_, i64>("seen") as u32;
329                 let blob: Vec<u8> = current_reference.get("blob_signed");
330                 let mut readable = Cursor::new(blob);
331                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
332                 let scid = unsigned_channel_update.short_channel_id;
333
334                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
335                 let update_delta = if !direction {
336                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
337                 } else {
338                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
339                 };
340                 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
341                 update_delta.last_update_before_seen = Some(UpdateDelta {
342                         seen,
343                         update: unsigned_channel_update,
344                 });
345
346                 reference_row_count += 1;
347         }
348
349         log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
350                 reference_row_count, delta_set.len(), start.elapsed());
351
352         // get all the intermediate channel updates
353         // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
354         // have been omitted)
355
356         let intermediate_updates = client.query_raw("
357                 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
358                 FROM channel_updates
359                 WHERE seen >= TO_TIMESTAMP($1)
360                 ORDER BY short_channel_id ASC, timestamp DESC
361                 ", [last_sync_timestamp_float]).await.unwrap();
362         let mut pinned_updates = Box::pin(intermediate_updates);
363         log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
364
365         let mut previous_scid = u64::MAX;
366         let mut previously_seen_directions = (false, false);
367
368         // let mut previously_seen_directions = (false, false);
369         let mut intermediate_update_count = 0;
370         while let Some(row_res) = pinned_updates.next().await {
371                 let intermediate_update = row_res.unwrap();
372                 let update_id: i32 = intermediate_update.get("id");
373                 if non_intermediate_ids.contains(&update_id) {
374                         continue;
375                 }
376                 intermediate_update_count += 1;
377
378                 let direction: bool = intermediate_update.get("direction");
379                 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
380                 let blob: Vec<u8> = intermediate_update.get("blob_signed");
381                 let mut readable = Cursor::new(blob);
382                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
383
384                 let scid = unsigned_channel_update.short_channel_id;
385                 if scid != previous_scid {
386                         previous_scid = scid;
387                         previously_seen_directions = (false, false);
388                 }
389
390                 // get the write configuration for this particular channel's directional details
391                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
392                 let update_delta = if !direction {
393                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
394                 } else {
395                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
396                 };
397
398                 {
399                         // handle the latest deltas
400                         if !direction && !previously_seen_directions.0 {
401                                 previously_seen_directions.0 = true;
402                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
403                                         seen: current_seen_timestamp,
404                                         update: unsigned_channel_update.clone(),
405                                 });
406                         } else if direction && !previously_seen_directions.1 {
407                                 previously_seen_directions.1 = true;
408                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
409                                         seen: current_seen_timestamp,
410                                         update: unsigned_channel_update.clone(),
411                                 });
412                         }
413                 }
414
415                 // determine mutations
416                 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
417                         if unsigned_channel_update.flags != last_seen_update.update.flags {
418                                 update_delta.mutated_properties.flags = true;
419                         }
420                         if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
421                                 update_delta.mutated_properties.cltv_expiry_delta = true;
422                         }
423                         if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
424                                 update_delta.mutated_properties.htlc_minimum_msat = true;
425                         }
426                         if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
427                                 update_delta.mutated_properties.fee_base_msat = true;
428                         }
429                         if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
430                                 update_delta.mutated_properties.fee_proportional_millionths = true;
431                         }
432                         if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
433                                 update_delta.mutated_properties.htlc_maximum_msat = true;
434                         }
435                 }
436         }
437         log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
438 }
439
440 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
441         let original_length = delta_set.len();
442         let keys: Vec<u64> = delta_set.keys().cloned().collect();
443         for k in keys {
444                 let v = delta_set.get(&k).unwrap();
445                 if v.announcement.is_none() {
446                         // this channel is not currently in the network graph
447                         delta_set.remove(&k);
448                         continue;
449                 }
450
451                 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
452                         if update.is_none() {
453                                 return false;
454                         };
455                         let update_reference = update.as_ref().unwrap();
456                         // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
457                         // if there has been an update after the channel was first seen
458
459                         v.requires_reminder || update_reference.latest_update_after_seen.is_some()
460                 };
461
462                 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
463                 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
464
465                 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
466                         delta_set.remove(&k);
467                 }
468         }
469
470         let new_length = delta_set.len();
471         if original_length != new_length {
472                 log_info!(logger, "length modified!");
473         }
474 }