1 use std::collections::{BTreeMap, HashMap, HashSet};
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::{NetworkGraph, NodeId};
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::ln::features::NodeFeatures;
15 use lightning::util::logger::Logger;
18 use crate::serialization::MutatedProperties;
20 /// The delta set needs to be a BTreeMap so that its keys (the short channel ids) are sorted.
21 /// That way, the scids in the response automatically grow monotonically.
22 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
/// The per-node delta set, keyed by node id.
23 pub(super) type NodeDeltaSet = HashMap<NodeId, NodeDelta>;
/// A channel announcement as stored inside a `ChannelDelta`.
25 pub(super) struct AnnouncementDelta {
/// The unsigned contents of the channel announcement.
27 pub(super) announcement: UnsignedChannelAnnouncement,
/// A channel update as stored inside a `DirectedUpdateDelta`.
30 pub(super) struct UpdateDelta {
/// The unsigned contents of the channel update.
32 pub(super) update: UnsignedChannelUpdate,
/// The update state tracked for a single direction of a channel.
35 pub(super) struct DirectedUpdateDelta {
36 /// The last update we saw prior to the user-provided timestamp.
37 pub(super) last_update_before_seen: Option<UpdateDelta>,
38 /// The latest update we saw overall.
39 pub(super) latest_update_after_seen: Option<UpdateDelta>,
40 /// The set of all mutated properties across all updates between the last one seen by the
41 /// user and the latest one known to us.
42 pub(super) mutated_properties: MutatedProperties,
43 /// Specifically for reminder updates, the flag-only value to send to the client.
44 pub(super) serialization_update_flags: Option<u8>
/// Everything collected about a single channel; stored in `DeltaSet` under its short channel id.
47 pub(super) struct ChannelDelta {
/// The channel's announcement, if one has been recorded for it.
48 pub(super) announcement: Option<AnnouncementDelta>,
/// Per-direction update state: `.0` is used for direction `false`, `.1` for direction `true`.
49 pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
/// The first time updates for this channel had been seen in both directions. Only recorded
/// when that moment is not older than the last sync timestamp.
50 pub(super) first_bidirectional_updates_seen: Option<u32>,
51 /// Whether a reminder update should be sent for this channel.
52 pub(super) requires_reminder: bool,
/// Everything collected about a single node; stored in `NodeDeltaSet` under its node id.
55 pub(super) struct NodeDelta {
56 /// The most recently received, but new-to-the-client, node details.
57 pub(super) latest_details_after_seen: Option<NodeDetails>,
59 /// Between last_details_before_seen and latest_details_after_seen, including any potential
60 /// intermediate updates that are not kept track of here, has this node's feature set changed?
62 pub(super) has_feature_set_changed: bool,
64 /// Between last_details_before_seen and latest_details_after_seen, including any potential
65 /// intermediate updates that are not kept track of here, has the set of socket addresses this
66 /// node listens on changed?
67 pub(super) has_address_set_changed: bool,
69 /// The most recent node details that the client would have seen already.
70 pub(super) last_details_before_seen: Option<NodeDetails>
/// The subset of a node announcement's contents that is tracked per node.
73 pub(super) struct NodeDetails {
/// The features taken from the node announcement.
76 pub(super) features: NodeFeatures,
/// The announced socket addresses, collected into a set so that they compare regardless of order.
77 pub(super) addresses: HashSet<SocketAddress>
// The default channel delta is empty: no directional updates, no recorded bidirectional-seen
// time, and no reminder required.
80 impl Default for ChannelDelta {
81 fn default() -> Self {
84 updates: (None, None),
85 first_bidirectional_updates_seen: None,
86 requires_reminder: false,
// The default node delta is empty: no details before or after the sync point, and no detected
// feature or address changes.
91 impl Default for NodeDelta {
92 fn default() -> Self {
94 latest_details_after_seen: None,
95 has_feature_set_changed: false,
96 has_address_set_changed: false,
97 last_details_before_seen: None,
// The default directed update delta is empty: no reference update, no newer update, default
// (unset) mutated properties, and no reminder flags.
102 impl Default for DirectedUpdateDelta {
103 fn default() -> Self {
105 last_update_before_seen: None,
106 mutated_properties: MutatedProperties::default(),
107 latest_update_after_seen: None,
108 serialization_update_flags: None,
113 /// Fetch all the channel announcements that are presently in the network graph, regardless of
114 /// whether they had been seen before.
115 /// Also include all announcements for which the first update was announced
116 /// after `last_sync_timestamp`
///
/// Results are written into `delta_set`, with entries being created on demand. Only graph
/// channels that have an announcement message and updates in both directions are looked up in
/// the database. If `snapshot_reference_timestamp` is provided, it is used as the current time
/// instead of the system clock when deciding whether reminders are to be included.
///
/// # Panics
///
/// Panics if a database query or one of its rows fails, or if a stored announcement or update
/// cannot be deserialized.
117 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) where L::Target: Logger {
118 log_info!(logger, "Obtaining channel ids from network graph");
120 let read_only_graph = network_graph.read_only();
121 log_info!(logger, "Retrieved read-only network graph copy");
122 let channel_iterator = read_only_graph.channels().unordered_iter();
124 .filter(|c| c.1.announcement_message.is_some() && c.1.one_to_two.is_some() && c.1.two_to_one.is_some())
125 .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
129 log_info!(logger, "Channel IDs: {:?}", channel_ids);
130 log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
131 let last_sync_timestamp_float = last_sync_timestamp as f64;
133 let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
134 log_info!(logger, "Current timestamp: {}", current_timestamp);
// Reminders are included either on the schedule below (hour index divisible by 24 and day index
// divisible by 5), or whenever the snapshot covers more than 50 hours.
136 let include_reminders = {
137 let current_hour = current_timestamp / 3600;
138 let current_day = current_timestamp / (24 * 3600);
140 log_debug!(logger, "Current day index: {}", current_day);
141 log_debug!(logger, "Current hour: {}", current_hour);
143 // every 5th day at midnight
144 let is_reminder_hour = (current_hour % 24) == 0;
145 let is_reminder_day = (current_day % 5) == 0;
// the time span between the last sync and the current timestamp, in seconds
147 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
148 let is_reminder_scope = snapshot_scope > (50 * 3600);
149 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
151 (is_reminder_hour && is_reminder_day) || is_reminder_scope
154 log_info!(logger, "Obtaining corresponding database entries");
155 // get all the channel announcements that are currently in the network graph
156 let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
157 let mut pinned_rows = Box::pin(announcement_rows);
159 let mut announcement_count = 0;
160 while let Some(row_res) = pinned_rows.next().await {
161 let current_announcement_row = row_res.unwrap();
162 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
163 let mut readable = Cursor::new(blob);
// only the unsigned contents of the stored announcement are kept
164 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
166 let scid = unsigned_announcement.short_channel_id;
167 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
169 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
170 (*current_channel_delta).announcement = Some(AnnouncementDelta {
171 announcement: unsigned_announcement,
172 seen: current_seen_timestamp,
175 announcement_count += 1;
177 log_info!(logger, "Fetched {} announcement rows", announcement_count);
180 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
182 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
184 // — Obtain all updates, distinct by (scid, direction), ordered by seen ASC (to find the oldest update in a given direction)
185 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
186 // This will allow us to mark the first time updates in both directions were seen
188 // here is where the channels whose first update in either direction occurred after
189 // `last_sync_timestamp` are added to the selection
190 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
191 [&channel_ids, &last_sync_timestamp_float];
192 let newer_oldest_directional_updates = client.query_raw("
193 SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
194 SELECT DISTINCT ON (short_channel_id) *
196 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
198 WHERE short_channel_id = any($1)
199 ORDER BY short_channel_id ASC, direction ASC, seen ASC
200 ) AS directional_last_seens
201 ORDER BY short_channel_id ASC, seen DESC
203 WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
204 ", params).await.unwrap();
205 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
207 let mut newer_oldest_directional_update_count = 0;
208 while let Some(row_res) = pinned_updates.next().await {
209 let current_row = row_res.unwrap();
211 let scid: i64 = current_row.get("short_channel_id");
212 let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
214 // the newer of the two oldest seen directional updates came after last sync timestamp
215 let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
216 // first time a channel was seen in both directions
217 (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
219 newer_oldest_directional_update_count += 1;
221 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
224 if include_reminders {
225 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
227 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
// NOTE(review): the two bullet points below do not appear to describe the window-function
// query that is actually issued further down — confirm and update or remove them.
229 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
230 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
231 let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64;
233 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
// only updates seen within three times the reminder age are scanned by the query
234 let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64;
235 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
238 What exactly is the below query doing?
240 First, the inner query groups all channel updates by their scid/direction combination,
241 and then sorts those in reverse chronological order by the "seen" column.
243 Then, each row is annotated based on whether its subsequent row for the same scid/direction
244 combination has a different value for any one of these six fields:
245 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
246 Those are simply the properties we use to keep track of channel mutations.
248 The outer query takes all of those results and selects the first value that has a distinct
249 successor for each scid/direction combination. That yields the first instance at which
250 a given channel configuration was received after any prior mutations.
252 Knowing that, we can check whether or not there have been any mutations within the
253 reminder requirement window. Because we only care about that window (and potentially the
254 2-week-window), we pre-filter the scanned updates by only those that were received within
255 3x the timeframe that we consider necessitates reminders.
258 let mutated_updates = client.query_raw("
259 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
260 SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
261 disable<>lead(disable) OVER w1
263 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
265 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
267 fee_base_msat<>lead(fee_base_msat) OVER w1
269 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
271 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
273 ) has_distinct_successor
275 WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
276 WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
278 WHERE has_distinct_successor
279 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
280 ", params).await.unwrap();
282 let mut pinned_updates = Box::pin(mutated_updates);
283 let mut older_latest_directional_update_count = 0;
284 while let Some(row_res) = pinned_updates.next().await {
285 let current_row = row_res.unwrap();
286 let seen = current_row.get::<_, i64>("seen") as u32;
// the returned row was already seen before the reminder threshold, so a reminder is due
288 if seen < reminder_threshold_timestamp as u32 {
289 let blob: Vec<u8> = current_row.get("blob_signed");
290 let mut readable = Cursor::new(blob);
291 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
293 let scid = unsigned_channel_update.short_channel_id;
294 let direction: bool = current_row.get("direction");
296 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
298 // We might be able to get away with not using this
299 (*current_channel_delta).requires_reminder = true;
300 older_latest_directional_update_count += 1;
302 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
303 if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
304 // we don't send reminders if we don't have bidirectional update data
// reminder flags for the first direction: 0 if enabled, 2 if disabled
308 if let Some(info) = current_channel_info.one_to_two.as_ref() {
309 let flags: u8 = if info.enabled { 0 } else { 2 };
310 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
311 current_update.serialization_update_flags = Some(flags);
// reminder flags for the second direction: 1 if enabled, 3 if disabled
314 if let Some(info) = current_channel_info.two_to_one.as_ref() {
315 let flags: u8 = if info.enabled { 1 } else { 3 };
316 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
317 current_update.serialization_update_flags = Some(flags);
320 // we don't send reminders if we don't have the channel
324 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
327 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
/// Populate `delta_set` with channel update information relative to `last_sync_timestamp`.
///
/// In a first pass, the latest update per channel direction that was seen before
/// `last_sync_timestamp` is stored as that direction's `last_update_before_seen`. In a second
/// pass, the updates seen at or after `last_sync_timestamp` are scanned: the first one encountered
/// per channel direction is stored as `latest_update_after_seen`, and each scanned update is
/// compared against the direction's reference update to flag the properties that have mutated.
///
/// # Panics
///
/// Panics if a database query or one of its rows fails, or if a stored update cannot be
/// deserialized.
331 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
332 let start = Instant::now();
333 let last_sync_timestamp_float = last_sync_timestamp as f64;
335 // get the latest channel update in each direction prior to last_sync_timestamp, provided
336 // there was an update in either direction that happened after the last sync (to avoid
337 // collecting too many reference updates)
338 let reference_rows = client.query_raw("
339 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
341 SELECT DISTINCT ON (short_channel_id, direction) id
343 WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
344 SELECT DISTINCT ON (short_channel_id) short_channel_id
346 WHERE seen >= TO_TIMESTAMP($1)
348 ORDER BY short_channel_id ASC, direction ASC, seen DESC
350 ", [last_sync_timestamp_float]).await.unwrap();
351 let mut pinned_rows = Box::pin(reference_rows);
353 log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
// ids of the reference updates, so that they can be recognized in the second pass
355 let mut last_seen_update_ids: Vec<i32> = Vec::new();
356 let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
357 let mut reference_row_count = 0;
359 while let Some(row_res) = pinned_rows.next().await {
360 let current_reference = row_res.unwrap();
361 let update_id: i32 = current_reference.get("id");
362 last_seen_update_ids.push(update_id);
363 non_intermediate_ids.insert(update_id);
365 let direction: bool = current_reference.get("direction");
366 let seen = current_reference.get::<_, i64>("seen") as u32;
367 let blob: Vec<u8> = current_reference.get("blob_signed");
368 let mut readable = Cursor::new(blob);
369 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
370 let scid = unsigned_channel_update.short_channel_id;
372 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
// direction `false` is tracked in `updates.0`, direction `true` in `updates.1`
373 let update_delta = if !direction {
374 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
376 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
378 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
379 update_delta.last_update_before_seen = Some(UpdateDelta {
381 update: unsigned_channel_update,
384 reference_row_count += 1;
387 log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
388 reference_row_count, delta_set.len(), start.elapsed());
390 // get all the intermediate channel updates
391 // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
392 // have been omitted)
394 let intermediate_updates = client.query_raw("
395 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
397 WHERE seen >= TO_TIMESTAMP($1)
398 ORDER BY short_channel_id ASC, timestamp DESC
399 ", [last_sync_timestamp_float]).await.unwrap();
400 let mut pinned_updates = Box::pin(intermediate_updates);
401 log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
// rows arrive grouped by scid, so tracking the previous scid is enough to detect a new channel
403 let mut previous_scid = u64::MAX;
404 let mut previously_seen_directions = (false, false);
406 let mut intermediate_update_count = 0;
407 while let Some(row_res) = pinned_updates.next().await {
408 let intermediate_update = row_res.unwrap();
409 let update_id: i32 = intermediate_update.get("id");
410 if non_intermediate_ids.contains(&update_id) {
413 intermediate_update_count += 1;
415 let direction: bool = intermediate_update.get("direction");
416 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
417 let blob: Vec<u8> = intermediate_update.get("blob_signed");
418 let mut readable = Cursor::new(blob);
419 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
421 let scid = unsigned_channel_update.short_channel_id;
// a new channel begins: neither of its directions has been encountered yet
422 if scid != previous_scid {
423 previous_scid = scid;
424 previously_seen_directions = (false, false);
427 // get the write configuration for this particular channel's directional details
428 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
429 let update_delta = if !direction {
430 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
432 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
436 // handle the latest deltas
// within a channel the rows are ordered by timestamp DESC, so the first row encountered for a
// direction carries that direction's highest timestamp
437 if !direction && !previously_seen_directions.0 {
438 previously_seen_directions.0 = true;
439 update_delta.latest_update_after_seen = Some(UpdateDelta {
440 seen: current_seen_timestamp,
441 update: unsigned_channel_update.clone(),
443 } else if direction && !previously_seen_directions.1 {
444 previously_seen_directions.1 = true;
445 update_delta.latest_update_after_seen = Some(UpdateDelta {
446 seen: current_seen_timestamp,
447 update: unsigned_channel_update.clone(),
452 // determine mutations
// every scanned update is compared against the reference update, and a property is flagged as
// soon as any one of them differs from it
453 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
454 if unsigned_channel_update.flags != last_seen_update.update.flags {
455 update_delta.mutated_properties.flags = true;
457 if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
458 update_delta.mutated_properties.cltv_expiry_delta = true;
460 if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
461 update_delta.mutated_properties.htlc_minimum_msat = true;
463 if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
464 update_delta.mutated_properties.fee_base_msat = true;
466 if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
467 update_delta.mutated_properties.fee_proportional_millionths = true;
469 if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
470 update_delta.mutated_properties.htlc_maximum_msat = true;
474 log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
/// Build a `NodeDeltaSet` describing how node announcements changed relative to
/// `last_sync_timestamp`.
///
/// In a first pass, the latest announcement per node that was seen before `last_sync_timestamp`
/// is stored as `last_details_before_seen`. In a second pass, the announcements seen at or after
/// `last_sync_timestamp` are scanned: each one is compared against the node's reference details
/// to flag feature and address changes, and the first one encountered per node is stored as
/// `latest_details_after_seen`.
///
/// # Panics
///
/// Panics if a database query or one of its rows fails, or if a stored announcement cannot be
/// deserialized.
477 pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
478 let start = Instant::now();
479 let last_sync_timestamp_float = last_sync_timestamp as f64;
481 let mut delta_set = NodeDeltaSet::new();
483 // get the latest node updates prior to last_sync_timestamp
484 let reference_rows = client.query_raw("
485 SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed
486 FROM node_announcements
487 WHERE seen < TO_TIMESTAMP($1)
488 ORDER BY public_key ASC, seen DESC
489 ", [last_sync_timestamp_float]).await.unwrap();
490 let mut pinned_rows = Box::pin(reference_rows);
492 log_info!(logger, "Fetched node announcement reference rows in {:?}", start.elapsed());
494 let mut reference_row_count = 0;
496 while let Some(row_res) = pinned_rows.next().await {
497 let current_reference = row_res.unwrap();
499 let seen = current_reference.get::<_, i64>("seen") as u32;
500 let blob: Vec<u8> = current_reference.get("announcement_signed");
501 let mut readable = Cursor::new(blob);
502 let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;
503 let node_id = unsigned_node_announcement.node_id;
505 let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
// the reference details are only set if none have been recorded for this node yet
506 (*current_node_delta).last_details_before_seen.get_or_insert_with(|| {
507 let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
510 features: unsigned_node_announcement.features,
511 addresses: address_set,
514 log_gossip!(logger, "Node {} last update before seen: {} (seen at {})", node_id, unsigned_node_announcement.timestamp, seen);
516 reference_row_count += 1;
520 log_info!(logger, "Processed {} node announcement reference rows (delta size: {}) in {:?}",
521 reference_row_count, delta_set.len(), start.elapsed());
523 // get all the intermediate node updates
524 // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
525 // have been omitted)
526 let intermediate_updates = client.query_raw("
527 SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
528 FROM node_announcements
529 WHERE seen >= TO_TIMESTAMP($1)
530 ORDER BY public_key ASC, timestamp DESC
531 ", [last_sync_timestamp_float]).await.unwrap();
532 let mut pinned_updates = Box::pin(intermediate_updates);
533 log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed());
// rows arrive grouped by public key, so remembering the previous node id is enough to tell
// whether the current row is the first one for its node
535 let mut previous_node_id: Option<NodeId> = None;
537 let mut intermediate_update_count = 0;
538 while let Some(row_res) = pinned_updates.next().await {
539 let intermediate_update = row_res.unwrap();
540 intermediate_update_count += 1;
542 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
543 let blob: Vec<u8> = intermediate_update.get("announcement_signed");
544 let mut readable = Cursor::new(blob);
545 let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;
547 let node_id = unsigned_node_announcement.node_id;
548 let is_previously_processed_node_id = Some(node_id) == previous_node_id;
550 // get this node's address set
551 let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
552 let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
554 // determine mutations
// every scanned announcement is compared against the reference details, and a change is flagged
// as soon as any one of them differs
555 if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() {
556 if unsigned_node_announcement.features != last_seen_update.features {
557 current_node_delta.has_feature_set_changed = true;
559 if address_set != last_seen_update.addresses {
560 current_node_delta.has_address_set_changed = true;
// within a node the rows are ordered by timestamp DESC, so the first row encountered carries
// that node's highest timestamp
564 if !is_previously_processed_node_id {
565 (*current_node_delta).latest_details_after_seen.get_or_insert(NodeDetails {
566 seen: current_seen_timestamp,
567 features: unsigned_node_announcement.features,
568 addresses: address_set,
572 previous_node_id = Some(node_id);
574 log_info!(logger, "Processed intermediate node announcement rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
/// Remove the entries from `delta_set` that should not be part of the response.
///
/// A channel is removed if it has no announcement, or if it neither requires a reminder nor has
/// an update after the last sync in either direction. An info message is logged if the size of
/// the delta set changed as a result.
579 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
580 let original_length = delta_set.len();
// the keys are copied up front so that entries can be removed while walking over them
581 let keys: Vec<u64> = delta_set.keys().cloned().collect();
583 let v = delta_set.get(&k).unwrap();
584 if v.announcement.is_none() {
585 // this channel is not currently in the network graph
586 delta_set.remove(&k);
// a direction meets the criteria if it has a delta and either the channel requires a reminder
// or that direction received an update after the last sync
590 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
591 if update.is_none() {
594 let update_reference = update.as_ref().unwrap();
595 // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
596 // if there has been an update after the channel was first seen
598 v.requires_reminder || update_reference.latest_update_after_seen.is_some()
601 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
602 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
// nothing new in either direction and no reminder due: drop the channel from the delta
604 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
605 delta_set.remove(&k);
609 let new_length = delta_set.len();
610 if original_length != new_length {
611 log_info!(logger, "length modified!");