Correct v2 symlink paths
[rapid-gossip-sync-server] / src / lookup.rs
1 use std::collections::{BTreeMap, HashMap, HashSet};
2 use std::io::Cursor;
3 use std::ops::Deref;
4 use std::sync::Arc;
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::{NetworkGraph, NodeId};
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
11
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::ln::features::NodeFeatures;
15 use lightning::util::logger::Logger;
16
17 use crate::config;
18 use crate::serialization::MutatedProperties;
19
20 /// The delta set needs to be a BTreeMap so the keys are sorted.
21 /// That way, the scids in the response automatically grow monotonically
22 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
23 pub(super) type NodeDeltaSet = HashMap<NodeId, NodeDelta>;
24
25 pub(super) struct AnnouncementDelta {
26         pub(super) seen: u32,
27         pub(super) announcement: UnsignedChannelAnnouncement,
28 }
29
30 pub(super) struct UpdateDelta {
31         pub(super) seen: u32,
32         pub(super) update: UnsignedChannelUpdate,
33 }
34
35 pub(super) struct DirectedUpdateDelta {
36         /// the last update we saw prior to the user-provided timestamp
37         pub(super) last_update_before_seen: Option<UpdateDelta>,
38         /// the latest update we saw overall
39         pub(super) latest_update_after_seen: Option<UpdateDelta>,
40         /// the set of all mutated properties across all updates between the last seen by the user and
41         /// the latest one known to us
42         pub(super) mutated_properties: MutatedProperties,
43         /// Specifically for reminder updates, the flag-only value to send to the client
44         pub(super) serialization_update_flags: Option<u8>
45 }
46
47 pub(super) struct ChannelDelta {
48         pub(super) announcement: Option<AnnouncementDelta>,
49         pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
50         pub(super) first_bidirectional_updates_seen: Option<u32>,
51         /// The seen timestamp of the older of the two latest directional updates
52         pub(super) requires_reminder: bool,
53 }
54
55 pub(super) struct NodeDelta {
56         /// The most recently received, but new-to-the-client, node details
57         pub(super) latest_details_after_seen: Option<NodeDetails>,
58
59         /// Between last_details_before_seen and latest_details_after_seen, including any potential
60         /// intermediate updates that are not kept track of here, has the set of features this node
61         /// supports changed?
62         pub(super) has_feature_set_changed: bool,
63
64         /// Between last_details_before_seen and latest_details_after_seen, including any potential
65         /// intermediate updates that are not kept track of here, has the set of socket addresses this
66         /// node listens on changed?
67         pub(super) has_address_set_changed: bool,
68
69         /// The most recent node details that the client would have seen already
70         pub(super) last_details_before_seen: Option<NodeDetails>
71 }
72
73 pub(super) struct NodeDetails {
74         #[allow(unused)]
75         pub(super) seen: u32,
76         pub(super) features: NodeFeatures,
77         pub(super) addresses: HashSet<SocketAddress>
78 }
79
80 impl Default for ChannelDelta {
81         fn default() -> Self {
82                 Self {
83                         announcement: None,
84                         updates: (None, None),
85                         first_bidirectional_updates_seen: None,
86                         requires_reminder: false,
87                 }
88         }
89 }
90
91 impl Default for NodeDelta {
92         fn default() -> Self {
93                 Self {
94                         latest_details_after_seen: None,
95                         has_feature_set_changed: false,
96                         has_address_set_changed: false,
97                         last_details_before_seen: None,
98                 }
99         }
100 }
101
102 impl Default for DirectedUpdateDelta {
103         fn default() -> Self {
104                 Self {
105                         last_update_before_seen: None,
106                         mutated_properties: MutatedProperties::default(),
107                         latest_update_after_seen: None,
108                         serialization_update_flags: None,
109                 }
110         }
111 }
112
113 /// Fetch all the channel announcements that are presently in the network graph, regardless of
114 /// whether they had been seen before.
115 /// Also include all announcements for which the first update was announced
116 /// after `last_sync_timestamp`
117 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) where L::Target: Logger {
118         log_info!(logger, "Obtaining channel ids from network graph");
119         let channel_ids = {
120                 let read_only_graph = network_graph.read_only();
121                 log_info!(logger, "Retrieved read-only network graph copy");
122                 let channel_iterator = read_only_graph.channels().unordered_iter();
123                 channel_iterator
124                         .filter(|c| c.1.announcement_message.is_some() && c.1.one_to_two.is_some() && c.1.two_to_one.is_some())
125                         .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
126                         .collect::<Vec<_>>()
127         };
128         #[cfg(test)]
129         log_info!(logger, "Channel IDs: {:?}", channel_ids);
130         log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
131         let last_sync_timestamp_float = last_sync_timestamp as f64;
132
133         let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
134         log_info!(logger, "Current timestamp: {}", current_timestamp);
135
136         let include_reminders = {
137                 let current_hour = current_timestamp / 3600;
138                 let current_day = current_timestamp / (24 * 3600);
139
140                 log_debug!(logger, "Current day index: {}", current_day);
141                 log_debug!(logger, "Current hour: {}", current_hour);
142
143                 // every 5th day at midnight
144                 let is_reminder_hour = (current_hour % 24) == 0;
145                 let is_reminder_day = (current_day % 5) == 0;
146
147                 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
148                 let is_reminder_scope = snapshot_scope > (50 * 3600);
149                 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
150
151                 (is_reminder_hour && is_reminder_day) || is_reminder_scope
152         };
153
154         log_info!(logger, "Obtaining corresponding database entries");
155         // get all the channel announcements that are currently in the network graph
156         let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
157         let mut pinned_rows = Box::pin(announcement_rows);
158
159         let mut announcement_count = 0;
160         while let Some(row_res) = pinned_rows.next().await {
161                 let current_announcement_row = row_res.unwrap();
162                 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
163                 let mut readable = Cursor::new(blob);
164                 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
165
166                 let scid = unsigned_announcement.short_channel_id;
167                 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
168
169                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
170                 (*current_channel_delta).announcement = Some(AnnouncementDelta {
171                         announcement: unsigned_announcement,
172                         seen: current_seen_timestamp,
173                 });
174
175                 announcement_count += 1;
176         }
177         log_info!(logger, "Fetched {} announcement rows", announcement_count);
178
179         {
180                 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
181
182                 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
183                 // Steps:
184                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
185                 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
186                 // This will allow us to mark the first time updates in both directions were seen
187
188                 // here is where the channels whose first update in either direction occurred after
189                 // `last_seen_timestamp` are added to the selection
190                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
191                         [&channel_ids, &last_sync_timestamp_float];
192                 let newer_oldest_directional_updates = client.query_raw("
193                         SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
194                                 SELECT DISTINCT ON (short_channel_id) *
195                                 FROM (
196                                         SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
197                                         FROM channel_updates
198                                         WHERE short_channel_id = any($1)
199                                         ORDER BY short_channel_id ASC, direction ASC, seen ASC
200                                 ) AS directional_last_seens
201                                 ORDER BY short_channel_id ASC, seen DESC
202                         ) AS distinct_chans
203                         WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
204                         ", params).await.unwrap();
205                 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
206
207                 let mut newer_oldest_directional_update_count = 0;
208                 while let Some(row_res) = pinned_updates.next().await {
209                         let current_row = row_res.unwrap();
210
211                         let scid: i64 = current_row.get("short_channel_id");
212                         let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
213
214                         // the newer of the two oldest seen directional updates came after last sync timestamp
215                         let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
216                         // first time a channel was seen in both directions
217                         (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
218
219                         newer_oldest_directional_update_count += 1;
220                 }
221                 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
222         }
223
224         if include_reminders {
225                 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
226
227                 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
228                 // Steps:
229                 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
230                 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
231                 let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64;
232
233                 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
234                 let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64;
235                 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
236
237                 /*
238                 What exactly is the below query doing?
239
240                 First, the inner query groups all channel updates by their scid/direction combination,
241                 and then sorts those in reverse chronological order by the "seen" column.
242
243                 Then, each row is annotated based on whether its subsequent row for the same scid/direction
244                 combination has a different value for any one of these six fields:
245                 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
246                 Those are simply the properties we use to keep track of channel mutations.
247
248                 The outer query takes all of those results and selects the first value that has a distinct
249                 successor for each scid/direction combination. That yields the first instance at which
250                 a given channel configuration was received after any prior mutations.
251
252                 Knowing that, we can check whether or not there have been any mutations within the
253                 reminder requirement window. Because we only care about that window (and potentially the
254                 2-week-window), we pre-filter the scanned updates by only those that were received within
255                 3x the timeframe that we consider necessitates reminders.
256                 */
257
258                 let mutated_updates = client.query_raw("
259                 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
260                         SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
261                                 disable<>lead(disable) OVER w1
262                                         OR
263                                 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
264                                         OR
265                                 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
266                                         OR
267                                 fee_base_msat<>lead(fee_base_msat) OVER w1
268                                         OR
269                                 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
270                                         OR
271                                 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
272                                 TRUE
273                         ) has_distinct_successor
274                         FROM channel_updates
275                         WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
276                         WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
277                 ) _
278                 WHERE has_distinct_successor
279                 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
280                 ", params).await.unwrap();
281
282                 let mut pinned_updates = Box::pin(mutated_updates);
283                 let mut older_latest_directional_update_count = 0;
284                 while let Some(row_res) = pinned_updates.next().await {
285                         let current_row = row_res.unwrap();
286                         let seen = current_row.get::<_, i64>("seen") as u32;
287
288                         if seen < reminder_threshold_timestamp as u32 {
289                                 let blob: Vec<u8> = current_row.get("blob_signed");
290                                 let mut readable = Cursor::new(blob);
291                                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
292
293                                 let scid = unsigned_channel_update.short_channel_id;
294                                 let direction: bool = current_row.get("direction");
295
296                                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
297
298                                 // We might be able to get away with not using this
299                                 (*current_channel_delta).requires_reminder = true;
300                                 older_latest_directional_update_count += 1;
301
302                                 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
303                                         if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
304                                                 // we don't send reminders if we don't have bidirectional update data
305                                                 continue;
306                                         }
307
308                                         if let Some(info) = current_channel_info.one_to_two.as_ref() {
309                                                 let flags: u8 = if info.enabled { 0 } else { 2 };
310                                                 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
311                                                 current_update.serialization_update_flags = Some(flags);
312                                         }
313
314                                         if let Some(info) = current_channel_info.two_to_one.as_ref() {
315                                                 let flags: u8 = if info.enabled { 1 } else { 3 };
316                                                 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
317                                                 current_update.serialization_update_flags = Some(flags);
318                                         }
319                                 } else {
320                                         // we don't send reminders if we don't have the channel
321                                         continue;
322                                 }
323
324                                 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
325                         }
326                 }
327                 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
328         }
329 }
330
331 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
332         let start = Instant::now();
333         let last_sync_timestamp_float = last_sync_timestamp as f64;
334
335         // get the latest channel update in each direction prior to last_sync_timestamp, provided
336         // there was an update in either direction that happened after the last sync (to avoid
337         // collecting too many reference updates)
338         let reference_rows = client.query_raw("
339                 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
340                 WHERE id IN (
341                         SELECT DISTINCT ON (short_channel_id, direction) id
342                         FROM channel_updates
343                         WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
344                                 SELECT DISTINCT ON (short_channel_id) short_channel_id
345                                 FROM channel_updates
346                                 WHERE seen >= TO_TIMESTAMP($1)
347                         )
348                         ORDER BY short_channel_id ASC, direction ASC, seen DESC
349                 )
350                 ", [last_sync_timestamp_float]).await.unwrap();
351         let mut pinned_rows = Box::pin(reference_rows);
352
353         log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
354
355         let mut last_seen_update_ids: Vec<i32> = Vec::new();
356         let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
357         let mut reference_row_count = 0;
358
359         while let Some(row_res) = pinned_rows.next().await {
360                 let current_reference = row_res.unwrap();
361                 let update_id: i32 = current_reference.get("id");
362                 last_seen_update_ids.push(update_id);
363                 non_intermediate_ids.insert(update_id);
364
365                 let direction: bool = current_reference.get("direction");
366                 let seen = current_reference.get::<_, i64>("seen") as u32;
367                 let blob: Vec<u8> = current_reference.get("blob_signed");
368                 let mut readable = Cursor::new(blob);
369                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
370                 let scid = unsigned_channel_update.short_channel_id;
371
372                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
373                 let update_delta = if !direction {
374                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
375                 } else {
376                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
377                 };
378                 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
379                 update_delta.last_update_before_seen = Some(UpdateDelta {
380                         seen,
381                         update: unsigned_channel_update,
382                 });
383
384                 reference_row_count += 1;
385         }
386
387         log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
388                 reference_row_count, delta_set.len(), start.elapsed());
389
390         // get all the intermediate channel updates
391         // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
392         // have been omitted)
393
394         let intermediate_updates = client.query_raw("
395                 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
396                 FROM channel_updates
397                 WHERE seen >= TO_TIMESTAMP($1)
398                 ORDER BY short_channel_id ASC, timestamp DESC
399                 ", [last_sync_timestamp_float]).await.unwrap();
400         let mut pinned_updates = Box::pin(intermediate_updates);
401         log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
402
403         let mut previous_scid = u64::MAX;
404         let mut previously_seen_directions = (false, false);
405
406         let mut intermediate_update_count = 0;
407         while let Some(row_res) = pinned_updates.next().await {
408                 let intermediate_update = row_res.unwrap();
409                 let update_id: i32 = intermediate_update.get("id");
410                 if non_intermediate_ids.contains(&update_id) {
411                         continue;
412                 }
413                 intermediate_update_count += 1;
414
415                 let direction: bool = intermediate_update.get("direction");
416                 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
417                 let blob: Vec<u8> = intermediate_update.get("blob_signed");
418                 let mut readable = Cursor::new(blob);
419                 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
420
421                 let scid = unsigned_channel_update.short_channel_id;
422                 if scid != previous_scid {
423                         previous_scid = scid;
424                         previously_seen_directions = (false, false);
425                 }
426
427                 // get the write configuration for this particular channel's directional details
428                 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
429                 let update_delta = if !direction {
430                         (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
431                 } else {
432                         (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
433                 };
434
435                 {
436                         // handle the latest deltas
437                         if !direction && !previously_seen_directions.0 {
438                                 previously_seen_directions.0 = true;
439                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
440                                         seen: current_seen_timestamp,
441                                         update: unsigned_channel_update.clone(),
442                                 });
443                         } else if direction && !previously_seen_directions.1 {
444                                 previously_seen_directions.1 = true;
445                                 update_delta.latest_update_after_seen = Some(UpdateDelta {
446                                         seen: current_seen_timestamp,
447                                         update: unsigned_channel_update.clone(),
448                                 });
449                         }
450                 }
451
452                 // determine mutations
453                 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
454                         if unsigned_channel_update.flags != last_seen_update.update.flags {
455                                 update_delta.mutated_properties.flags = true;
456                         }
457                         if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
458                                 update_delta.mutated_properties.cltv_expiry_delta = true;
459                         }
460                         if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
461                                 update_delta.mutated_properties.htlc_minimum_msat = true;
462                         }
463                         if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
464                                 update_delta.mutated_properties.fee_base_msat = true;
465                         }
466                         if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
467                                 update_delta.mutated_properties.fee_proportional_millionths = true;
468                         }
469                         if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
470                                 update_delta.mutated_properties.htlc_maximum_msat = true;
471                         }
472                 }
473         }
474         log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
475 }
476
477 pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
478         let start = Instant::now();
479         let last_sync_timestamp_float = last_sync_timestamp as f64;
480
481         let mut delta_set = NodeDeltaSet::new();
482
483         // get the latest node updates prior to last_sync_timestamp
484         let reference_rows = client.query_raw("
485                 SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed
486                 FROM node_announcements
487                 WHERE seen < TO_TIMESTAMP($1)
488                 ORDER BY public_key ASC, seen DESC
489                 ", [last_sync_timestamp_float]).await.unwrap();
490         let mut pinned_rows = Box::pin(reference_rows);
491
492         log_info!(logger, "Fetched node announcement reference rows in {:?}", start.elapsed());
493
494         let mut reference_row_count = 0;
495
496         while let Some(row_res) = pinned_rows.next().await {
497                 let current_reference = row_res.unwrap();
498
499                 let seen = current_reference.get::<_, i64>("seen") as u32;
500                 let blob: Vec<u8> = current_reference.get("announcement_signed");
501                 let mut readable = Cursor::new(blob);
502                 let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;
503                 let node_id = unsigned_node_announcement.node_id;
504
505                 let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
506                 (*current_node_delta).last_details_before_seen.get_or_insert_with(|| {
507                         let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
508                         NodeDetails {
509                                 seen,
510                                 features: unsigned_node_announcement.features,
511                                 addresses: address_set,
512                         }
513                 });
514                 log_gossip!(logger, "Node {} last update before seen: {} (seen at {})", node_id, unsigned_node_announcement.timestamp, seen);
515
516                 reference_row_count += 1;
517         }
518
519
520         log_info!(logger, "Processed {} node announcement reference rows (delta size: {}) in {:?}",
521                 reference_row_count, delta_set.len(), start.elapsed());
522
523         // get all the intermediate node updates
524         // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
525         // have been omitted)
526         let intermediate_updates = client.query_raw("
527                 SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
528                 FROM node_announcements
529                 WHERE seen >= TO_TIMESTAMP($1)
530                 ORDER BY public_key ASC, timestamp DESC
531                 ", [last_sync_timestamp_float]).await.unwrap();
532         let mut pinned_updates = Box::pin(intermediate_updates);
533         log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed());
534
535         let mut previous_node_id: Option<NodeId> = None;
536
537         let mut intermediate_update_count = 0;
538         while let Some(row_res) = pinned_updates.next().await {
539                 let intermediate_update = row_res.unwrap();
540                 intermediate_update_count += 1;
541
542                 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
543                 let blob: Vec<u8> = intermediate_update.get("announcement_signed");
544                 let mut readable = Cursor::new(blob);
545                 let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;
546
547                 let node_id = unsigned_node_announcement.node_id;
548                 let is_previously_processed_node_id = Some(node_id) == previous_node_id;
549
550                 // get this node's address set
551                 let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
552                 let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
553
554                 // determine mutations
555                 if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() {
556                         if unsigned_node_announcement.features != last_seen_update.features {
557                                 current_node_delta.has_feature_set_changed = true;
558                         }
559                         if address_set != last_seen_update.addresses {
560                                 current_node_delta.has_address_set_changed = true;
561                         }
562                 }
563
564                 if !is_previously_processed_node_id {
565                         (*current_node_delta).latest_details_after_seen.get_or_insert(NodeDetails {
566                                 seen: current_seen_timestamp,
567                                 features: unsigned_node_announcement.features,
568                                 addresses: address_set,
569                         });
570                 }
571
572                 previous_node_id = Some(node_id);
573         }
574         log_info!(logger, "Processed intermediate node announcement rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
575
576         delta_set
577 }
578
579 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
580         let original_length = delta_set.len();
581         let keys: Vec<u64> = delta_set.keys().cloned().collect();
582         for k in keys {
583                 let v = delta_set.get(&k).unwrap();
584                 if v.announcement.is_none() {
585                         // this channel is not currently in the network graph
586                         delta_set.remove(&k);
587                         continue;
588                 }
589
590                 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
591                         if update.is_none() {
592                                 return false;
593                         };
594                         let update_reference = update.as_ref().unwrap();
595                         // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
596                         // if there has been an update after the channel was first seen
597
598                         v.requires_reminder || update_reference.latest_update_after_seen.is_some()
599                 };
600
601                 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
602                 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
603
604                 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
605                         delta_set.remove(&k);
606                 }
607         }
608
609         let new_length = delta_set.len();
610         if original_length != new_length {
611                 log_info!(logger, "length modified!");
612         }
613 }