Make reminder inclusion more conservative.
[rapid-gossip-sync-server] / src / lookup.rs
1 use std::collections::{BTreeMap, HashSet};
2 use std::io::Cursor;
3 use std::ops::Deref;
4 use std::sync::Arc;
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
11
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::util::logger::Logger;
15
16 use crate::config;
17 use crate::serialization::MutatedProperties;
18
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
22
23 pub(super) struct AnnouncementDelta {
24         pub(super) seen: u32,
25         pub(super) announcement: UnsignedChannelAnnouncement,
26 }
27
28 pub(super) struct UpdateDelta {
29         pub(super) seen: u32,
30         pub(super) update: UnsignedChannelUpdate,
31 }
32
33 pub(super) struct DirectedUpdateDelta {
34         /// the last update we saw prior to the user-provided timestamp
35         pub(super) last_update_before_seen: Option<UpdateDelta>,
36         /// the latest update we saw overall
37         pub(super) latest_update_after_seen: Option<UpdateDelta>,
38         /// the set of all mutated properties across all updates between the last seen by the user and
39         /// the latest one known to us
40         pub(super) mutated_properties: MutatedProperties,
41         /// Specifically for reminder updates, the flag-only value to send to the client
42         pub(super) serialization_update_flags: Option<u8>
43 }
44
45 pub(super) struct ChannelDelta {
46         pub(super) announcement: Option<AnnouncementDelta>,
47         pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48         pub(super) first_bidirectional_updates_seen: Option<u32>,
49         /// The seen timestamp of the older of the two latest directional updates
50         pub(super) requires_reminder: bool,
51 }
52
53 impl Default for ChannelDelta {
54         fn default() -> Self {
55                 Self {
56                         announcement: None,
57                         updates: (None, None),
58                         first_bidirectional_updates_seen: None,
59                         requires_reminder: false,
60                 }
61         }
62 }
63
64 impl Default for DirectedUpdateDelta {
65         fn default() -> Self {
66                 Self {
67                         last_update_before_seen: None,
68                         mutated_properties: MutatedProperties::default(),
69                         latest_update_after_seen: None,
70                         serialization_update_flags: None,
71                 }
72         }
73 }
74
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
80         log_info!(logger, "Obtaining channel ids from network graph");
81         let channel_ids = {
82                 let read_only_graph = network_graph.read_only();
83                 log_info!(logger, "Retrieved read-only network graph copy");
84                 let channel_iterator = read_only_graph.channels().unordered_iter();
85                 channel_iterator
86                         .filter(|c| c.1.announcement_message.is_some())
87                         .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
88                         .collect::<Vec<_>>()
89         };
90         #[cfg(test)]
91         log_info!(logger, "Channel IDs: {:?}", channel_ids);
92         log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93         let last_sync_timestamp_float = last_sync_timestamp as f64;
94
95         let current_time = SystemTime::now();
96         let current_timestamp = current_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
97         log_info!(logger, "Current timestamp: {}", current_timestamp);
98
99         let current_day = current_timestamp / (24 * 3600);
100         let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
101         log_debug!(logger, "Current day index: {}", current_day);
102         log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
103         let include_reminders = ((current_day % 5) == 0 || snapshot_scope > (40 * 3600));
104
105         log_info!(logger, "Obtaining corresponding database entries");
106         // get all the channel announcements that are currently in the network graph
107         let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
108         let mut pinned_rows = Box::pin(announcement_rows);
109
110         let mut announcement_count = 0;
111         while let Some(row_res) = pinned_rows.next().await {
112                 let current_announcement_row = row_res.unwrap();
113                 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
114                 let mut readable = Cursor::new(blob);
115                 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
116
117                 let scid = unsigned_announcement.short_channel_id;
118                 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
119
120                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
121                 (*current_channel_delta).announcement = Some(AnnouncementDelta {
122                         announcement: unsigned_announcement,
123                         seen: current_seen_timestamp,
124                 });
125
126                 announcement_count += 1;
127         }
128         log_info!(logger, "Fetched {} announcement rows", announcement_count);
129
130         {
131                 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
132
133                 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
134                 // Steps:
135                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
136                 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
137                 // This will allow us to mark the first time updates in both directions were seen
138
139                 // here is where the channels whose first update in either direction occurred after
140                 // `last_seen_timestamp` are added to the selection
141                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
142                         [&channel_ids, &last_sync_timestamp_float];
143                 let newer_oldest_directional_updates = client.query_raw("
144                         SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
145                                 SELECT DISTINCT ON (short_channel_id) *
146                                 FROM (
147                                         SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
148                                         FROM channel_updates
149                                         WHERE short_channel_id = any($1)
150                                         ORDER BY short_channel_id ASC, direction ASC, seen ASC
151                                 ) AS directional_last_seens
152                                 ORDER BY short_channel_id ASC, seen DESC
153                         ) AS distinct_chans
154                         WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
155                         ", params).await.unwrap();
156                 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
157
158                 let mut newer_oldest_directional_update_count = 0;
159                 while let Some(row_res) = pinned_updates.next().await {
160                         let current_row = row_res.unwrap();
161
162                         let scid: i64 = current_row.get("short_channel_id");
163                         let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
164
165                         // the newer of the two oldest seen directional updates came after last sync timestamp
166                         let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
167                         // first time a channel was seen in both directions
168                         (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
169
170                         newer_oldest_directional_update_count += 1;
171                 }
172                 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
173         }
174
175         if include_reminders {
176                 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
177
178                 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
179                 // Steps:
180                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
181                 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
182                 let reminder_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
183
184                 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
185                 let reminder_lookup_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
186                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
187
188                 /*
189                 What exactly is the below query doing?
190
191                 First, the inner query groups all channel updates by their scid/direction combination,
192                 and then sorts those in reverse chronological order by the "seen" column.
193
194                 Then, each row is annotated based on whether its subsequent row for the same scid/direction
195                 combination has a different value for any one of these six fields:
196                 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
197                 Those are simply the properties we use to keep track of channel mutations.
198
199                 The outer query takes all of those results and selects the first value that has a distinct
200                 successor for each scid/direction combination. That yields the first instance at which
201                 a given channel configuration was received after any prior mutations.
202
203                 Knowing that, we can check whether or not there have been any mutations within the
204                 reminder requirement window. Because we only care about that window (and potentially the
205                 2-week-window), we pre-filter the scanned updates by only those that were received within
206                 3x the timeframe that we consider necessitates reminders.
207                 */
208
209                 let mutated_updates = client.query_raw("
210                 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
211                         SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
212                                 disable<>lead(disable) OVER w1
213                                         OR
214                                 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
215                                         OR
216                                 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
217                                         OR
218                                 fee_base_msat<>lead(fee_base_msat) OVER w1
219                                         OR
220                                 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
221                                         OR
222                                 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
223                                 TRUE
224                         ) has_distinct_successor
225                         FROM channel_updates
226                         WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
227                         WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
228                 ) _
229                 WHERE has_distinct_successor
230                 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
231                 ", params).await.unwrap();
232
233                 let mut pinned_updates = Box::pin(mutated_updates);
234                 let mut older_latest_directional_update_count = 0;
235                 while let Some(row_res) = pinned_updates.next().await {
236                         let current_row = row_res.unwrap();
237                         let seen = current_row.get::<_, i64>("seen") as u32;
238
239                         if seen < reminder_threshold_timestamp as u32 {
240                                 let blob: Vec<u8> = current_row.get("blob_signed");
241                                 let mut readable = Cursor::new(blob);
242                                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
243
244                                 let scid = unsigned_channel_update.short_channel_id;
245                                 let direction: bool = current_row.get("direction");
246
247                                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
248
249                                 // We might be able to get away with not using this
250                                 (*current_channel_delta).requires_reminder = true;
251                                 older_latest_directional_update_count += 1;
252
253                                 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
254                                         if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
255                                                 // we don't send reminders if we don't have bidirectional update data
256                                                 continue;
257                                         }
258
259                                         if let Some(info) = current_channel_info.one_to_two.as_ref() {
260                                                 let flags: u8 = if info.enabled { 0 } else { 2 };
261                                                 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
262                                                 current_update.serialization_update_flags = Some(flags);
263                                         }
264
265                                         if let Some(info) = current_channel_info.two_to_one.as_ref() {
266                                                 let flags: u8 = if info.enabled { 1 } else { 3 };
267                                                 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
268                                                 current_update.serialization_update_flags = Some(flags);
269                                         }
270                                 } else {
271                                         // we don't send reminders if we don't have the channel
272                                         continue;
273                                 }
274
275                                 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
276                         }
277                 }
278                 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
279         }
280 }
281
282 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
283         let start = Instant::now();
284         let last_sync_timestamp_float = last_sync_timestamp as f64;
285
286         // get the latest channel update in each direction prior to last_sync_timestamp, provided
287         // there was an update in either direction that happened after the last sync (to avoid
288         // collecting too many reference updates)
289         let reference_rows = client.query_raw("
290                 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
291                 WHERE id IN (
292                         SELECT DISTINCT ON (short_channel_id, direction) id
293                         FROM channel_updates
294                         WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
295                                 SELECT DISTINCT ON (short_channel_id) short_channel_id
296                                 FROM channel_updates
297                                 WHERE seen >= TO_TIMESTAMP($1)
298                         )
299                         ORDER BY short_channel_id ASC, direction ASC, seen DESC
300                 )
301                 ", [last_sync_timestamp_float]).await.unwrap();
302         let mut pinned_rows = Box::pin(reference_rows);
303
304         log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
305
306         let mut last_seen_update_ids: Vec<i32> = Vec::new();
307         let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
308         let mut reference_row_count = 0;
309
310         while let Some(row_res) = pinned_rows.next().await {
311                 let current_reference = row_res.unwrap();
312                 let update_id: i32 = current_reference.get("id");
313                 last_seen_update_ids.push(update_id);
314                 non_intermediate_ids.insert(update_id);
315
316                 let direction: bool = current_reference.get("direction");
317                 let seen = current_reference.get::<_, i64>("seen") as u32;
318                 let blob: Vec<u8> = current_reference.get("blob_signed");
319                 let mut readable = Cursor::new(blob);
320                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
321                 let scid = unsigned_channel_update.short_channel_id;
322
323                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
324                 let update_delta = if !direction {
325                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
326                 } else {
327                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
328                 };
329                 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
330                 update_delta.last_update_before_seen = Some(UpdateDelta {
331                         seen,
332                         update: unsigned_channel_update,
333                 });
334
335                 reference_row_count += 1;
336         }
337
338         log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
339                 reference_row_count, delta_set.len(), start.elapsed());
340
341         // get all the intermediate channel updates
342         // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
343         // have been omitted)
344
345         let intermediate_updates = client.query_raw("
346                 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
347                 FROM channel_updates
348                 WHERE seen >= TO_TIMESTAMP($1)
349                 ORDER BY short_channel_id ASC, timestamp DESC
350                 ", [last_sync_timestamp_float]).await.unwrap();
351         let mut pinned_updates = Box::pin(intermediate_updates);
352         log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
353
354         let mut previous_scid = u64::MAX;
355         let mut previously_seen_directions = (false, false);
356
357         // let mut previously_seen_directions = (false, false);
358         let mut intermediate_update_count = 0;
359         while let Some(row_res) = pinned_updates.next().await {
360                 let intermediate_update = row_res.unwrap();
361                 let update_id: i32 = intermediate_update.get("id");
362                 if non_intermediate_ids.contains(&update_id) {
363                         continue;
364                 }
365                 intermediate_update_count += 1;
366
367                 let direction: bool = intermediate_update.get("direction");
368                 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
369                 let blob: Vec<u8> = intermediate_update.get("blob_signed");
370                 let mut readable = Cursor::new(blob);
371                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
372
373                 let scid = unsigned_channel_update.short_channel_id;
374                 if scid != previous_scid {
375                         previous_scid = scid;
376                         previously_seen_directions = (false, false);
377                 }
378
379                 // get the write configuration for this particular channel's directional details
380                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
381                 let update_delta = if !direction {
382                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
383                 } else {
384                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
385                 };
386
387                 {
388                         // handle the latest deltas
389                         if !direction && !previously_seen_directions.0 {
390                                 previously_seen_directions.0 = true;
391                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
392                                         seen: current_seen_timestamp,
393                                         update: unsigned_channel_update.clone(),
394                                 });
395                         } else if direction && !previously_seen_directions.1 {
396                                 previously_seen_directions.1 = true;
397                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
398                                         seen: current_seen_timestamp,
399                                         update: unsigned_channel_update.clone(),
400                                 });
401                         }
402                 }
403
404                 // determine mutations
405                 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
406                         if unsigned_channel_update.flags != last_seen_update.update.flags {
407                                 update_delta.mutated_properties.flags = true;
408                         }
409                         if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
410                                 update_delta.mutated_properties.cltv_expiry_delta = true;
411                         }
412                         if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
413                                 update_delta.mutated_properties.htlc_minimum_msat = true;
414                         }
415                         if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
416                                 update_delta.mutated_properties.fee_base_msat = true;
417                         }
418                         if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
419                                 update_delta.mutated_properties.fee_proportional_millionths = true;
420                         }
421                         if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
422                                 update_delta.mutated_properties.htlc_maximum_msat = true;
423                         }
424                 }
425         }
426         log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
427 }
428
429 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
430         let original_length = delta_set.len();
431         let keys: Vec<u64> = delta_set.keys().cloned().collect();
432         for k in keys {
433                 let v = delta_set.get(&k).unwrap();
434                 if v.announcement.is_none() {
435                         // this channel is not currently in the network graph
436                         delta_set.remove(&k);
437                         continue;
438                 }
439
440                 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
441                         if update.is_none() {
442                                 return false;
443                         };
444                         let update_reference = update.as_ref().unwrap();
445                         // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
446                         // if there has been an update after the channel was first seen
447
448                         v.requires_reminder || update_reference.latest_update_after_seen.is_some()
449                 };
450
451                 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
452                 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
453
454                 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
455                         delta_set.remove(&k);
456                 }
457         }
458
459         let new_length = delta_set.len();
460         if original_length != new_length {
461                 log_info!(logger, "length modified!");
462         }
463 }