From 7aaf4c6b6a6446506099516cb9b9edba4f14d0a4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 11 Sep 2022 21:06:34 +0000 Subject: [PATCH] Use more appropriate data types for SCIDs, direction, composite_index --- Cargo.toml | 1 + src/config.rs | 89 ++++++++++++++++++++++++++++++++++++++++++---- src/lookup.rs | 26 ++++++-------- src/persistence.rs | 11 +++--- 4 files changed, 100 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 315c66f..d2e1435 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ lightning-block-sync = { version = "0.0.110", features=["rest-client"] } lightning-net-tokio = { version = "0.0.110" } tokio = { version = "1.14.1", features = ["full"] } tokio-postgres = { version="0.7.5" } +futures = "0.3" [profile.release] opt-level = 3 diff --git a/src/config.rs b/src/config.rs index 81db4cf..a128926 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,17 @@ +use std::convert::TryInto; use std::env; use std::net::SocketAddr; +use std::io::Cursor; use bitcoin::secp256k1::PublicKey; +use lightning::ln::msgs::ChannelAnnouncement; +use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; use tokio_postgres::Config; use crate::hex_utils; -pub(crate) const SCHEMA_VERSION: i32 = 2; +use futures::stream::{FuturesUnordered, StreamExt}; + +pub(crate) const SCHEMA_VERSION: i32 = 5; pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true; @@ -47,7 +53,7 @@ pub(crate) fn db_config_table_creation_query() -> &'static str { pub(crate) fn db_announcement_table_creation_query() -> &'static str { "CREATE TABLE IF NOT EXISTS channel_announcements ( id SERIAL PRIMARY KEY, - short_channel_id character varying(255) NOT NULL UNIQUE, + short_channel_id bigint NOT NULL UNIQUE, block_height integer, announcement_signed BYTEA, seen timestamp NOT NULL DEFAULT NOW() @@ -55,13 +61,14 @@ pub(crate) fn db_announcement_table_creation_query() -> &'static str { } pub(crate) fn db_channel_update_table_creation_query() -> &'static str { + // We'll run out of room in composite index at block 8,388,608 or in the year 2286 "CREATE TABLE IF NOT EXISTS channel_updates ( id SERIAL PRIMARY KEY, - composite_index character varying(255) UNIQUE, - short_channel_id character varying(255), + composite_index character(29) UNIQUE, + short_channel_id bigint NOT NULL, timestamp bigint, channel_flags integer, - direction integer, + direction boolean NOT NULL, disable boolean, cltv_expiry_delta integer, htlc_minimum_msat bigint, @@ -77,7 +84,7 @@ pub(crate) fn db_index_creation_query() -> &'static str { " CREATE INDEX IF NOT EXISTS channels_seen ON channel_announcements(seen); CREATE INDEX IF NOT EXISTS channel_updates_scid ON channel_updates(short_channel_id); - CREATE INDEX IF NOT EXISTS channel_updates_direction ON channel_updates(direction); + CREATE INDEX IF NOT EXISTS channel_updates_direction ON channel_updates (short_channel_id, direction); CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen); " } @@ -90,6 +97,76 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx.execute("UPDATE config SET db_schema = 2 WHERE id = 1", &[]).await.unwrap(); tx.commit().await.unwrap(); } + if schema == 1 || schema == 2 { + let tx = client.transaction().await.unwrap(); + tx.execute("ALTER TABLE channel_updates DROP COLUMN short_channel_id", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ADD COLUMN short_channel_id bigint DEFAULT null", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates DROP COLUMN direction", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ADD COLUMN direction boolean DEFAULT null", &[]).await.unwrap(); + loop { + let rows = tx.query("SELECT id, composite_index FROM channel_updates WHERE short_channel_id IS NULL LIMIT 50000", &[]).await.unwrap(); + if rows.is_empty() { break; } + let mut updates = FuturesUnordered::new(); + for row in rows { + let id: i32 = row.get("id"); + let index: String = row.get("composite_index"); + let tx_ref = &tx; + updates.push(async move { + let mut index_iter = index.split(":"); + let scid_hex = index_iter.next().unwrap(); + index_iter.next().unwrap(); + let direction_str = index_iter.next().unwrap(); + assert!(direction_str == "1" || direction_str == "0"); + let direction = direction_str == "1"; + let scid_be_bytes = hex_utils::to_vec(scid_hex).unwrap(); + let scid = i64::from_be_bytes(scid_be_bytes.try_into().unwrap()); + assert!(scid > 0); // Will roll over in some 150 years or so + tx_ref.execute("UPDATE channel_updates SET short_channel_id = $1, direction = $2 WHERE id = $3", &[&scid, &direction, &id]).await.unwrap(); + }); + } + while let Some(_) = updates.next().await { } + } + tx.execute("CREATE INDEX channel_updates_scid ON channel_updates(short_channel_id)", &[]).await.unwrap(); + tx.execute("CREATE INDEX channel_updates_direction ON channel_updates (short_channel_id, direction)", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER short_channel_id SET NOT NULL", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER direction DROP DEFAULT", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER direction SET NOT NULL", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 3 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } + if schema >= 1 && schema <= 3 { + let tx = client.transaction().await.unwrap(); + tx.execute("ALTER TABLE channel_announcements DROP COLUMN short_channel_id", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_announcements ADD COLUMN short_channel_id bigint DEFAULT null", &[]).await.unwrap(); + loop { + let rows = tx.query("SELECT id, announcement_signed FROM channel_announcements WHERE short_channel_id IS NULL LIMIT 10000", &[]).await.unwrap(); + if rows.is_empty() { break; } + let mut updates = FuturesUnordered::new(); + for row in rows { + let id: i32 = row.get("id"); + let announcement: Vec = row.get("announcement_signed"); + let tx_ref = &tx; + updates.push(async move { + let scid = ChannelAnnouncement::read(&mut Cursor::new(announcement)).unwrap().contents.short_channel_id as i64; + assert!(scid > 0); // Will roll over in some 150 years or so + tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap(); + }); + } + while let Some(_) = updates.next().await { } + } + tx.execute("ALTER TABLE channel_announcements ADD CONSTRAINT channel_announcements_short_channel_id_key UNIQUE (short_channel_id)", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id SET NOT NULL", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 4 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } + if schema >= 1 && schema <= 4 { + let tx = client.transaction().await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER composite_index SET DATA TYPE character(29)", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 5 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } if schema <= 1 || schema > SCHEMA_VERSION { panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION); } diff --git a/src/lookup.rs b/src/lookup.rs index 83768da..0da340d 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -10,7 +10,7 @@ use lightning::util::ser::Readable; use tokio_postgres::{Client, Connection, NoTls, Socket}; use tokio_postgres::tls::NoTlsStream; -use crate::{config, hex_utils, TestLogger}; +use crate::{config, TestLogger}; use crate::serialization::MutatedProperties; /// The delta set needs to be a BTreeMap so the keys are sorted. @@ -73,8 +73,8 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ let channel_iterator = read_only_graph.channels().into_iter(); channel_iterator .filter(|c| c.1.announcement_message.is_some()) - .map(|c| hex_utils::hex_str(&c.1.announcement_message.as_ref().unwrap().contents.short_channel_id.to_be_bytes())) - .collect::>() + .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64) + .collect::>() }; println!("Obtaining corresponding database entries"); @@ -135,19 +135,17 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli last_seen_update_ids.push(update_id); non_intermediate_ids.insert(update_id); - let direction: i32 = current_reference.get("direction"); + let direction: bool = current_reference.get("direction"); let blob: Vec = current_reference.get("blob_signed"); let mut readable = Cursor::new(blob); let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; let scid = unsigned_channel_update.short_channel_id; let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); - let mut update_delta = if direction == 0 { + let mut update_delta = if !direction { (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()) - } else if direction == 1 { - (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()) } else { - panic!("Channel direction must be binary!") + (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()) }; update_delta.last_update_before_seen = Some(unsigned_channel_update); } @@ -179,7 +177,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli } intermediate_update_count += 1; - let direction: i32 = intermediate_update.get("direction"); + let direction: bool = intermediate_update.get("direction"); let current_seen_timestamp_object: SystemTime = intermediate_update.get("seen"); let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; let blob: Vec = intermediate_update.get("blob_signed"); @@ -194,23 +192,21 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli // get the write configuration for this particular channel's directional details let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); - let update_delta = if direction == 0 { + let update_delta = if !direction { (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()) - } else if direction == 1 { - (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()) } else { - panic!("Channel direction must be binary!") + (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()) }; { // handle the latest deltas - if direction == 0 && !previously_seen_directions.0 { + if !direction && !previously_seen_directions.0 { previously_seen_directions.0 = true; update_delta.latest_update_after_seen = Some(UpdateDelta { seen: current_seen_timestamp, update: unsigned_channel_update.clone(), }); - } else if direction == 1 && !previously_seen_directions.1 { + } else if direction && !previously_seen_directions.1 { previously_seen_directions.1 = true; update_delta.latest_update_after_seen = Some(UpdateDelta { seen: current_seen_timestamp, diff --git a/src/persistence.rs b/src/persistence.rs index 8fbf166..ceeec9d 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -109,8 +109,7 @@ impl GossipPersister { match &gossip_message { GossipMessage::ChannelAnnouncement(announcement) => { - let scid = announcement.contents.short_channel_id; - let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); + let scid = announcement.contents.short_channel_id as i64; // scid is 8 bytes // block height is the first three bytes // to obtain block height, shift scid right by 5 bytes (40 bits) @@ -126,7 +125,7 @@ impl GossipPersister { block_height, \ announcement_signed \ ) VALUES ($1, $2, $3) ON CONFLICT (short_channel_id) DO NOTHING", &[ - &scid_hex, + &scid, &block_height, &announcement_signed ]).await; @@ -135,7 +134,7 @@ impl GossipPersister { } } GossipMessage::ChannelUpdate(update) => { - let scid = update.contents.short_channel_id; + let scid = update.contents.short_channel_id as i64; let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); let timestamp = update.contents.timestamp as i64; @@ -173,10 +172,10 @@ impl GossipPersister { blob_signed \ ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (composite_index) DO NOTHING", &[ &composite_index, - &scid_hex, + &scid, ×tamp, &channel_flags, - &direction, + &(direction == 1), &disable, &cltv_expiry_delta, &htlc_minimum_msat, -- 2.39.5