From 0a101c2bf1b6907331168a238220f8e9b5f96ba2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 20 Nov 2024 22:48:37 +0000 Subject: [PATCH] Backfill TXO values --- src/config.rs | 37 +++++++++++++++++++++++++++++++++++-- src/persistence.rs | 4 ++-- src/verifier.rs | 4 ++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index 2a83d65..6479b44 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,10 @@ use crate::hex_utils; +use crate::verifier::ChainVerifier; use std::env; use std::net::{SocketAddr, ToSocketAddrs}; +use std::ops::Deref; +use std::sync::Arc; use std::time::Duration; use bitcoin::io::Cursor; @@ -10,10 +13,14 @@ use bitcoin::hashes::hex::FromHex; use bitcoin::secp256k1::PublicKey; use futures::stream::{FuturesUnordered, StreamExt}; use lightning::ln::msgs::ChannelAnnouncement; +use lightning::util::logger::Logger; use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; +use lightning_block_sync::rest::RestClient; use tokio_postgres::Config; +use tokio::sync::Semaphore; + pub(crate) const SCHEMA_VERSION: i32 = 15; pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks @@ -167,7 +174,9 @@ pub(crate) fn db_index_creation_query() -> &'static str { " } -pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) { +pub(crate) async fn upgrade_db( + schema: i32, client: &mut tokio_postgres::Client, logger: L, +) where L::Target: Logger { if schema == 1 { let tx = client.transaction().await.unwrap(); tx.execute("ALTER TABLE channel_updates DROP COLUMN chain_hash", &[]).await.unwrap(); @@ -314,7 +323,31 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) tx.commit().await.unwrap(); } if schema >= 1 && schema <= 14 { - client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await.unwrap(); + println!("Upgrading to schema 14 requiring UTXO lookups for each historical channel announcement. This may take some time"); + // Note that we don't bother doing this one in a transaction, and as such need to support + // resuming on a crash. + let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await; + tokio::spawn(async move { + let client = crate::connect_to_db().await; + let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap()); + let sem = Arc::new(Semaphore::new(16)); + while let Some(scid_res) = scids.next().await { + let scid: i64 = scid_res.unwrap().get(0); + let permit = Arc::clone(&sem).acquire_owned().await.unwrap(); + let logger = logger.clone(); + tokio::spawn(async move { + let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()).unwrap()); + let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await + .expect("We shouldn't have accepted a channel announce with a bad TXO"); + let client = crate::connect_to_db().await; + client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap(); + std::mem::drop(permit); + }); + } + let _all_updates_complete = sem.acquire_many(16).await.unwrap(); + client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap(); + client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap(); + }); } if schema <= 1 || schema > SCHEMA_VERSION { panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION); diff --git a/src/persistence.rs b/src/persistence.rs index a7d96c0..b600480 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -23,7 +23,7 @@ pub(crate) struct GossipPersister where L::Target: Logger { logger: L } -impl GossipPersister where L::Target: Logger { +impl GossipPersister where L::Target: Logger { pub fn new(network_graph: Arc>, logger: L) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); @@ -50,7 +50,7 @@ impl GossipPersister where L::Target: Logger { let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap(); if !cur_schema.is_empty() { - config::upgrade_db(cur_schema[0].get(0), &mut client).await; + config::upgrade_db(cur_schema[0].get(0), &mut client, self.logger.clone()).await; } let preparation = client.execute("set time zone UTC", &[]).await; diff --git a/src/verifier.rs b/src/verifier.rs index a7e3e13..4c686de 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -54,6 +54,10 @@ impl ChainVerifier where L::Target: .await.map(|txo| txo.value.to_sat()) } + pub(crate) async fn retrieve_txo(client: Arc, short_channel_id: u64, logger: L) -> Result { + Self::retrieve_cache_txo(client, None, short_channel_id, logger).await + } + async fn retrieve_cache_txo(client: Arc, scid_value: Option>>>, short_channel_id: u64, logger: L) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; -- 2.39.5