X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fconfig.rs;fp=src%2Fconfig.rs;h=a1289263ce155978246e469c743e924d36006aad;hb=7aaf4c6b6a6446506099516cb9b9edba4f14d0a4;hp=81db4cf7af95ad819e1f5f276f87d57b47b06f4e;hpb=a5b963c54a4eef59a0b2235e271e9481169db669;p=rapid-gossip-sync-server diff --git a/src/config.rs b/src/config.rs index 81db4cf..a128926 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,11 +1,17 @@ +use std::convert::TryInto; use std::env; use std::net::SocketAddr; +use std::io::Cursor; use bitcoin::secp256k1::PublicKey; +use lightning::ln::msgs::ChannelAnnouncement; +use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; use tokio_postgres::Config; use crate::hex_utils; -pub(crate) const SCHEMA_VERSION: i32 = 2; +use futures::stream::{FuturesUnordered, StreamExt}; + +pub(crate) const SCHEMA_VERSION: i32 = 5; pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true; @@ -47,7 +53,7 @@ pub(crate) fn db_config_table_creation_query() -> &'static str { pub(crate) fn db_announcement_table_creation_query() -> &'static str { "CREATE TABLE IF NOT EXISTS channel_announcements ( id SERIAL PRIMARY KEY, - short_channel_id character varying(255) NOT NULL UNIQUE, + short_channel_id bigint NOT NULL UNIQUE, block_height integer, announcement_signed BYTEA, seen timestamp NOT NULL DEFAULT NOW() @@ -55,13 +61,14 @@ pub(crate) fn db_announcement_table_creation_query() -> &'static str { } pub(crate) fn db_channel_update_table_creation_query() -> &'static str { + // We'll run out of room in composite index at block 8,388,608 or in the year 2286 "CREATE TABLE IF NOT EXISTS channel_updates ( id SERIAL PRIMARY KEY, - composite_index character varying(255) UNIQUE, - short_channel_id character varying(255), + composite_index character(29) UNIQUE, + short_channel_id bigint NOT NULL, timestamp bigint, channel_flags integer, - direction integer, + direction boolean NOT NULL, disable boolean, cltv_expiry_delta integer, htlc_minimum_msat bigint, @@ -77,7 +84,7 @@ pub(crate) fn db_index_creation_query() -> &'static str { " CREATE INDEX IF NOT EXISTS channels_seen ON channel_announcements(seen); CREATE INDEX IF NOT EXISTS channel_updates_scid ON channel_updates(short_channel_id); - CREATE INDEX IF NOT EXISTS channel_updates_direction ON channel_updates(direction); + CREATE INDEX IF NOT EXISTS channel_updates_direction ON channel_updates (short_channel_id, direction); CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen); " } @@ -90,6 +97,76 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx.execute("UPDATE config SET db_schema = 2 WHERE id = 1", &[]).await.unwrap(); tx.commit().await.unwrap(); } + if schema == 1 || schema == 2 { + let tx = client.transaction().await.unwrap(); + tx.execute("ALTER TABLE channel_updates DROP COLUMN short_channel_id", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ADD COLUMN short_channel_id bigint DEFAULT null", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates DROP COLUMN direction", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ADD COLUMN direction boolean DEFAULT null", &[]).await.unwrap(); + loop { + let rows = tx.query("SELECT id, composite_index FROM channel_updates WHERE short_channel_id IS NULL LIMIT 50000", &[]).await.unwrap(); + if rows.is_empty() { break; } + let mut updates = FuturesUnordered::new(); + for row in rows { + let id: i32 = row.get("id"); + let index: String = row.get("composite_index"); + let tx_ref = &tx; + updates.push(async move { + let mut index_iter = index.split(":"); + let scid_hex = index_iter.next().unwrap(); + index_iter.next().unwrap(); + let direction_str = index_iter.next().unwrap(); + assert!(direction_str == "1" || direction_str == "0"); + let direction = direction_str == "1"; + let scid_be_bytes = hex_utils::to_vec(scid_hex).unwrap(); + let scid = i64::from_be_bytes(scid_be_bytes.try_into().unwrap()); + assert!(scid > 0); // Will roll over in some 150 years or so + tx_ref.execute("UPDATE channel_updates SET short_channel_id = $1, direction = $2 WHERE id = $3", &[&scid, &direction, &id]).await.unwrap(); + }); + } + while let Some(_) = updates.next().await { } + } + tx.execute("CREATE INDEX channel_updates_scid ON channel_updates(short_channel_id)", &[]).await.unwrap(); + tx.execute("CREATE INDEX channel_updates_direction ON channel_updates (short_channel_id, direction)", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER short_channel_id SET NOT NULL", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER direction DROP DEFAULT", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER direction SET NOT NULL", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 3 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } + if schema >= 1 && schema <= 3 { + let tx = client.transaction().await.unwrap(); + tx.execute("ALTER TABLE channel_announcements DROP COLUMN short_channel_id", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_announcements ADD COLUMN short_channel_id bigint DEFAULT null", &[]).await.unwrap(); + loop { + let rows = tx.query("SELECT id, announcement_signed FROM channel_announcements WHERE short_channel_id IS NULL LIMIT 10000", &[]).await.unwrap(); + if rows.is_empty() { break; } + let mut updates = FuturesUnordered::new(); + for row in rows { + let id: i32 = row.get("id"); + let announcement: Vec = row.get("announcement_signed"); + let tx_ref = &tx; + updates.push(async move { + let scid = ChannelAnnouncement::read(&mut Cursor::new(announcement)).unwrap().contents.short_channel_id as i64; + assert!(scid > 0); // Will roll over in some 150 years or so + tx_ref.execute("UPDATE channel_announcements SET short_channel_id = $1 WHERE id = $2", &[&scid, &id]).await.unwrap(); + }); + } + while let Some(_) = updates.next().await { } + } + tx.execute("ALTER TABLE channel_announcements ADD CONSTRAINT channel_announcements_short_channel_id_key UNIQUE (short_channel_id)", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id DROP DEFAULT", &[]).await.unwrap(); + tx.execute("ALTER TABLE channel_announcements ALTER short_channel_id SET NOT NULL", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 4 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } + if schema >= 1 && schema <= 4 { + let tx = client.transaction().await.unwrap(); + tx.execute("ALTER TABLE channel_updates ALTER composite_index SET DATA TYPE character(29)", &[]).await.unwrap(); + tx.execute("UPDATE config SET db_schema = 5 WHERE id = 1", &[]).await.unwrap(); + tx.commit().await.unwrap(); + } if schema <= 1 || schema > SCHEMA_VERSION { panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION); }