From: Arik Sosman
Date: Tue, 2 Jul 2024 07:20:19 +0000 (-0700)
Subject: Only consider node announcements from current graph.
X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=63aaf6111213a74386a3cbc76ec7e2bf6b0b7f9c;p=rapid-gossip-sync-server

Only consider node announcements from current graph.

We want to ignore any node announcements for nodes that have already
been pruned from the network graph. To do so, we extract all the node
IDs from the graph and use those to filter our queries.
---

diff --git a/src/lib.rs b/src/lib.rs
index 9b66e73..21e21d6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -187,11 +187,11 @@ async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
 	// for announcement-free incremental-only updates, chain hash can be skipped
 
 	let mut delta_set = DeltaSet::new();
-	lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
+	lookup::fetch_channel_announcements(&mut delta_set, Arc::clone(&network_graph), &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
 	log_info!(logger, "announcement channel count: {}", delta_set.len());
 	lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
 	log_info!(logger, "update-fetched channel count: {}", delta_set.len());
-	let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await;
+	let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await;
 	log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
 	lookup::filter_delta_set(&mut delta_set, logger.clone());
 	log_info!(logger, "update-filtered channel count: {}", delta_set.len());
diff --git a/src/lookup.rs b/src/lookup.rs
index 9a584c4..209e5ec 100644
--- a/src/lookup.rs
+++ b/src/lookup.rs
@@ -11,6 +11,7 @@ use lightning::util::ser::Readable;
 use tokio_postgres::Client;
 
 use futures::StreamExt;
+use hex_conservative::DisplayHex;
 use lightning::{log_debug, log_gossip, log_info};
 use lightning::ln::features::NodeFeatures;
 use lightning::util::logger::Logger;
@@ -475,19 +476,45 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
 	log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
 }
 
-pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
+pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
 	let start = Instant::now();
 	let last_sync_timestamp_float = last_sync_timestamp as f64;
 
-	let mut delta_set = NodeDeltaSet::new();
+	let mut delta_set: NodeDeltaSet = {
+		let read_only_graph = network_graph.read_only();
+		read_only_graph.nodes().unordered_iter().flat_map(|(node_id, node_info)| {
+			let details: NodeDetails = if let Some(details) = node_info.announcement_info.as_ref() {
+				NodeDetails {
+					seen: 0,
+					features: details.features().clone(),
+					addresses: details.addresses().into_iter().cloned().collect(),
+				}
+			} else {
+				return None;
+			};
+			Some((node_id.clone(), NodeDelta {
+				latest_details: Some(details),
+				has_feature_set_changed: false,
+				has_address_set_changed: false,
+				last_details_before_seen: None,
+			}))
+		}).collect()
+	};
+
+	let node_ids: Vec<String> = delta_set.keys().into_iter().map(|id| id.as_slice().to_lower_hex_string()).collect();
+	#[cfg(test)]
+	log_info!(logger, "Node IDs: {:?}", node_ids);
 
 	// get the latest node updates prior to last_sync_timestamp
+	let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
 	let reference_rows = client.query_raw("
 		SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed
 		FROM node_announcements
-		WHERE seen < TO_TIMESTAMP($1)
+		WHERE
+			public_key = ANY($1) AND
+			seen < TO_TIMESTAMP($2)
 		ORDER BY public_key ASC, seen DESC
-		", [last_sync_timestamp_float]).await.unwrap();
+		", params).await.unwrap();
 	let mut pinned_rows = Box::pin(reference_rows);
 	log_info!(logger, "Fetched node announcement reference rows in {:?}", start.elapsed());
 
@@ -524,12 +551,15 @@ pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_time
 	// get all the intermediate node updates
 	// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
 	// have been omitted)
+	let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
 	let intermediate_updates = client.query_raw("
 		SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
 		FROM node_announcements
-		WHERE seen >= TO_TIMESTAMP($1)
+		WHERE
+			public_key = ANY($1) AND
+			seen >= TO_TIMESTAMP($2)
 		ORDER BY public_key ASC, timestamp DESC
-		", [last_sync_timestamp_float]).await.unwrap();
+		", params).await.unwrap();
 	let mut pinned_updates = Box::pin(intermediate_updates);
 	log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed());
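
The new WHERE clauses rely on tokio_postgres binding a Rust Vec<String> as a Postgres text array, so `public_key = ANY($1)` matches any of the hex-encoded node IDs collected from the graph. Below is a minimal, self-contained sketch of that query pattern; the connection string, runtime setup, and example public key are illustrative assumptions, only the node_announcements table and its public_key/announcement_signed columns come from the queries above.

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
	// Assumed local database; adjust the connection string for your environment.
	let (client, connection) =
		tokio_postgres::connect("host=localhost user=postgres dbname=rgs", NoTls).await?;
	// The connection object drives the socket I/O and must be polled to completion.
	tokio::spawn(async move {
		if let Err(e) = connection.await {
			eprintln!("connection error: {}", e);
		}
	});

	// A Vec<String> binds as a Postgres text[], so `= ANY($1)` matches any listed key.
	let node_ids: Vec<String> = vec![
		// Illustrative public key, hex-encoded like the IDs built in fetch_node_updates.
		"02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619".to_string(),
	];
	let rows = client.query(
		"SELECT public_key, announcement_signed FROM node_announcements WHERE public_key = ANY($1)",
		&[&node_ids],
	).await?;
	println!("matched {} announcements", rows.len());
	Ok(())
}

Binding the IDs as a single array parameter keeps the statement text constant regardless of how many node IDs are passed and avoids interpolating values into the SQL.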