Remove unused `consider_intermediate_updates` flag..optimizing query
authorMatt Corallo <git@bluematt.me>
Sat, 15 Jul 2023 07:01:16 +0000 (07:01 +0000)
committerMatt Corallo <git@bluematt.me>
Sun, 16 Jul 2023 05:58:32 +0000 (05:58 +0000)
The `consider_intermediate_updates` flag is always set, and must be
set for correctness, so we remove it. Further, we optimize the
query that hung on it somewhat by removing an uneccessary
`ORDER BY` clause which was only neccessary if
`consider_intermediate_updates` were unset.

src/config.rs
src/lib.rs
src/lookup.rs
src/snapshot.rs

index 2d93487262152bb1dd728f3c56d10ad560fa17f3..231632ab7d93c758a21f475ea72b9658a3f7bf97 100644 (file)
@@ -104,6 +104,7 @@ pub(crate) fn db_channel_update_table_creation_query() -> &'static str {
 
 pub(crate) fn db_index_creation_query() -> &'static str {
        "
+       CREATE INDEX IF NOT EXISTS channel_updates_seen_with_id_direction_blob ON channel_updates(seen) INCLUDE (id, direction, blob_signed);
        CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id);
        CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_asc ON channel_updates(short_channel_id, direction, seen);
        CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id);
index 1115568025e213885af30372a93cc652b8e40494..37bbe4cb0338170998acef7ae0032c7056f7ea86 100644 (file)
@@ -136,7 +136,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
        blob
 }
 
-async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
+async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
        let (client, connection) = lookup::connect_to_db().await;
 
        network_graph.remove_stale_channels_and_tracking();
@@ -172,7 +172,7 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
        let mut delta_set = DeltaSet::new();
        lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
        println!("announcement channel count: {}", delta_set.len());
-       lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, consider_intermediate_updates).await;
+       lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await;
        println!("update-fetched channel count: {}", delta_set.len());
        lookup::filter_delta_set(&mut delta_set);
        println!("update-filtered channel count: {}", delta_set.len());
index 5969b2a10bdbb0727f39c7a7a297c18a810fc02b..0534c1c8a7e82922bc240eab0be214f06d87eec8 100644 (file)
@@ -201,7 +201,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
        }
 }
 
-pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, consider_intermediate_updates: bool) {
+pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) {
        let start = Instant::now();
        let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
 
@@ -253,18 +253,11 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
        // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
        // have been omitted)
 
-       let mut intermediate_update_prefix = "";
-       if !consider_intermediate_updates {
-               intermediate_update_prefix = "DISTINCT ON (short_channel_id, direction)";
-       }
-
-       let query_string = format!("
-               SELECT {} id, direction, blob_signed, seen
+       let intermediate_updates = client.query("
+               SELECT id, direction, blob_signed, seen
                FROM channel_updates
                WHERE seen >= $1
-               ORDER BY short_channel_id ASC, direction ASC, seen DESC
-               ", intermediate_update_prefix);
-       let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap();
+               ", &[&last_sync_timestamp_object]).await.unwrap();
        println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed());
 
        let mut previous_scid = u64::MAX;
index bbe94a93618037a227dac3cad4aab0c815658b78..ac800795bf6de11125818ed49da813320ff3b938 100644 (file)
@@ -77,7 +77,7 @@ impl Snapshotter {
                                {
                                        println!("Calculating {}-day snapshot", day_range);
                                        // calculate the snapshot
-                                       let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, true).await;
+                                       let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32).await;
 
                                        // persist the snapshot and update the symlink
                                        let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);