Update schema for node announcement and address storage.
authorArik Sosman <git@arik.io>
Thu, 28 Mar 2024 06:28:41 +0000 (23:28 -0700)
committerArik Sosman <git@arik.io>
Wed, 15 May 2024 16:11:20 +0000 (09:11 -0700)
src/config.rs
src/downloader.rs
src/persistence.rs
src/types.rs

index 0941e78953424e0619934a2ab02b4e91de170260..c46ddf82b2cb9a533b7d57ce4444c6acbff0201a 100644 (file)
@@ -14,7 +14,7 @@ use lightning::util::ser::Readable;
 use lightning_block_sync::http::HttpEndpoint;
 use tokio_postgres::Config;
 
-pub(crate) const SCHEMA_VERSION: i32 = 13;
+pub(crate) const SCHEMA_VERSION: i32 = 14;
 pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
 pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
 // generate symlinks based on a 3-hour-granularity
@@ -135,6 +135,18 @@ pub(crate) fn db_channel_update_table_creation_query() -> &'static str {
        )"
 }
 
+pub(crate) fn db_node_announcement_table_creation_query() -> &'static str {
+       "CREATE TABLE IF NOT EXISTS node_announcements (
+               id SERIAL PRIMARY KEY,
+               public_key varchar(66) NOT NULL,
+               features BYTEA NOT NULL,
+               socket_addresses BYTEA NOT NULL,
+               timestamp bigint NOT NULL,
+               announcement_signed BYTEA,
+               seen timestamp NOT NULL DEFAULT NOW()
+       )"
+}
+
 pub(crate) fn db_index_creation_query() -> &'static str {
        "
        CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id);
index af854c8317d059444b9ee2df6598bb1edbeeccb7..49e3019b858f362b0192ffd409835950ca6826db 100644 (file)
@@ -14,6 +14,7 @@ use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
 use crate::verifier::ChainVerifier;
 
 pub(crate) struct GossipCounter {
+       pub(crate) node_announcements: u64,
        pub(crate) channel_announcements: u64,
        pub(crate) channel_updates: u64,
        pub(crate) channel_updates_without_htlc_max_msats: u64,
@@ -23,6 +24,7 @@ pub(crate) struct GossipCounter {
 impl GossipCounter {
        pub(crate) fn new() -> Self {
                Self {
+                       node_announcements: 0,
                        channel_announcements: 0,
                        channel_updates: 0,
                        channel_updates_without_htlc_max_msats: 0,
@@ -71,6 +73,21 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
                }
        }
 
+       fn new_node_announcement(&self, msg: NodeAnnouncement) {
+               {
+                       let mut counter = self.counter.write().unwrap();
+                       counter.node_announcements += 1;
+               }
+
+               let gossip_message = GossipMessage::NodeAnnouncement(msg, None);
+               if let Err(err) = self.sender.try_send(gossip_message) {
+                       let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+                       tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+                               self.sender.send(gossip_message).await.unwrap();
+                       })});
+               }
+       }
+
        fn new_channel_update(&self, msg: ChannelUpdate) {
                self.counter.write().unwrap().channel_updates += 1;
                let gossip_message = GossipMessage::ChannelUpdate(msg, None);
@@ -92,7 +109,9 @@ impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<
                                MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
                                        self.new_channel_announcement(msg);
                                },
-                               MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
+                               MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
+                                       self.new_node_announcement(msg);
+                               },
                                MessageSendEvent::BroadcastChannelUpdate { msg } => {
                                        self.new_channel_update(msg);
                                },
@@ -105,7 +124,9 @@ impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<
 
 impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
        fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
-               self.native_router.handle_node_announcement(msg)
+               let res = self.native_router.handle_node_announcement(msg)?;
+               self.new_node_announcement(msg.clone());
+               Ok(res)
        }
 
        fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
index a7cfb37caca8c17c9a21541f9e10d934d2c13477..e0c734d7a0907cce82401c8a2696625efcd72212 100644 (file)
@@ -69,21 +69,20 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
                                panic!("db init error: {}", initialization_error);
                        }
 
-                       let initialization = client
-                               .execute(config::db_announcement_table_creation_query(), &[])
-                               .await;
-                       if let Err(initialization_error) = initialization {
-                               panic!("db init error: {}", initialization_error);
-                       }
+                       let table_creation_queries = [
+                               config::db_announcement_table_creation_query(),
+                               config::db_channel_update_table_creation_query(),
+                               config::db_channel_update_table_creation_query(),
+                               config::db_node_announcement_table_creation_query()
+                       ];
 
-                       let initialization = client
-                               .execute(
-                                       config::db_channel_update_table_creation_query(),
-                                       &[],
-                               )
-                               .await;
-                       if let Err(initialization_error) = initialization {
-                               panic!("db init error: {}", initialization_error);
+                       for current_table_creation_query in table_creation_queries {
+                               let initialization = client
+                                       .execute(current_table_creation_query, &[])
+                                       .await;
+                               if let Err(initialization_error) = initialization {
+                                       panic!("db init error: {}", initialization_error);
+                               }
                        }
 
                        let initialization = client
@@ -133,6 +132,9 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
 
                        let connections_cache_ref = Arc::clone(&connections_cache);
                        match gossip_message {
+                               GossipMessage::NodeAnnouncement(_announcement, _seen_override) => {
+
+                               },
                                GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
                                        let scid = announcement.contents.short_channel_id as i64;
 
index 0c6c9b2533a7404c556e4e3a15bfa673bd178bc5..f38a3760b316897b78729d4e96aa865c122b8969 100644 (file)
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use lightning::sign::KeysManager;
-use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement};
 use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
 use lightning::util::logger::{Logger, Record};
 use crate::config;
@@ -14,6 +14,7 @@ pub(crate) type GossipPeerManager<L> = Arc<PeerManager<lightning_net_tokio::Sock
 
 #[derive(Debug)]
 pub(crate) enum GossipMessage {
+       NodeAnnouncement(NodeAnnouncement, Option<u32>),
        // the second element is an optional override for the seen value
        ChannelAnnouncement(ChannelAnnouncement, Option<u32>),
        ChannelUpdate(ChannelUpdate, Option<u32>),