git.bitcoin.ninja Git - rapid-gossip-sync-server/commitdiff
Only consider node announcements from current graph.
author: Arik Sosman <git@arik.io>
Tue, 2 Jul 2024 07:20:19 +0000 (00:20 -0700)
committer: Arik Sosman <git@arik.io>
Wed, 18 Sep 2024 17:07:23 +0000 (10:07 -0700)
We want to ignore any node announcements that have already been
pruned. To do so, we extract all the node IDs from the network graph,
and use those to filter our queries.

src/lib.rs
src/lookup.rs

index 9b66e7354d4dfb6b49720bb974b794f742855ba4..21e21d674afc391cd209a5406bbdc7f71ef0294c 100644 (file)
@@ -187,11 +187,11 @@ async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
        // for announcement-free incremental-only updates, chain hash can be skipped
 
        let mut delta_set = DeltaSet::new();
-       lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
+       lookup::fetch_channel_announcements(&mut delta_set, Arc::clone(&network_graph), &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
        log_info!(logger, "announcement channel count: {}", delta_set.len());
        lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
        log_info!(logger, "update-fetched channel count: {}", delta_set.len());
-       let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await;
+       let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await;
        log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
        lookup::filter_delta_set(&mut delta_set, logger.clone());
        log_info!(logger, "update-filtered channel count: {}", delta_set.len());
index 9a584c41ba47bb2f63fe3f608010de1be1b250d2..209e5ec1956837183d56358128c6b27ba66a2b18 100644 (file)
@@ -11,6 +11,7 @@ use lightning::util::ser::Readable;
 use tokio_postgres::Client;
 
 use futures::StreamExt;
+use hex_conservative::DisplayHex;
 use lightning::{log_debug, log_gossip, log_info};
 use lightning::ln::features::NodeFeatures;
 use lightning::util::logger::Logger;
@@ -475,19 +476,45 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
        log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
 }
 
-pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
+pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
        let start = Instant::now();
        let last_sync_timestamp_float = last_sync_timestamp as f64;
 
-       let mut delta_set = NodeDeltaSet::new();
+       let mut delta_set: NodeDeltaSet = {
+               let read_only_graph = network_graph.read_only();
+               read_only_graph.nodes().unordered_iter().flat_map(|(node_id, node_info)| {
+                       let details: NodeDetails = if let Some(details) = node_info.announcement_info.as_ref() {
+                               NodeDetails {
+                                       seen: 0,
+                                       features: details.features().clone(),
+                                       addresses: details.addresses().into_iter().cloned().collect(),
+                               }
+                       } else {
+                               return None;
+                       };
+                       Some((node_id.clone(), NodeDelta {
+                               latest_details: Some(details),
+                               has_feature_set_changed: false,
+                               has_address_set_changed: false,
+                               last_details_before_seen: None,
+                       }))
+               }).collect()
+       };
+
+       let node_ids: Vec<String> = delta_set.keys().into_iter().map(|id| id.as_slice().to_lower_hex_string()).collect();
+       #[cfg(test)]
+       log_info!(logger, "Node IDs: {:?}", node_ids);
 
        // get the latest node updates prior to last_sync_timestamp
+       let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
        let reference_rows = client.query_raw("
                SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed
                FROM node_announcements
-               WHERE seen < TO_TIMESTAMP($1)
+               WHERE
+                       public_key = ANY($1) AND
+                       seen < TO_TIMESTAMP($2)
                ORDER BY public_key ASC, seen DESC
-               ", [last_sync_timestamp_float]).await.unwrap();
+               ", params).await.unwrap();
        let mut pinned_rows = Box::pin(reference_rows);
 
        log_info!(logger, "Fetched node announcement reference rows in {:?}", start.elapsed());
@@ -524,12 +551,15 @@ pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_time
        // get all the intermediate node updates
        // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
        // have been omitted)
+       let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
        let intermediate_updates = client.query_raw("
                SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
                FROM node_announcements
-               WHERE seen >= TO_TIMESTAMP($1)
+               WHERE
+                       public_key = ANY($1) AND
+                       seen >= TO_TIMESTAMP($2)
                ORDER BY public_key ASC, timestamp DESC
-               ", [last_sync_timestamp_float]).await.unwrap();
+               ", params).await.unwrap();
        let mut pinned_updates = Box::pin(intermediate_updates);
        log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed());