]> git.bitcoin.ninja Git - rapid-gossip-sync-server/commitdiff
Store TXO values
authorMatt Corallo <git@bluematt.me>
Wed, 20 Nov 2024 22:48:18 +0000 (22:48 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 21 Nov 2024 01:48:40 +0000 (01:48 +0000)
src/config.rs
src/downloader.rs
src/persistence.rs
src/types.rs
src/verifier.rs

index 54c7b01465d19618d35188a8603003b70a82b716..2a83d6507239e521ce992f490a8e82af068624f0 100644 (file)
@@ -14,7 +14,7 @@ use lightning::util::ser::Readable;
 use lightning_block_sync::http::HttpEndpoint;
 use tokio_postgres::Config;
 
-pub(crate) const SCHEMA_VERSION: i32 = 14;
+pub(crate) const SCHEMA_VERSION: i32 = 15;
 pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
 pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
 // generate symlinks based on a 3-hour-granularity
@@ -313,6 +313,9 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                tx.execute("UPDATE config SET db_schema = 14 WHERE id = 1", &[]).await.unwrap();
                tx.commit().await.unwrap();
        }
+       if schema >= 1 && schema <= 14 {
+               client.execute("ALTER TABLE channel_announcements ADD COLUMN funding_amount_sats bigint DEFAULT null", &[]).await.unwrap();
+       }
        if schema <= 1 || schema > SCHEMA_VERSION {
                panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
        }
index 49e3019b858f362b0192ffd409835950ca6826db..696ae43865a7dcb91201699eb5c9a3b318f71a45 100644 (file)
@@ -64,7 +64,16 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
                        counter.channel_announcements += 1;
                }
 
-               let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
+               let mut funding_amount_sats = self.verifier.get_cached_funding_value(msg.contents.short_channel_id);
+               if funding_amount_sats.is_none() {
+                       tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async {
+                               funding_amount_sats = self.verifier.retrieve_funding_value(msg.contents.short_channel_id).await.ok();
+                       })});
+               }
+               let funding_amount_sats = funding_amount_sats
+                       .expect("If we've accepted a ChannelAnnouncement, we must be able to fetch the TXO for it");
+
+               let gossip_message = GossipMessage::ChannelAnnouncement(msg, funding_amount_sats, None);
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
                        tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
index 0db3091beba7dd465db84754f47ac9c8eaed064b..a7d96c06442c4b092f265a77aa5bd3031bc6810f 100644 (file)
@@ -185,7 +185,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                        #[cfg(test)]
                                        tasks_spawned.push(_task);
                                },
-                               GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
+                               GossipMessage::ChannelAnnouncement(announcement, funding_value, seen_override) => {
                                        let scid = announcement.contents.short_channel_id as i64;
 
                                        // start with the type prefix, which is already known a priori
@@ -198,19 +198,23 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                                                .execute("INSERT INTO channel_announcements (\
                                                                short_channel_id, \
                                                                announcement_signed, \
+                                                               funding_amount_sats, \
                                                                seen \
-                                                       ) VALUES ($1, $2, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
+                                                       ) VALUES ($1, $2, $3, TO_TIMESTAMP($3)) ON CONFLICT (short_channel_id) DO NOTHING", &[
                                                                        &scid,
                                                                        &announcement_signed,
+                                                                       &(funding_value as i64),
                                                                        &(seen_override.unwrap() as f64)
                                                                ])).await.unwrap().unwrap();
                                                } else {
                                                        tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
                                                                .execute("INSERT INTO channel_announcements (\
                                                                short_channel_id, \
+                                                               funding_amount_sats, \
                                                                announcement_signed \
-                                                       ) VALUES ($1, $2) ON CONFLICT (short_channel_id) DO NOTHING", &[
+                                                       ) VALUES ($1, $2, $3) ON CONFLICT (short_channel_id) DO NOTHING", &[
                                                                        &scid,
+                                                                       &(funding_value as i64),
                                                                        &announcement_signed
                                                                ])).await.unwrap().unwrap();
                                                }
index f38a3760b316897b78729d4e96aa865c122b8969..b0684480b55bbd3d820c5b9a5fe5ff4d1f22b23e 100644 (file)
@@ -16,7 +16,7 @@ pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::Sock
 pub(crate) enum GossipMessage {
        NodeAnnouncement(NodeAnnouncement, Option<u32>),
        // the second element is an optional override for the seen value
-       ChannelAnnouncement(ChannelAnnouncement, Option<u32>),
+       ChannelAnnouncement(ChannelAnnouncement, u64, Option<u32>),
        ChannelUpdate(ChannelUpdate, Option<u32>),
 }
 
index 8256916d912f1804b05edaa69208349083e4a18d..a7e3e13e817cc59df755c59b34778bd139b487af 100644 (file)
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::io::ErrorKind;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -23,6 +24,7 @@ pub(crate) struct ChainVerifier<L: Deref + Clone + Send + Sync + 'static> where
        graph: Arc<NetworkGraph<L>>,
        outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
        peer_handler: Mutex<Option<GossipPeerManager<L>>>,
+       scid_value: Arc<Mutex<HashMap<u64, u64>>>,
        logger: L
 }
 
@@ -35,14 +37,24 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
                        outbound_gossiper,
                        graph,
                        peer_handler: Mutex::new(None),
-                       logger
+                       scid_value: Arc::new(Mutex::new(HashMap::new())),
+                       logger,
                }
        }
        pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager<L>) {
                *self.peer_handler.lock().unwrap() = Some(peer_handler);
        }
 
-       async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
+       pub(crate) fn get_cached_funding_value(&self, scid: u64) -> Option<u64> {
+               self.scid_value.lock().unwrap().get(&scid).map(|v| *v)
+       }
+
+       pub(crate) async fn retrieve_funding_value(&self, scid: u64) -> Result<u64, UtxoLookupError> {
+               Self::retrieve_cache_txo(Arc::clone(&self.rest_client), Some(Arc::clone(&self.scid_value)), scid, self.logger.clone())
+                       .await.map(|txo| txo.value.to_sat())
+       }
+
+       async fn retrieve_cache_txo(client: Arc<RestClient>, scid_value: Option<Arc<Mutex<HashMap<u64, u64>>>>, short_channel_id: u64, logger: L) -> Result<TxOut, UtxoLookupError> {
                let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
                let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
                let output_index = (short_channel_id & 0xffff) as u16;
@@ -57,7 +69,11 @@ impl<L: Deref + Clone + Send + Sync + 'static> ChainVerifier<L> where L::Target:
                        log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.compute_txid());
                        return Err(UtxoLookupError::UnknownTx);
                }
-               Ok(transaction.output.swap_remove(output_index as usize))
+               let txo = transaction.output.swap_remove(output_index as usize);
+               if let Some(scid_value) = scid_value {
+                       scid_value.lock().unwrap().insert(short_channel_id, txo.value.to_sat());
+               }
+               Ok(txo)
        }
 
        async fn retrieve_block(client: Arc<RestClient>, block_height: u32, logger: L) -> Result<Block, UtxoLookupError> {
@@ -99,10 +115,11 @@ impl<L: Deref + Clone + Send + Sync + 'static> UtxoLookup for ChainVerifier<L> w
                let graph_ref = Arc::clone(&self.graph);
                let client_ref = Arc::clone(&self.rest_client);
                let gossip_ref = Arc::clone(&self.outbound_gossiper);
+               let scid_value_cache_ref = Arc::clone(&self.scid_value);
                let pm_ref = self.peer_handler.lock().unwrap().clone();
                let logger_ref = self.logger.clone();
                tokio::spawn(async move {
-                       let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await;
+                       let res = Self::retrieve_cache_txo(client_ref, Some(scid_value_cache_ref), short_channel_id, logger_ref).await;
                        fut.resolve(&*graph_ref, &*gossip_ref, res);
                        if let Some(pm) = pm_ref { pm.process_events(); }
                });