Test storing node addresses.
[rapid-gossip-sync-server] / src / lookup.rs
1 use std::collections::{BTreeMap, HashSet};
2 use std::io::Cursor;
3 use std::ops::Deref;
4 use std::sync::Arc;
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
11
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::util::logger::Logger;
15
16 use crate::config;
17 use crate::serialization::MutatedProperties;
18
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
22
23 pub(super) struct AnnouncementDelta {
24         pub(super) seen: u32,
25         pub(super) announcement: UnsignedChannelAnnouncement,
26 }
27
28 pub(super) struct UpdateDelta {
29         pub(super) seen: u32,
30         pub(super) update: UnsignedChannelUpdate,
31 }
32
33 pub(super) struct DirectedUpdateDelta {
34         /// the last update we saw prior to the user-provided timestamp
35         pub(super) last_update_before_seen: Option<UpdateDelta>,
36         /// the latest update we saw overall
37         pub(super) latest_update_after_seen: Option<UpdateDelta>,
38         /// the set of all mutated properties across all updates between the last seen by the user and
39         /// the latest one known to us
40         pub(super) mutated_properties: MutatedProperties,
41         /// Specifically for reminder updates, the flag-only value to send to the client
42         pub(super) serialization_update_flags: Option<u8>
43 }
44
45 pub(super) struct ChannelDelta {
46         pub(super) announcement: Option<AnnouncementDelta>,
47         pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48         pub(super) first_bidirectional_updates_seen: Option<u32>,
49         /// The seen timestamp of the older of the two latest directional updates
50         pub(super) requires_reminder: bool,
51 }
52
53 impl Default for ChannelDelta {
54         fn default() -> Self {
55                 Self {
56                         announcement: None,
57                         updates: (None, None),
58                         first_bidirectional_updates_seen: None,
59                         requires_reminder: false,
60                 }
61         }
62 }
63
64 impl Default for DirectedUpdateDelta {
65         fn default() -> Self {
66                 Self {
67                         last_update_before_seen: None,
68                         mutated_properties: MutatedProperties::default(),
69                         latest_update_after_seen: None,
70                         serialization_update_flags: None,
71                 }
72         }
73 }
74
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) where L::Target: Logger {
80         log_info!(logger, "Obtaining channel ids from network graph");
81         let channel_ids = {
82                 let read_only_graph = network_graph.read_only();
83                 log_info!(logger, "Retrieved read-only network graph copy");
84                 let channel_iterator = read_only_graph.channels().unordered_iter();
85                 channel_iterator
86                         .filter(|c| c.1.announcement_message.is_some() && c.1.one_to_two.is_some() && c.1.two_to_one.is_some())
87                         .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
88                         .collect::<Vec<_>>()
89         };
90         #[cfg(test)]
91         log_info!(logger, "Channel IDs: {:?}", channel_ids);
92         log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93         let last_sync_timestamp_float = last_sync_timestamp as f64;
94
95         let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
96         log_info!(logger, "Current timestamp: {}", current_timestamp);
97
98         let include_reminders = {
99                 let current_hour = current_timestamp / 3600;
100                 let current_day = current_timestamp / (24 * 3600);
101
102                 log_debug!(logger, "Current day index: {}", current_day);
103                 log_debug!(logger, "Current hour: {}", current_hour);
104
105                 // every 5th day at midnight
106                 let is_reminder_hour = (current_hour % 24) == 0;
107                 let is_reminder_day = (current_day % 5) == 0;
108
109                 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
110                 let is_reminder_scope = snapshot_scope > (50 * 3600);
111                 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
112
113                 (is_reminder_hour && is_reminder_day) || is_reminder_scope
114         };
115
116         log_info!(logger, "Obtaining corresponding database entries");
117         // get all the channel announcements that are currently in the network graph
118         let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
119         let mut pinned_rows = Box::pin(announcement_rows);
120
121         let mut announcement_count = 0;
122         while let Some(row_res) = pinned_rows.next().await {
123                 let current_announcement_row = row_res.unwrap();
124                 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
125                 let mut readable = Cursor::new(blob);
126                 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
127
128                 let scid = unsigned_announcement.short_channel_id;
129                 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
130
131                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
132                 (*current_channel_delta).announcement = Some(AnnouncementDelta {
133                         announcement: unsigned_announcement,
134                         seen: current_seen_timestamp,
135                 });
136
137                 announcement_count += 1;
138         }
139         log_info!(logger, "Fetched {} announcement rows", announcement_count);
140
141         {
142                 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
143
144                 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
145                 // Steps:
146                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
147                 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
148                 // This will allow us to mark the first time updates in both directions were seen
149
150                 // here is where the channels whose first update in either direction occurred after
151                 // `last_seen_timestamp` are added to the selection
152                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
153                         [&channel_ids, &last_sync_timestamp_float];
154                 let newer_oldest_directional_updates = client.query_raw("
155                         SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
156                                 SELECT DISTINCT ON (short_channel_id) *
157                                 FROM (
158                                         SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
159                                         FROM channel_updates
160                                         WHERE short_channel_id = any($1)
161                                         ORDER BY short_channel_id ASC, direction ASC, seen ASC
162                                 ) AS directional_last_seens
163                                 ORDER BY short_channel_id ASC, seen DESC
164                         ) AS distinct_chans
165                         WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
166                         ", params).await.unwrap();
167                 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
168
169                 let mut newer_oldest_directional_update_count = 0;
170                 while let Some(row_res) = pinned_updates.next().await {
171                         let current_row = row_res.unwrap();
172
173                         let scid: i64 = current_row.get("short_channel_id");
174                         let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
175
176                         // the newer of the two oldest seen directional updates came after last sync timestamp
177                         let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
178                         // first time a channel was seen in both directions
179                         (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
180
181                         newer_oldest_directional_update_count += 1;
182                 }
183                 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
184         }
185
186         if include_reminders {
187                 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
188
189                 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
190                 // Steps:
191                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
192                 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
193                 let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64;
194
195                 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
196                 let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64;
197                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
198
199                 /*
200                 What exactly is the below query doing?
201
202                 First, the inner query groups all channel updates by their scid/direction combination,
203                 and then sorts those in reverse chronological order by the "seen" column.
204
205                 Then, each row is annotated based on whether its subsequent row for the same scid/direction
206                 combination has a different value for any one of these six fields:
207                 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
208                 Those are simply the properties we use to keep track of channel mutations.
209
210                 The outer query takes all of those results and selects the first value that has a distinct
211                 successor for each scid/direction combination. That yields the first instance at which
212                 a given channel configuration was received after any prior mutations.
213
214                 Knowing that, we can check whether or not there have been any mutations within the
215                 reminder requirement window. Because we only care about that window (and potentially the
216                 2-week-window), we pre-filter the scanned updates by only those that were received within
217                 3x the timeframe that we consider necessitates reminders.
218                 */
219
220                 let mutated_updates = client.query_raw("
221                 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
222                         SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
223                                 disable<>lead(disable) OVER w1
224                                         OR
225                                 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
226                                         OR
227                                 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
228                                         OR
229                                 fee_base_msat<>lead(fee_base_msat) OVER w1
230                                         OR
231                                 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
232                                         OR
233                                 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
234                                 TRUE
235                         ) has_distinct_successor
236                         FROM channel_updates
237                         WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
238                         WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
239                 ) _
240                 WHERE has_distinct_successor
241                 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
242                 ", params).await.unwrap();
243
244                 let mut pinned_updates = Box::pin(mutated_updates);
245                 let mut older_latest_directional_update_count = 0;
246                 while let Some(row_res) = pinned_updates.next().await {
247                         let current_row = row_res.unwrap();
248                         let seen = current_row.get::<_, i64>("seen") as u32;
249
250                         if seen < reminder_threshold_timestamp as u32 {
251                                 let blob: Vec<u8> = current_row.get("blob_signed");
252                                 let mut readable = Cursor::new(blob);
253                                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
254
255                                 let scid = unsigned_channel_update.short_channel_id;
256                                 let direction: bool = current_row.get("direction");
257
258                                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
259
260                                 // We might be able to get away with not using this
261                                 (*current_channel_delta).requires_reminder = true;
262                                 older_latest_directional_update_count += 1;
263
264                                 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
265                                         if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
266                                                 // we don't send reminders if we don't have bidirectional update data
267                                                 continue;
268                                         }
269
270                                         if let Some(info) = current_channel_info.one_to_two.as_ref() {
271                                                 let flags: u8 = if info.enabled { 0 } else { 2 };
272                                                 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
273                                                 current_update.serialization_update_flags = Some(flags);
274                                         }
275
276                                         if let Some(info) = current_channel_info.two_to_one.as_ref() {
277                                                 let flags: u8 = if info.enabled { 1 } else { 3 };
278                                                 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
279                                                 current_update.serialization_update_flags = Some(flags);
280                                         }
281                                 } else {
282                                         // we don't send reminders if we don't have the channel
283                                         continue;
284                                 }
285
286                                 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
287                         }
288                 }
289                 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
290         }
291 }
292
293 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
294         let start = Instant::now();
295         let last_sync_timestamp_float = last_sync_timestamp as f64;
296
297         // get the latest channel update in each direction prior to last_sync_timestamp, provided
298         // there was an update in either direction that happened after the last sync (to avoid
299         // collecting too many reference updates)
300         let reference_rows = client.query_raw("
301                 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
302                 WHERE id IN (
303                         SELECT DISTINCT ON (short_channel_id, direction) id
304                         FROM channel_updates
305                         WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
306                                 SELECT DISTINCT ON (short_channel_id) short_channel_id
307                                 FROM channel_updates
308                                 WHERE seen >= TO_TIMESTAMP($1)
309                         )
310                         ORDER BY short_channel_id ASC, direction ASC, seen DESC
311                 )
312                 ", [last_sync_timestamp_float]).await.unwrap();
313         let mut pinned_rows = Box::pin(reference_rows);
314
315         log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
316
317         let mut last_seen_update_ids: Vec<i32> = Vec::new();
318         let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
319         let mut reference_row_count = 0;
320
321         while let Some(row_res) = pinned_rows.next().await {
322                 let current_reference = row_res.unwrap();
323                 let update_id: i32 = current_reference.get("id");
324                 last_seen_update_ids.push(update_id);
325                 non_intermediate_ids.insert(update_id);
326
327                 let direction: bool = current_reference.get("direction");
328                 let seen = current_reference.get::<_, i64>("seen") as u32;
329                 let blob: Vec<u8> = current_reference.get("blob_signed");
330                 let mut readable = Cursor::new(blob);
331                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
332                 let scid = unsigned_channel_update.short_channel_id;
333
334                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
335                 let update_delta = if !direction {
336                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
337                 } else {
338                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
339                 };
340                 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
341                 update_delta.last_update_before_seen = Some(UpdateDelta {
342                         seen,
343                         update: unsigned_channel_update,
344                 });
345
346                 reference_row_count += 1;
347         }
348
349         log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
350                 reference_row_count, delta_set.len(), start.elapsed());
351
352         // get all the intermediate channel updates
353         // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
354         // have been omitted)
355
356         let intermediate_updates = client.query_raw("
357                 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
358                 FROM channel_updates
359                 WHERE seen >= TO_TIMESTAMP($1)
360                 ORDER BY short_channel_id ASC, timestamp DESC
361                 ", [last_sync_timestamp_float]).await.unwrap();
362         let mut pinned_updates = Box::pin(intermediate_updates);
363         log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
364
365         let mut previous_scid = u64::MAX;
366         let mut previously_seen_directions = (false, false);
367
368         let mut intermediate_update_count = 0;
369         while let Some(row_res) = pinned_updates.next().await {
370                 let intermediate_update = row_res.unwrap();
371                 let update_id: i32 = intermediate_update.get("id");
372                 if non_intermediate_ids.contains(&update_id) {
373                         continue;
374                 }
375                 intermediate_update_count += 1;
376
377                 let direction: bool = intermediate_update.get("direction");
378                 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
379                 let blob: Vec<u8> = intermediate_update.get("blob_signed");
380                 let mut readable = Cursor::new(blob);
381                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
382
383                 let scid = unsigned_channel_update.short_channel_id;
384                 if scid != previous_scid {
385                         previous_scid = scid;
386                         previously_seen_directions = (false, false);
387                 }
388
389                 // get the write configuration for this particular channel's directional details
390                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
391                 let update_delta = if !direction {
392                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
393                 } else {
394                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
395                 };
396
397                 {
398                         // handle the latest deltas
399                         if !direction && !previously_seen_directions.0 {
400                                 previously_seen_directions.0 = true;
401                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
402                                         seen: current_seen_timestamp,
403                                         update: unsigned_channel_update.clone(),
404                                 });
405                         } else if direction && !previously_seen_directions.1 {
406                                 previously_seen_directions.1 = true;
407                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
408                                         seen: current_seen_timestamp,
409                                         update: unsigned_channel_update.clone(),
410                                 });
411                         }
412                 }
413
414                 // determine mutations
415                 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
416                         if unsigned_channel_update.flags != last_seen_update.update.flags {
417                                 update_delta.mutated_properties.flags = true;
418                         }
419                         if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
420                                 update_delta.mutated_properties.cltv_expiry_delta = true;
421                         }
422                         if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
423                                 update_delta.mutated_properties.htlc_minimum_msat = true;
424                         }
425                         if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
426                                 update_delta.mutated_properties.fee_base_msat = true;
427                         }
428                         if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
429                                 update_delta.mutated_properties.fee_proportional_millionths = true;
430                         }
431                         if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
432                                 update_delta.mutated_properties.htlc_maximum_msat = true;
433                         }
434                 }
435         }
436         log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
437 }
438
439 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
440         let original_length = delta_set.len();
441         let keys: Vec<u64> = delta_set.keys().cloned().collect();
442         for k in keys {
443                 let v = delta_set.get(&k).unwrap();
444                 if v.announcement.is_none() {
445                         // this channel is not currently in the network graph
446                         delta_set.remove(&k);
447                         continue;
448                 }
449
450                 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
451                         if update.is_none() {
452                                 return false;
453                         };
454                         let update_reference = update.as_ref().unwrap();
455                         // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
456                         // if there has been an update after the channel was first seen
457
458                         v.requires_reminder || update_reference.latest_update_after_seen.is_some()
459                 };
460
461                 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
462                 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
463
464                 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
465                         delta_set.remove(&k);
466                 }
467         }
468
469         let new_length = delta_set.len();
470         if original_length != new_length {
471                 log_info!(logger, "length modified!");
472         }
473 }