A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments
can be made by setting environment variables, whose usage is as follows:
-| Name | Default | Description |
-|:-------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
-| RAPID_GOSSIP_SYNC_SERVER_DB_HOST | localhost | Domain of the Postgres database |
-| RAPID_GOSSIP_SYNC_SERVER_DB_USER | alice | Username to access Postgres |
-| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_ | Password to access Postgres |
-| RAPID_GOSSIP_SYNC_SERVER_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage |
-| RAPID_GOSSIP_SYNC_SERVER_NETWORK | mainnet | Network to operate in. Possible values are mainnet, testnet, signet, regtest |
-| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
-| BITCOIN_REST_PORT | 8332 | HTTP port of the bitcoind REST server |
-| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints |
-| LN_PEERS | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip |
+| Name | Default | Description |
+|:-------------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
+| RAPID_GOSSIP_SYNC_SERVER_DB_HOST | localhost | Domain of the Postgres database |
+| RAPID_GOSSIP_SYNC_SERVER_DB_USER | alice | Username to access Postgres |
+| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_ | Password to access Postgres |
+| RAPID_GOSSIP_SYNC_SERVER_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage |
+| RAPID_GOSSIP_SYNC_SERVER_NETWORK | mainnet | Network to operate in. Possible values are mainnet, testnet, signet, regtest |
+| RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL | 10800 | The interval in seconds between snapshots |
+| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
+| BITCOIN_REST_PORT | 8332 | HTTP port of the bitcoind REST server |
+| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints |
+| LN_PEERS | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip |
### downloader
The snapshotting module is responsible for calculating and storing snapshots. It's started up
as soon as the first full graph sync completes, and then keeps updating the snapshots at a
-24-hour-interval.
+configurable interval, which defaults to 3 hours.
### lookup
use tokio_postgres::Config;
pub(crate) const SCHEMA_VERSION: i32 = 12;
-pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
+pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
+pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
+// generate symlinks based on a 3-hour granularity
/// If the last update in either direction was more than six days ago, we send a reminder
/// That reminder may be either in the form of a channel announcement, or in the form of empty
/// updates in both directions.
pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
+/// Interval in seconds between snapshot generations, read from the
+/// RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL environment variable and
+/// defaulting to SYMLINK_GRANULARITY_INTERVAL (3 hours).
+///
+/// Panics if the value is not a positive u32 that is a multiple of
+/// SYMLINK_GRANULARITY_INTERVAL.
+pub(crate) fn snapshot_generation_interval() -> u32 {
+	// unwrap_or_else avoids allocating the default string when the env var is set
+	let interval = env::var("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL").unwrap_or_else(|_| SYMLINK_GRANULARITY_INTERVAL.to_string())
+		.parse::<u32>()
+		.expect("RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL env variable must be a u32.");
+	// 0 % n == 0, so the multiple-of check alone would not reject zero
+	assert!(interval > 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be positive");
+	assert_eq!(interval % SYMLINK_GRANULARITY_INTERVAL, 0, "RAPID_GOSSIP_SYNC_SERVER_SNAPSHOT_INTERVAL must be a multiple of {} (seconds)", SYMLINK_GRANULARITY_INTERVAL);
+	interval
+}
+
pub(crate) fn network() -> Network {
let network = env::var("RAPID_GOSSIP_SYNC_SERVER_NETWORK").unwrap_or("bitcoin".to_string()).to_lowercase();
match network.as_str() {
tx_ref.execute("UPDATE channel_updates SET short_channel_id = $1, direction = $2 WHERE id = $3", &[&scid, &direction, &id]).await.unwrap();
});
}
- while let Some(_) = updates.next().await { }
+ while let Some(_) = updates.next().await {}
}
tx.execute("ALTER TABLE channel_updates ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap();
tx.execute("ALTER TABLE channel_updates ALTER short_channel_id SET NOT NULL", &[]).await.unwrap();
tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap();
});
}
- while let Some(_) = updates.next().await { }
+ while let Some(_) = updates.next().await {}
}
tx.execute("ALTER TABLE channel_announcements ADD CONSTRAINT channel_announcements_short_channel_id_key UNIQUE (short_channel_id)", &[]).await.unwrap();
tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap();
pub(crate) struct Snapshotter<L: Deref + Clone> where L::Target: Logger {
network_graph: Arc<NetworkGraph<L>>,
- logger: L
+ logger: L,
}
impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
pub(crate) async fn snapshot_gossip(&self) {
log_info!(self.logger, "Initiating snapshotting service");
- let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
- const DAY_SECONDS: u64 = 60 * 60 * 24;
+ let snapshot_interval = config::snapshot_generation_interval() as u64;
+ let mut snapshot_scopes = vec![];
+	{ // keep doubling the scope until it reaches the maximum (limited) snapshot scope, then cap with u64::MAX
+ let mut current_scope = snapshot_interval;
+ loop {
+ snapshot_scopes.push(current_scope);
+ if current_scope >= config::MAX_SNAPSHOT_SCOPE as u64 {
+ snapshot_scopes.push(u64::MAX);
+ break;
+ }
+
+			// double the current scope
+ current_scope <<= 1;
+ }
+ }
let pending_snapshot_directory = format!("{}/snapshots_pending", cache_path());
let pending_symlink_directory = format!("{}/symlinks_pending", cache_path());
loop {
// 1. get the current timestamp
let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
- let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64);
+ let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval as u64);
log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
- // 2. sleep until the next round 24 hours
+ // 2. sleep until the next round interval
// 3. refresh all snapshots
// the stored snapshots should adhere to the following format
// channel updates
// purge and recreate the pending directories
- if fs::metadata(&pending_snapshot_directory).is_ok(){
+ if fs::metadata(&pending_snapshot_directory).is_ok() {
fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
}
- if fs::metadata(&pending_symlink_directory).is_ok(){
+ if fs::metadata(&pending_symlink_directory).is_ok() {
fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
}
fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
- for factor in &snapshot_sync_day_factors {
- // basically timestamp - day_seconds * factor
- let timestamp = reference_timestamp.saturating_sub(DAY_SECONDS.saturating_mul(factor.clone()));
- snapshot_sync_timestamps.push((factor.clone(), timestamp));
+ for current_scope in &snapshot_scopes {
+ let timestamp = reference_timestamp.saturating_sub(current_scope.clone());
+ snapshot_sync_timestamps.push((current_scope.clone(), timestamp));
};
- let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
+ let mut snapshot_filenames_by_scope: HashMap<u64, String> = HashMap::with_capacity(10);
- for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
+ for (current_scope, current_last_sync_timestamp) in &snapshot_sync_timestamps {
let network_graph_clone = self.network_graph.clone();
{
- log_info!(self.logger, "Calculating {}-day snapshot", day_range);
+ log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
// calculate the snapshot
let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await;
// persist the snapshot and update the symlink
- let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp);
+ let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
- log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+ log_info!(self.logger, "Persisting {}-second snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", current_scope, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
fs::write(&snapshot_path, snapshot.data).unwrap();
- snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
+ snapshot_filenames_by_scope.insert(current_scope.clone(), snapshot_filename);
}
}
}
// Number of intervals since Jan 1, 2022, a few months before RGS server was released.
- let symlink_count = (reference_timestamp - 1640995200) / config::SNAPSHOT_CALCULATION_INTERVAL as u64;
+ let symlink_count = (reference_timestamp - 1640995200) / config::SYMLINK_GRANULARITY_INTERVAL as u64;
for i in 0..symlink_count {
// let's create non-dummy-symlinks
// first, determine which snapshot range should be referenced
- let referenced_day_range = if i == 0 {
+ let referenced_scope = if i == 0 {
// special-case 0 to always refer to a full/initial sync
u64::MAX
} else {
return on the first iteration that is at least equal to the requested interval.
Note, however, that the last value in the array is u64::max, which means that
- multiplying it with DAY_SECONDS will overflow. To avoid that, we use
+ multiplying it with snapshot_interval will overflow. To avoid that, we use
saturating_mul.
*/
- // find min(x) in snapshot_sync_day_factors where x >= i
- snapshot_sync_day_factors.iter().find(|x| {
- DAY_SECONDS.saturating_mul(**x) >= i * config::SNAPSHOT_CALCULATION_INTERVAL as u64
+ // find min(x) in snapshot_scopes where i * granularity <= x (the current scope)
+ snapshot_scopes.iter().find(|current_scope| {
+ i * config::SYMLINK_GRANULARITY_INTERVAL as u64 <= **current_scope
}).unwrap().clone()
};
- log_info!(self.logger, "i: {}, referenced day range: {}", i, referenced_day_range);
+ log_info!(self.logger, "i: {}, referenced scope: {}", i, referenced_scope);
- let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
+ let snapshot_filename = snapshot_filenames_by_scope.get(&referenced_scope).unwrap();
let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
let canonical_last_sync_timestamp = if i == 0 {
// special-case 0 to always refer to a full/initial sync
0
} else {
- reference_timestamp.saturating_sub((config::SNAPSHOT_CALCULATION_INTERVAL as u64).saturating_mul(i))
+ reference_timestamp.saturating_sub((config::SYMLINK_GRANULARITY_INTERVAL as u64).saturating_mul(i))
};
let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
- log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
+			log_info!(self.logger, "Symlinking: {} -> {} ({} -> {})", i, referenced_scope, symlink_path, relative_snapshot_path);
symlink(&relative_snapshot_path, &symlink_path).unwrap();
}
let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
fs::write(&update_time_path, format!("{}", update_time)).unwrap();
- if fs::metadata(&finalized_snapshot_directory).is_ok(){
+ if fs::metadata(&finalized_snapshot_directory).is_ok() {
fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
}
- if fs::metadata(&finalized_symlink_directory).is_ok(){
+ if fs::metadata(&finalized_symlink_directory).is_ok() {
fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
}
fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
// constructing the snapshots may have taken a while
let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
- let remainder = current_time % config::SNAPSHOT_CALCULATION_INTERVAL as u64;
- let time_until_next_day = config::SNAPSHOT_CALCULATION_INTERVAL as u64 - remainder;
- log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day);
+ // NOTE: we're waiting until the next multiple of snapshot_interval
+ // however, if the symlink granularity is lower, then during that time, no intermediate
+ // symlinks will be generated. That should be ok, because any timestamps previously
+ // returned would already have generated symlinks, but this does have bug potential
+ let remainder = current_time % snapshot_interval;
+ let time_until_next_generation = snapshot_interval - remainder;
+
+ log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_generation);
// add in an extra five seconds to assure the rounding down works correctly
- let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
+ let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_generation + 5));
sleep.await;
}
}