use std::env;
use std::io::Cursor;
use std::net::{SocketAddr, ToSocketAddrs};
+use std::time::Duration;
use bitcoin::Network;
use bitcoin::hashes::hex::FromHex;
use lightning_block_sync::http::HttpEndpoint;
use tokio_postgres::Config;
/// Version of the database schema this binary expects.
/// Bumped whenever a migration changes the on-disk layout (v9 drops the
/// `channel_updates_seen` and `channel_updates_scid_seen` indexes).
pub(crate) const SCHEMA_VERSION: i32 = 9;

pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds

/// If the last update in either direction was more than six days ago, we send a reminder.
/// That reminder may be either in the form of a channel announcement, or in the form of empty
/// updates in both directions.
pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);

// NOTE(review): flag name suggests it gates fetching fresh gossip from peers as
// opposed to serving only what is already persisted — confirm at the usage site.
pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
pub(crate) fn network() -> Network {
/// Returns the DDL that creates the indexes the delta and snapshot queries rely on.
///
/// Every statement uses `IF NOT EXISTS` (and the key index `CREATE UNIQUE INDEX IF
/// NOT EXISTS`), so running this on every startup is idempotent and safe.
pub(crate) fn db_index_creation_query() -> &'static str {
	"
	CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id);
	CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id, blob_signed);
	CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_asc ON channel_updates(short_channel_id, direction, seen);
	CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp);
	"
}
tx.execute("UPDATE config SET db_schema = 8 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
+ if schema >= 1 && schema <= 8 {
+ let tx = client.transaction().await.unwrap();
+ tx.execute("DROP INDEX channel_updates_seen", &[]).await.unwrap();
+ tx.execute("DROP INDEX channel_updates_scid_seen", &[]).await.unwrap();
+ tx.execute("UPDATE config SET db_schema = 9 WHERE id = 1", &[]).await.unwrap();
+ tx.commit().await.unwrap();
+ }
if schema <= 1 || schema > SCHEMA_VERSION {
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
}
use crate::lookup::DeltaSet;
use crate::persistence::GossipPersister;
-use crate::serialization::UpdateSerializationMechanism;
+use crate::serialization::UpdateSerialization;
use crate::snapshot::Snapshotter;
use crate::types::TestLogger;
let mut update_count_full = 0;
let mut update_count_incremental = 0;
for current_update in serialization_details.updates {
- match ¤t_update.mechanism {
- UpdateSerializationMechanism::Full => {
+ match ¤t_update {
+ UpdateSerialization::Full(_) => {
update_count_full += 1;
}
- UpdateSerializationMechanism::Incremental(_) | UpdateSerializationMechanism::Reminder => {
+ UpdateSerialization::Incremental(_, _) | UpdateSerialization::Reminder(_, _) => {
update_count_incremental += 1;
}
};
let mut stripped_update = serialization::serialize_stripped_channel_update(¤t_update, &default_update_values, previous_update_scid);
output.append(&mut stripped_update);
- previous_update_scid = current_update.update.short_channel_id;
+ previous_update_scid = current_update.scid();
}
// some stats
/// after `last_sync_timestamp`
pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
println!("Obtaining channel ids from network graph");
+ let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
let channel_ids = {
let read_only_graph = network_graph.read_only();
println!("Retrieved read-only network graph copy");
// here is where the channels whose first update in either direction occurred after
// `last_seen_timestamp` are added to the selection
let newer_oldest_directional_updates = client.query("
- SELECT DISTINCT ON (short_channel_id) *
- FROM (
- SELECT DISTINCT ON (short_channel_id, direction) blob_signed
- FROM channel_updates
- WHERE short_channel_id = any($1)
- ORDER BY seen ASC, short_channel_id ASC, direction ASC
- ) AS directional_last_seens
- ORDER BY short_channel_id ASC, seen DESC
- ", &[&channel_ids]).await.unwrap();
+ SELECT * FROM (
+ SELECT DISTINCT ON (short_channel_id) *
+ FROM (
+ SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
+ FROM channel_updates
+ WHERE short_channel_id = any($1)
+ ORDER BY short_channel_id ASC, direction ASC, seen ASC
+ ) AS directional_last_seens
+ ORDER BY short_channel_id ASC, seen DESC
+ ) AS distinct_chans
+ WHERE distinct_chans.seen >= $2
+ ", &[&channel_ids, &last_sync_timestamp_object]).await.unwrap();
for current_row in newer_oldest_directional_updates {
- let blob: Vec<u8> = current_row.get("blob_signed");
- let mut readable = Cursor::new(blob);
- let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
- let scid = unsigned_update.short_channel_id;
+ let scid: i64 = current_row.get("short_channel_id");
let current_seen_timestamp_object: SystemTime = current_row.get("seen");
let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
- if current_seen_timestamp > last_sync_timestamp {
- // the newer of the two oldest seen directional updates came after last sync timestamp
- let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
- // first time a channel was seen in both directions
- (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
- }
+ // the newer of the two oldest seen directional updates came after last sync timestamp
+ let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
+ // first time a channel was seen in both directions
+ (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
}
}
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
- let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
- let reminder_threshold_timestamp = current_timestamp.saturating_sub(config::CHANNEL_REMINDER_AGE);
+ let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap();
let read_only_graph = network_graph.read_only();
let older_latest_directional_updates = client.query("
- SELECT DISTINCT ON (short_channel_id) *
- FROM (
- SELECT DISTINCT ON (short_channel_id, direction) *
- FROM channel_updates
- WHERE short_channel_id = any($1)
- ORDER BY short_channel_id ASC, direction ASC, seen DESC
- ) AS directional_last_seens
- ORDER BY short_channel_id ASC, seen ASC
- ", &[&channel_ids]).await.unwrap();
+ SELECT short_channel_id FROM (
+ SELECT DISTINCT ON (short_channel_id) *
+ FROM (
+ SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
+ FROM channel_updates
+ WHERE short_channel_id = any($1)
+ ORDER BY short_channel_id ASC, direction ASC, seen DESC
+ ) AS directional_last_seens
+ ORDER BY short_channel_id ASC, seen ASC
+ ) AS distinct_chans
+ WHERE distinct_chans.seen <= $2
+ ", &[&channel_ids, &reminder_threshold_timestamp]).await.unwrap();
for current_row in older_latest_directional_updates {
- let blob: Vec<u8> = current_row.get("blob_signed");
- let mut readable = Cursor::new(blob);
- let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
- let scid = unsigned_update.short_channel_id;
- let current_seen_timestamp_object: SystemTime = current_row.get("seen");
- let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+ let scid: i64 = current_row.get("short_channel_id");
+
+ // annotate this channel as requiring that reminders be sent to the client
+ let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
- if current_seen_timestamp <= reminder_threshold_timestamp {
- // annotate this channel as requiring that reminders be sent to the client
- let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
-
- // way might be able to get away with not using this
- (*current_channel_delta).requires_reminder = true;
-
- if let Some(current_channel_info) = read_only_graph.channel(scid) {
- if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
- // we don't send reminders if we don't have bidirectional update data
- continue;
- }
-
- if let Some(info) = current_channel_info.one_to_two.as_ref() {
- let flags: u8 = if info.enabled { 0 } else { 2 };
- let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
- current_update.serialization_update_flags = Some(flags);
- }
-
- if let Some(info) = current_channel_info.two_to_one.as_ref() {
- let flags: u8 = if info.enabled { 1 } else { 3 };
- let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
- current_update.serialization_update_flags = Some(flags);
- }
- } else {
- // we don't send reminders if we don't have the channel
+ // we might be able to get away with not using this
+ (*current_channel_delta).requires_reminder = true;
+
+ if let Some(current_channel_info) = read_only_graph.channel(scid as u64) {
+ if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
+ // we don't send reminders if we don't have bidirectional update data
continue;
}
+
+ if let Some(info) = current_channel_info.one_to_two.as_ref() {
+ let flags: u8 = if info.enabled { 0 } else { 2 };
+ let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
+ current_update.serialization_update_flags = Some(flags);
+ }
+
+ if let Some(info) = current_channel_info.two_to_one.as_ref() {
+ let flags: u8 = if info.enabled { 1 } else { 3 };
+ let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
+ current_update.serialization_update_flags = Some(flags);
+ }
+ } else {
+ // we don't send reminders if we don't have the channel
+ continue;
}
}
}
// get the latest channel update in each direction prior to last_sync_timestamp, provided
// there was an update in either direction that happened after the last sync (to avoid
// collecting too many reference updates)
- let reference_rows = client.query("SELECT DISTINCT ON (short_channel_id, direction) id, direction, blob_signed FROM channel_updates WHERE seen < $1 AND short_channel_id IN (SELECT short_channel_id FROM channel_updates WHERE seen >= $1 GROUP BY short_channel_id) ORDER BY short_channel_id ASC, direction ASC, seen DESC", &[&last_sync_timestamp_object]).await.unwrap();
+ let reference_rows = client.query("
+ SELECT DISTINCT ON (short_channel_id, direction) id, direction, blob_signed
+ FROM channel_updates
+ WHERE seen < $1 AND short_channel_id IN (
+ SELECT DISTINCT ON (short_channel_id) short_channel_id
+ FROM channel_updates
+ WHERE seen >= $1
+ )
+ ORDER BY short_channel_id ASC, direction ASC, seen DESC
+ ", &[&last_sync_timestamp_object]).await.unwrap();
println!("Fetched reference rows ({}): {:?}", reference_rows.len(), start.elapsed());
intermediate_update_prefix = "DISTINCT ON (short_channel_id, direction)";
}
- let query_string = format!("SELECT {} id, direction, blob_signed, seen FROM channel_updates WHERE seen >= $1 ORDER BY short_channel_id ASC, direction ASC, seen DESC", intermediate_update_prefix);
+ let query_string = format!("
+ SELECT {} id, direction, blob_signed, seen
+ FROM channel_updates
+ WHERE seen >= $1
+ ORDER BY short_channel_id ASC, direction ASC, seen DESC
+ ", intermediate_update_prefix);
let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap();
println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed());
}
}
-pub(super) struct UpdateSerialization {
- pub(super) update: UnsignedChannelUpdate,
- pub(super) mechanism: UpdateSerializationMechanism,
-}
-
impl MutatedProperties {
/// Does not include flags because the flag byte is always sent in full
fn len(&self) -> u8 {
}
}
-pub(super) enum UpdateSerializationMechanism {
- Full,
- Incremental(MutatedProperties),
- Reminder,
+pub(super) enum UpdateSerialization {
+ Full(UnsignedChannelUpdate),
+ Incremental(UnsignedChannelUpdate, MutatedProperties),
+ Reminder(u64, u8),
+}
+impl UpdateSerialization {
+ pub(super) fn scid(&self) -> u64 {
+ match self {
+ UpdateSerialization::Full(latest_update)|
+ UpdateSerialization::Incremental(latest_update, _) => latest_update.short_channel_id,
+ UpdateSerialization::Reminder(scid, _) => *scid,
+ }
+ }
+
+ fn flags(&self) -> u8 {
+ match self {
+ UpdateSerialization::Full(latest_update)|
+ UpdateSerialization::Incremental(latest_update, _) => latest_update.flags,
+ UpdateSerialization::Reminder(_, flags) => *flags,
+ }
+ }
}
struct FullUpdateValueHistograms {
*full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1;
};
- for (_scid, channel_delta) in delta_set.into_iter() {
+ for (scid, channel_delta) in delta_set.into_iter() {
// any announcement chain hash is gonna be the same value. Just set it from the first one.
let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
if let Some(updates) = directed_updates {
if let Some(latest_update_delta) = updates.latest_update_after_seen {
let latest_update = latest_update_delta.update;
+ assert_eq!(latest_update.short_channel_id, scid, "Update in DB had wrong SCID column");
// the returned seen timestamp should be the latest of all the returned
// announcements and latest updates
// serialize the update as a full update instead of as a change
// this way, the default values can be computed more efficiently
record_full_update_in_histograms(&latest_update);
- serialization_set.updates.push(UpdateSerialization {
- update: latest_update,
- mechanism: UpdateSerializationMechanism::Full,
- });
+ serialization_set.updates.push(UpdateSerialization::Full(latest_update));
} else if mutated_properties.len() > 0 || mutated_properties.flags {
// we don't count flags as mutated properties
- serialization_set.updates.push(UpdateSerialization {
- update: latest_update,
- mechanism: UpdateSerializationMechanism::Incremental(mutated_properties),
- });
+ serialization_set.updates.push(
+ UpdateSerialization::Incremental(latest_update, mutated_properties));
}
} else {
// serialize the full update
record_full_update_in_histograms(&latest_update);
- serialization_set.updates.push(UpdateSerialization {
- update: latest_update,
- mechanism: UpdateSerializationMechanism::Full,
- });
+ serialization_set.updates.push(UpdateSerialization::Full(latest_update));
}
} else if let Some(flags) = updates.serialization_update_flags {
- // we need to insert a fake channel update where the only information
- let fake_update = UnsignedChannelUpdate {
- flags,
- chain_hash: BlockHash::all_zeros(),
- short_channel_id: 0,
- cltv_expiry_delta: 0,
- fee_base_msat: 0,
- fee_proportional_millionths: 0,
- htlc_maximum_msat: 0,
- htlc_minimum_msat: 0,
- timestamp: 0,
- excess_data: Vec::with_capacity(0),
- };
- serialization_set.updates.push(UpdateSerialization {
- update: fake_update,
- mechanism: UpdateSerializationMechanism::Reminder
- })
+ serialization_set.updates.push(UpdateSerialization::Reminder(scid, flags));
}
}
};
}
pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, default_values: &DefaultUpdateValues, previous_scid: u64) -> Vec<u8> {
- let latest_update = &update.update;
- let mut serialized_flags = latest_update.flags;
+ let mut serialized_flags = update.flags();
- if previous_scid > latest_update.short_channel_id {
+ if previous_scid > update.scid() {
panic!("unsorted scids!");
}
let mut delta_serialization = Vec::new();
let mut prefixed_serialization = Vec::new();
- match &update.mechanism {
- UpdateSerializationMechanism::Full => {
+ match update {
+ UpdateSerialization::Full(latest_update) => {
if latest_update.cltv_expiry_delta != default_values.cltv_expiry_delta {
serialized_flags |= 0b_0100_0000;
latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
}
}
-
- UpdateSerializationMechanism::Incremental(mutated_properties) => {
+ UpdateSerialization::Incremental(latest_update, mutated_properties) => {
// indicate that this update is incremental
serialized_flags |= 0b_1000_0000;
latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
}
},
-
- UpdateSerializationMechanism::Reminder => {
+ UpdateSerialization::Reminder(_, _) => {
// indicate that this update is incremental
serialized_flags |= 0b_1000_0000;
}
}
- let scid_delta = BigSize(latest_update.short_channel_id - previous_scid);
+ let scid_delta = BigSize(update.scid() - previous_scid);
scid_delta.write(&mut prefixed_serialization).unwrap();
serialized_flags.write(&mut prefixed_serialization).unwrap();