From: Arik Sosman Date: Fri, 10 May 2024 01:40:37 +0000 (-0700) Subject: Allow passing a timestamp override to snapshot calculation. X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=405ab6a86d1b53730dd8e5969bbf1eb5580e0787;p=rapid-gossip-sync-server Allow passing a timestamp override to snapshot calculation. --- diff --git a/src/lib.rs b/src/lib.rs index 25d1308..8724ef1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ use std::fs::File; use std::io::BufReader; use std::ops::Deref; use std::sync::Arc; +use std::time::SystemTime; use bitcoin::blockdata::constants::ChainHash; use lightning::log_info; @@ -171,7 +172,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { blob } -async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger { +async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32, snapshot_calculation_time: Option, logger: L) -> SerializedResponse where L::Target: Logger { let client = connect_to_db().await; network_graph.remove_stale_channels_and_tracking(); @@ -200,7 +201,7 @@ async fn serialize_delta(network_graph: Arc>, }; let mut delta_set = DeltaSet::new(); - lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, logger.clone()).await; + lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_calculation_time, logger.clone()).await; log_info!(logger, "announcement channel count: {}", delta_set.len()); lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; log_info!(logger, "update-fetched channel count: {}", delta_set.len()); diff --git a/src/lookup.rs b/src/lookup.rs index 5504279..76b7c20 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -76,7 +76,7 @@ impl Default for DirectedUpdateDelta { /// whether they had been seen before. /// Also include all announcements for which the first update was announced /// after `last_sync_timestamp` -pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger { +pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32, snapshot_calculation_time: Option, logger: L) where L::Target: Logger { log_info!(logger, "Obtaining channel ids from network graph"); let channel_ids = { let read_only_graph = network_graph.read_only(); @@ -92,15 +92,27 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp); let last_sync_timestamp_float = last_sync_timestamp as f64; - let current_time = SystemTime::now(); + let current_time = snapshot_calculation_time.unwrap_or(SystemTime::now()); let current_timestamp = current_time.duration_since(UNIX_EPOCH).unwrap().as_secs(); log_info!(logger, "Current timestamp: {}", current_timestamp); - let current_day = current_timestamp / (24 * 3600); - let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64); - log_debug!(logger, "Current day index: {}", current_day); - log_debug!(logger, "Snapshot scope: {}s", snapshot_scope); - let include_reminders = ((current_day % 5) == 0 || snapshot_scope > (40 * 3600)); + let include_reminders = { + let current_hour = current_timestamp / 3600; + let current_day = current_timestamp / (24 * 3600); + + log_debug!(logger, "Current day index: {}", current_day); + log_debug!(logger, "Current hour: {}", current_hour); + + // anytime between 11pm and 1am + let is_reminder_hour = current_hour < 2 || current_hour > 22; + let is_reminder_day = (current_day % 5) == 0; + + let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64); + let is_reminder_scope = snapshot_scope > (40 * 3600); + log_debug!(logger, "Snapshot scope: {}s", snapshot_scope); + + (is_reminder_hour && is_reminder_day) || is_reminder_scope + }; log_info!(logger, "Obtaining corresponding database entries"); // get all the channel announcements that are currently in the network graph diff --git a/src/snapshot.rs b/src/snapshot.rs index 07427a6..ed4a0e7 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -70,7 +70,8 @@ impl Snapshotter where L::Target: Logger { let relative_symlink_to_snapshot_path = "../snapshots"; // 1. get the current timestamp - let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let snapshot_generation_time = SystemTime::now(); + let snapshot_generation_timestamp = snapshot_generation_time.duration_since(UNIX_EPOCH).unwrap().as_secs(); let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64); log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); @@ -113,7 +114,7 @@ impl Snapshotter where L::Target: Logger { { log_info!(self.logger, "Calculating {}-second snapshot", current_scope); // calculate the snapshot - let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await; + let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, Some(snapshot_generation_time), self.logger.clone()).await; // persist the snapshot and update the symlink let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp); diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 33edf62..e668f0b 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -220,7 +220,7 @@ async fn test_trivial_setup() { persister.persist_gossip().await; } - let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); clean_test_db().await; @@ -305,7 +305,7 @@ async fn test_unidirectional_intermediate_update_consideration() { let client_graph_arc = Arc::new(client_graph); let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); - let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1); logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1); @@ -372,7 +372,7 @@ async fn test_bidirectional_intermediate_update_consideration() { let channel_count = network_graph_arc.read_only().channels().len(); assert_eq!(channel_count, 1); - let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1); logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1); @@ -455,7 +455,7 @@ async fn test_channel_reminders() { let channel_count = network_graph_arc.read_only().channels().len(); assert_eq!(channel_count, 2); - let serialization = serialize_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1); logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1); @@ -524,7 +524,7 @@ async fn test_full_snapshot_recency() { let client_graph_arc = Arc::new(client_graph); { // sync after initial seed - let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); let channel_count = network_graph_arc.read_only().channels().len(); @@ -604,7 +604,7 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() { let client_graph_arc = Arc::new(client_graph); { // sync after initial seed - let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); let channel_count = network_graph_arc.read_only().channels().len(); @@ -683,7 +683,7 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() { let client_graph_arc = Arc::new(client_graph); { // sync after initial seed - let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); let channel_count = network_graph_arc.read_only().channels().len(); @@ -816,7 +816,7 @@ async fn test_full_snapshot_mutiny_scenario() { let client_graph_arc = Arc::new(client_graph); { // sync after initial seed - let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); let channel_count = network_graph_arc.read_only().channels().len(); @@ -929,7 +929,7 @@ async fn test_full_snapshot_interlaced_channel_timestamps() { let client_graph_arc = Arc::new(client_graph); { // sync after initial seed - let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await; logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1); let channel_count = network_graph_arc.read_only().channels().len();