]> git.bitcoin.ninja Git - rapid-gossip-sync-server/commitdiff
Backfill TXO values 2024-11-val-field
authorMatt Corallo <git@bluematt.me>
Wed, 20 Nov 2024 22:48:37 +0000 (22:48 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 21 Nov 2024 02:51:31 +0000 (02:51 +0000)
src/config.rs
src/persistence.rs
src/verifier.rs

index 2a83d6507239e521ce992f490a8e82af068624f0..6479b446675fe81fcad755499c33a30768f30789 100644 (file)
@@ -1,7 +1,10 @@
 use crate::hex_utils;
+use crate::verifier::ChainVerifier;
 
 use std::env;
 use std::net::{SocketAddr, ToSocketAddrs};
+use std::ops::Deref;
+use std::sync::Arc;
 use std::time::Duration;
 
 use bitcoin::io::Cursor;
@@ -10,10 +13,14 @@ use bitcoin::hashes::hex::FromHex;
 use bitcoin::secp256k1::PublicKey;
 use futures::stream::{FuturesUnordered, StreamExt};
 use lightning::ln::msgs::ChannelAnnouncement;
+use lightning::util::logger::Logger;
 use lightning::util::ser::Readable;
 use lightning_block_sync::http::HttpEndpoint;
+use lightning_block_sync::rest::RestClient;
 use tokio_postgres::Config;
 
+use tokio::sync::Semaphore;
+
 pub(crate) const SCHEMA_VERSION: i32 = 15;
 pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
 pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
@@ -167,7 +174,9 @@ pub(crate) fn db_index_creation_query() -> &'static str {
        "
 }
 
-pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client) {
+pub(crate) async fn upgrade_db<L: Deref + Clone + Send + Sync + 'static>(
+       schema: i32, client: &mut tokio_postgres::Client, logger: L,
+) where L::Target: Logger {
        if schema == 1 {
                let tx = client.transaction().await.unwrap();
                tx.execute("ALTER TABLE channel_updates DROP COLUMN chain_hash", &[]).await.unwrap();
@@ -314,7 +323,31 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                tx.commit().await.unwrap();
        }
        if schema >= 1 && schema <= 14 {
-               client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await.unwrap();
+               println!("Upgrading to schema 14 requiring UTXO lookups for each historical channel announcement. This may take some time");
+               // Note that we don't bother doing this one in a transaction, and as such need to support
+               // resuming on a crash.
+               let _ = client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await;
+               tokio::spawn(async move {
+                       let client = crate::connect_to_db().await;
+                       let mut scids = Box::pin(client.query_raw("SELECT DISTINCT ON (short_channel_id) short_channel_id FROM channel_announcements WHERE funding_amount_sats IS NULL;", &[0i64][1..]).await.unwrap());
+                       let sem = Arc::new(Semaphore::new(16));
+                       while let Some(scid_res) = scids.next().await {
+                               let scid: i64 = scid_res.unwrap().get(0);
+                               let permit = Arc::clone(&sem).acquire_owned().await.unwrap();
+                               let logger = logger.clone();
+                               tokio::spawn(async move {
+                                       let rest_client = Arc::new(RestClient::new(bitcoin_rest_endpoint()).unwrap());
+                                       let txo = ChainVerifier::retrieve_txo(rest_client, scid as u64, logger).await
+                                               .expect("We shouldn't have accepted a channel announce with a bad TXO");
+                                       let client = crate::connect_to_db().await;
+                                       client.execute("UPDATE channel_announcements SET funding_amount_sats = $1 WHERE short_channel_id = $2", &[&(txo.value.to_sat() as i64), &scid]).await.unwrap();
+                                       std::mem::drop(permit);
+                               });
+                       }
+                       let _all_updates_complete = sem.acquire_many(16).await.unwrap();
+                       client.execute("ALTER TABLE channel_announcements ALTER funding_amount_sats SET NOT NULL", &[]).await.unwrap();
+                       client.execute("UPDATE config SET db_schema = 15 WHERE id = 1", &[]).await.unwrap();
+               });
        }
        if schema <= 1 || schema > SCHEMA_VERSION {
                panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
index a7d96c06442c4b092f265a77aa5bd3031bc6810f..b60048070e8f466df25b49c6090423f59b6c869f 100644 (file)
@@ -23,7 +23,7 @@ pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
        logger: L
 }
 
-impl<L: Deref> GossipPersister<L> where L::Target: Logger {
+impl<L: Deref + Clone + Send + Sync + 'static> GossipPersister<L> where L::Target: Logger {
        pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
                let (gossip_persistence_sender, gossip_persistence_receiver) =
                        mpsc::channel::<GossipMessage>(100);
@@ -50,7 +50,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 
                        let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap();
                        if !cur_schema.is_empty() {
-                               config::upgrade_db(cur_schema[0].get(0), &mut client).await;
+                               config::upgrade_db(cur_schema[0].get(0), &mut client, self.logger.clone()).await;
                        }
 
                        let preparation = client.execute("set time zone UTC", &[]).await;
index a7e3e13e817cc59df755c59b34778bd139b487af..4c686deb7962b5964e3e74c70b492751f3dc4e5f 100644 (file)
@@ -54,6 +54,10 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
                        .await.map(|txo| txo.value.to_sat())
        }
 
+       pub(crate) async fn retrieve_txo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
+               Self::retrieve_cache_txo(client, None, short_channel_id, logger).await
+       }
+
        async fn retrieve_cache_txo(client: Arc<RestClient>, scid_value: Option<Arc<Mutex<HashMap<u64, u64>>>>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
                let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
                let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;