fbc16729bb96cccccd7e6fec7d7ec72d0f349fcf
[rapid-gossip-sync-server] / src / lookup.rs
1 use std::collections::{BTreeMap, HashSet};
2 use std::io::Cursor;
3 use std::ops::Deref;
4 use std::sync::Arc;
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
11
12 use futures::StreamExt;
13 use lightning::{log_gossip, log_info};
14 use lightning::util::logger::Logger;
15
16 use crate::config;
17 use crate::serialization::MutatedProperties;
18
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
22
23 pub(super) struct AnnouncementDelta {
24         pub(super) seen: u32,
25         pub(super) announcement: UnsignedChannelAnnouncement,
26 }
27
28 pub(super) struct UpdateDelta {
29         pub(super) seen: u32,
30         pub(super) update: UnsignedChannelUpdate,
31 }
32
33 pub(super) struct DirectedUpdateDelta {
34         /// the last update we saw prior to the user-provided timestamp
35         pub(super) last_update_before_seen: Option<UpdateDelta>,
36         /// the latest update we saw overall
37         pub(super) latest_update_after_seen: Option<UpdateDelta>,
38         /// the set of all mutated properties across all updates between the last seen by the user and
39         /// the latest one known to us
40         pub(super) mutated_properties: MutatedProperties,
41         /// Specifically for reminder updates, the flag-only value to send to the client
42         pub(super) serialization_update_flags: Option<u8>
43 }
44
45 pub(super) struct ChannelDelta {
46         pub(super) announcement: Option<AnnouncementDelta>,
47         pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48         pub(super) first_bidirectional_updates_seen: Option<u32>,
49         /// The seen timestamp of the older of the two latest directional updates
50         pub(super) requires_reminder: bool,
51 }
52
53 impl Default for ChannelDelta {
54         fn default() -> Self {
55                 Self {
56                         announcement: None,
57                         updates: (None, None),
58                         first_bidirectional_updates_seen: None,
59                         requires_reminder: false,
60                 }
61         }
62 }
63
64 impl Default for DirectedUpdateDelta {
65         fn default() -> Self {
66                 Self {
67                         last_update_before_seen: None,
68                         mutated_properties: MutatedProperties::default(),
69                         latest_update_after_seen: None,
70                         serialization_update_flags: None,
71                 }
72         }
73 }
74
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
80         log_info!(logger, "Obtaining channel ids from network graph");
81         let channel_ids = {
82                 let read_only_graph = network_graph.read_only();
83                 log_info!(logger, "Retrieved read-only network graph copy");
84                 let channel_iterator = read_only_graph.channels().unordered_iter();
85                 channel_iterator
86                         .filter(|c| c.1.announcement_message.is_some())
87                         .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
88                         .collect::<Vec<_>>()
89         };
90         #[cfg(test)]
91         log_info!(logger, "Channel IDs: {:?}", channel_ids);
92         log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93         let last_sync_timestamp_float = last_sync_timestamp as f64;
94
95         log_info!(logger, "Obtaining corresponding database entries");
96         // get all the channel announcements that are currently in the network graph
97         let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
98         let mut pinned_rows = Box::pin(announcement_rows);
99
100         let mut announcement_count = 0;
101         while let Some(row_res) = pinned_rows.next().await {
102                 let current_announcement_row = row_res.unwrap();
103                 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
104                 let mut readable = Cursor::new(blob);
105                 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
106
107                 let scid = unsigned_announcement.short_channel_id;
108                 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
109
110                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
111                 (*current_channel_delta).announcement = Some(AnnouncementDelta {
112                         announcement: unsigned_announcement,
113                         seen: current_seen_timestamp,
114                 });
115
116                 announcement_count += 1;
117         }
118         log_info!(logger, "Fetched {} announcement rows", announcement_count);
119
120         {
121                 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
122
123                 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
124                 // Steps:
125                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
126                 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
127                 // This will allow us to mark the first time updates in both directions were seen
128
129                 // here is where the channels whose first update in either direction occurred after
130                 // `last_seen_timestamp` are added to the selection
131                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
132                         [&channel_ids, &last_sync_timestamp_float];
133                 let newer_oldest_directional_updates = client.query_raw("
134                         SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
135                                 SELECT DISTINCT ON (short_channel_id) *
136                                 FROM (
137                                         SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
138                                         FROM channel_updates
139                                         WHERE short_channel_id = any($1)
140                                         ORDER BY short_channel_id ASC, direction ASC, seen ASC
141                                 ) AS directional_last_seens
142                                 ORDER BY short_channel_id ASC, seen DESC
143                         ) AS distinct_chans
144                         WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
145                         ", params).await.unwrap();
146                 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
147
148                 let mut newer_oldest_directional_update_count = 0;
149                 while let Some(row_res) = pinned_updates.next().await {
150                         let current_row = row_res.unwrap();
151
152                         let scid: i64 = current_row.get("short_channel_id");
153                         let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
154
155                         // the newer of the two oldest seen directional updates came after last sync timestamp
156                         let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
157                         // first time a channel was seen in both directions
158                         (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
159
160                         newer_oldest_directional_update_count += 1;
161                 }
162                 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
163         }
164
165         {
166                 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
167
168                 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
169                 // Steps:
170                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
171                 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
172                 let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
173
174                 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
175                 let reminder_lookup_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
176                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
177
178                 /*
179                 What exactly is the below query doing?
180
181                 First, the inner query groups all channel updates by their scid/direction combination,
182                 and then sorts those in reverse chronological order by the "seen" column.
183
184                 Then, each row is annotated based on whether its subsequent row for the same scid/direction
185                 combination has a different value for any one of these six fields:
186                 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
187                 Those are simply the properties we use to keep track of channel mutations.
188
189                 The outer query takes all of those results and selects the first value that has a distinct
190                 successor for each scid/direction combination. That yields the first instance at which
191                 a given channel configuration was received after any prior mutations.
192
193                 Knowing that, we can check whether or not there have been any mutations within the
194                 reminder requirement window. Because we only care about that window (and potentially the
195                 2-week-window), we pre-filter the scanned updates by only those that were received within
196                 3x the timeframe that we consider necessitates reminders.
197                 */
198
199                 let mutated_updates = client.query_raw("
200                 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
201                         SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
202                                 disable<>lead(disable) OVER w1
203                                         OR
204                                 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
205                                         OR
206                                 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
207                                         OR
208                                 fee_base_msat<>lead(fee_base_msat) OVER w1
209                                         OR
210                                 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
211                                         OR
212                                 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
213                                 TRUE
214                         ) has_distinct_successor
215                         FROM channel_updates
216                         WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
217                         WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
218                 ) _
219                 WHERE has_distinct_successor
220                 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
221                 ", params).await.unwrap();
222
223                 let mut pinned_updates = Box::pin(mutated_updates);
224                 let mut older_latest_directional_update_count = 0;
225                 while let Some(row_res) = pinned_updates.next().await {
226                         let current_row = row_res.unwrap();
227                         let seen = current_row.get::<_, i64>("seen") as u32;
228
229                         if seen < reminder_threshold_timestamp as u32 {
230                                 let blob: Vec<u8> = current_row.get("blob_signed");
231                                 let mut readable = Cursor::new(blob);
232                                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
233
234                                 let scid = unsigned_channel_update.short_channel_id;
235                                 let direction: bool = current_row.get("direction");
236
237                                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
238
239                                 // We might be able to get away with not using this
240                                 (*current_channel_delta).requires_reminder = true;
241                                 older_latest_directional_update_count += 1;
242
243                                 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
244                                         if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
245                                                 // we don't send reminders if we don't have bidirectional update data
246                                                 continue;
247                                         }
248
249                                         if let Some(info) = current_channel_info.one_to_two.as_ref() {
250                                                 let flags: u8 = if info.enabled { 0 } else { 2 };
251                                                 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
252                                                 current_update.serialization_update_flags = Some(flags);
253                                         }
254
255                                         if let Some(info) = current_channel_info.two_to_one.as_ref() {
256                                                 let flags: u8 = if info.enabled { 1 } else { 3 };
257                                                 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
258                                                 current_update.serialization_update_flags = Some(flags);
259                                         }
260                                 } else {
261                                         // we don't send reminders if we don't have the channel
262                                         continue;
263                                 }
264
265                                 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
266                         }
267                 }
268                 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
269         }
270 }
271
272 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
273         let start = Instant::now();
274         let last_sync_timestamp_float = last_sync_timestamp as f64;
275
276         // get the latest channel update in each direction prior to last_sync_timestamp, provided
277         // there was an update in either direction that happened after the last sync (to avoid
278         // collecting too many reference updates)
279         let reference_rows = client.query_raw("
280                 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
281                 WHERE id IN (
282                         SELECT DISTINCT ON (short_channel_id, direction) id
283                         FROM channel_updates
284                         WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
285                                 SELECT DISTINCT ON (short_channel_id) short_channel_id
286                                 FROM channel_updates
287                                 WHERE seen >= TO_TIMESTAMP($1)
288                         )
289                         ORDER BY short_channel_id ASC, direction ASC, seen DESC
290                 )
291                 ", [last_sync_timestamp_float]).await.unwrap();
292         let mut pinned_rows = Box::pin(reference_rows);
293
294         log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
295
296         let mut last_seen_update_ids: Vec<i32> = Vec::new();
297         let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
298         let mut reference_row_count = 0;
299
300         while let Some(row_res) = pinned_rows.next().await {
301                 let current_reference = row_res.unwrap();
302                 let update_id: i32 = current_reference.get("id");
303                 last_seen_update_ids.push(update_id);
304                 non_intermediate_ids.insert(update_id);
305
306                 let direction: bool = current_reference.get("direction");
307                 let seen = current_reference.get::<_, i64>("seen") as u32;
308                 let blob: Vec<u8> = current_reference.get("blob_signed");
309                 let mut readable = Cursor::new(blob);
310                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
311                 let scid = unsigned_channel_update.short_channel_id;
312
313                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
314                 let update_delta = if !direction {
315                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
316                 } else {
317                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
318                 };
319                 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
320                 update_delta.last_update_before_seen = Some(UpdateDelta {
321                         seen,
322                         update: unsigned_channel_update,
323                 });
324
325                 reference_row_count += 1;
326         }
327
328         log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
329                 reference_row_count, delta_set.len(), start.elapsed());
330
331         // get all the intermediate channel updates
332         // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
333         // have been omitted)
334
335         let intermediate_updates = client.query_raw("
336                 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
337                 FROM channel_updates
338                 WHERE seen >= TO_TIMESTAMP($1)
339                 ORDER BY short_channel_id ASC, timestamp DESC
340                 ", [last_sync_timestamp_float]).await.unwrap();
341         let mut pinned_updates = Box::pin(intermediate_updates);
342         log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
343
344         let mut previous_scid = u64::MAX;
345         let mut previously_seen_directions = (false, false);
346
347         // let mut previously_seen_directions = (false, false);
348         let mut intermediate_update_count = 0;
349         while let Some(row_res) = pinned_updates.next().await {
350                 let intermediate_update = row_res.unwrap();
351                 let update_id: i32 = intermediate_update.get("id");
352                 if non_intermediate_ids.contains(&update_id) {
353                         continue;
354                 }
355                 intermediate_update_count += 1;
356
357                 let direction: bool = intermediate_update.get("direction");
358                 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
359                 let blob: Vec<u8> = intermediate_update.get("blob_signed");
360                 let mut readable = Cursor::new(blob);
361                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
362
363                 let scid = unsigned_channel_update.short_channel_id;
364                 if scid != previous_scid {
365                         previous_scid = scid;
366                         previously_seen_directions = (false, false);
367                 }
368
369                 // get the write configuration for this particular channel's directional details
370                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
371                 let update_delta = if !direction {
372                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
373                 } else {
374                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
375                 };
376
377                 {
378                         // handle the latest deltas
379                         if !direction && !previously_seen_directions.0 {
380                                 previously_seen_directions.0 = true;
381                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
382                                         seen: current_seen_timestamp,
383                                         update: unsigned_channel_update.clone(),
384                                 });
385                         } else if direction && !previously_seen_directions.1 {
386                                 previously_seen_directions.1 = true;
387                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
388                                         seen: current_seen_timestamp,
389                                         update: unsigned_channel_update.clone(),
390                                 });
391                         }
392                 }
393
394                 // determine mutations
395                 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
396                         if unsigned_channel_update.flags != last_seen_update.update.flags {
397                                 update_delta.mutated_properties.flags = true;
398                         }
399                         if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
400                                 update_delta.mutated_properties.cltv_expiry_delta = true;
401                         }
402                         if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
403                                 update_delta.mutated_properties.htlc_minimum_msat = true;
404                         }
405                         if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
406                                 update_delta.mutated_properties.fee_base_msat = true;
407                         }
408                         if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
409                                 update_delta.mutated_properties.fee_proportional_millionths = true;
410                         }
411                         if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
412                                 update_delta.mutated_properties.htlc_maximum_msat = true;
413                         }
414                 }
415         }
416         log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
417 }
418
419 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
420         let original_length = delta_set.len();
421         let keys: Vec<u64> = delta_set.keys().cloned().collect();
422         for k in keys {
423                 let v = delta_set.get(&k).unwrap();
424                 if v.announcement.is_none() {
425                         // this channel is not currently in the network graph
426                         delta_set.remove(&k);
427                         continue;
428                 }
429
430                 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
431                         if update.is_none() {
432                                 return false;
433                         };
434                         let update_reference = update.as_ref().unwrap();
435                         // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
436                         // if there has been an update after the channel was first seen
437
438                         v.requires_reminder || update_reference.latest_update_after_seen.is_some()
439                 };
440
441                 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
442                 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
443
444                 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
445                         delta_set.remove(&k);
446                 }
447         }
448
449         let new_length = delta_set.len();
450         if original_length != new_length {
451                 log_info!(logger, "length modified!");
452         }
453 }