// for announcement-free incremental-only updates, chain hash can be skipped
let mut delta_set = DeltaSet::new();
- lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
+ lookup::fetch_channel_announcements(&mut delta_set, Arc::clone(&network_graph), &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
log_info!(logger, "announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
- let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await;
+ let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
use tokio_postgres::Client;
use futures::StreamExt;
+use hex_conservative::DisplayHex;
use lightning::{log_debug, log_gossip, log_info};
use lightning::ln::features::NodeFeatures;
use lightning::util::logger::Logger;
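The new `DisplayHex` import is what provides the `to_lower_hex_string` method called on node ID byte slices further down. A small sketch of the trait in isolation:

```rust
use hex_conservative::DisplayHex;

fn main() {
	let bytes: &[u8] = &[0xde, 0xad, 0xbe, 0xef];
	// DisplayHex is implemented for byte slices, among other types.
	assert_eq!(bytes.to_lower_hex_string(), "deadbeef");
}
```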
log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}
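Before the diff to `fetch_node_updates` itself, it helps to keep the shape of its return type in mind. The exact definitions live elsewhere in the crate; the following is only a sketch inferred from the constructors in this patch (the field types and the `SocketAddress` import path are assumptions):

```rust
use std::collections::HashMap;

use lightning::ln::features::NodeFeatures;
use lightning::ln::msgs::SocketAddress;
use lightning::routing::gossip::NodeId;

// Inferred shapes, not the crate's actual definitions.
struct NodeDetails {
	seen: u32,
	features: NodeFeatures,
	addresses: Vec<SocketAddress>,
}

struct NodeDelta {
	latest_details: Option<NodeDetails>,
	has_feature_set_changed: bool,
	has_address_set_changed: bool,
	last_details_before_seen: Option<NodeDetails>,
}

type NodeDeltaSet = HashMap<NodeId, NodeDelta>;
```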
-pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
+pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
let start = Instant::now();
let last_sync_timestamp_float = last_sync_timestamp as f64;
- let mut delta_set = NodeDeltaSet::new();
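+ // Seed the delta set with every node currently in the graph that has announcement
+ // details, so the queries below can be scoped to just those node IDs.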
+ let mut delta_set: NodeDeltaSet = {
+ let read_only_graph = network_graph.read_only();
+ read_only_graph.nodes().unordered_iter().flat_map(|(node_id, node_info)| {
+ let details: NodeDetails = if let Some(details) = node_info.announcement_info.as_ref() {
+ NodeDetails {
+ seen: 0, // the "seen" timestamp is a database property, unknown when seeding from the graph
+ features: details.features().clone(),
+ addresses: details.addresses().into_iter().cloned().collect(),
+ }
+ } else {
+ return None;
+ };
+ Some((node_id.clone(), NodeDelta {
+ latest_details: Some(details),
+ has_feature_set_changed: false,
+ has_address_set_changed: false,
+ last_details_before_seen: None,
+ }))
+ }).collect()
+ };
+
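+ // Hex-encode the node IDs for binding as a Postgres text-array parameter below.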
+ let node_ids: Vec<String> = delta_set.keys().map(|id| id.as_slice().to_lower_hex_string()).collect();
+ #[cfg(test)]
+ log_info!(logger, "Node IDs: {:?}", node_ids);
// get the latest node updates prior to last_sync_timestamp
+ let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
let reference_rows = client.query_raw("
SELECT DISTINCT ON (public_key) public_key, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, announcement_signed
FROM node_announcements
- WHERE seen < TO_TIMESTAMP($1)
+ WHERE
+ public_key = ANY($1) AND
+ seen < TO_TIMESTAMP($2)
ORDER BY public_key ASC, seen DESC
- ", [last_sync_timestamp_float]).await.unwrap();
+ ", params).await.unwrap();
let mut pinned_rows = Box::pin(reference_rows);
log_info!(logger, "Fetched node announcement reference rows in {:?}", start.elapsed());
// get all the intermediate node updates
// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
// have been omitted)
+ let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
let intermediate_updates = client.query_raw("
SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM node_announcements
- WHERE seen >= TO_TIMESTAMP($1)
+ WHERE
+ public_key = ANY($1) AND
+ seen >= TO_TIMESTAMP($2)
ORDER BY public_key ASC, timestamp DESC
- ", [last_sync_timestamp_float]).await.unwrap();
+ ", params).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed());