pub(crate) fn db_index_creation_query() -> &'static str {
"
+ CREATE INDEX IF NOT EXISTS channel_updates_seen_with_id_direction_blob ON channel_updates(seen) INCLUDE (id, direction, blob_signed);
CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id);
CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_asc ON channel_updates(short_channel_id, direction, seen);
CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id);
blob
}
-async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
+async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
let (client, connection) = lookup::connect_to_db().await;
network_graph.remove_stale_channels_and_tracking();
let mut delta_set = DeltaSet::new();
lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
println!("announcement channel count: {}", delta_set.len());
- lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, consider_intermediate_updates).await;
+ lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await;
println!("update-fetched channel count: {}", delta_set.len());
lookup::filter_delta_set(&mut delta_set);
println!("update-filtered channel count: {}", delta_set.len());
}
}
-pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, consider_intermediate_updates: bool) {
+pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) {
let start = Instant::now();
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
// have been omitted)
- let mut intermediate_update_prefix = "";
- if !consider_intermediate_updates {
- intermediate_update_prefix = "DISTINCT ON (short_channel_id, direction)";
- }
-
- let query_string = format!("
- SELECT {} id, direction, blob_signed, seen
+ let intermediate_updates = client.query("
+ SELECT id, direction, blob_signed, seen
FROM channel_updates
WHERE seen >= $1
- ORDER BY short_channel_id ASC, direction ASC, seen DESC
- ", intermediate_update_prefix);
- let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap();
+ ", &[&last_sync_timestamp_object]).await.unwrap();
println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed());
let mut previous_scid = u64::MAX;
{
println!("Calculating {}-day snapshot", day_range);
// calculate the snapshot
- let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, true).await;
+ let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32).await;
// persist the snapshot and update the symlink
let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);