From c5ad40eba4f970b0025cf590a1e2195a67cd541d Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 27 Mar 2024 23:28:41 -0700 Subject: [PATCH] Update schema for node announcement and address storage. --- src/config.rs | 14 +++++++++++++- src/downloader.rs | 25 +++++++++++++++++++++++-- src/persistence.rs | 30 ++++++++++++++++-------------- src/types.rs | 3 ++- 4 files changed, 54 insertions(+), 18 deletions(-) diff --git a/src/config.rs b/src/config.rs index 0941e78..c46ddf8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,7 +14,7 @@ use lightning::util::ser::Readable; use lightning_block_sync::http::HttpEndpoint; use tokio_postgres::Config; -pub(crate) const SCHEMA_VERSION: i32 = 13; +pub(crate) const SCHEMA_VERSION: i32 = 14; pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks // generate symlinks based on a 3-hour-granularity @@ -135,6 +135,18 @@ pub(crate) fn db_channel_update_table_creation_query() -> &'static str { )" } +pub(crate) fn db_node_announcement_table_creation_query() -> &'static str { + "CREATE TABLE IF NOT EXISTS node_announcements ( + id SERIAL PRIMARY KEY, + public_key varchar(66) NOT NULL, + features BYTEA NOT NULL, + socket_addresses BYTEA NOT NULL, + timestamp bigint NOT NULL, + announcement_signed BYTEA, + seen timestamp NOT NULL DEFAULT NOW() + )" +} + pub(crate) fn db_index_creation_query() -> &'static str { " CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id); diff --git a/src/downloader.rs b/src/downloader.rs index af854c8..49e3019 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -14,6 +14,7 @@ use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; pub(crate) struct GossipCounter { + pub(crate) node_announcements: u64, pub(crate) channel_announcements: u64, pub(crate) channel_updates: u64, pub(crate) channel_updates_without_htlc_max_msats: u64, @@ -23,6 +24,7 @@ pub(crate) struct GossipCounter { impl GossipCounter { pub(crate) fn new() -> Self { Self { + node_announcements: 0, channel_announcements: 0, channel_updates: 0, channel_updates_without_htlc_max_msats: 0, @@ -71,6 +73,21 @@ impl GossipRouter where L::Target: Logger { } } + fn new_node_announcement(&self, msg: NodeAnnouncement) { + { + let mut counter = self.counter.write().unwrap(); + counter.node_announcements += 1; + } + + let gossip_message = GossipMessage::NodeAnnouncement(msg, None); + if let Err(err) = self.sender.try_send(gossip_message) { + let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; + tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { + self.sender.send(gossip_message).await.unwrap(); + })}); + } + } + fn new_channel_update(&self, msg: ChannelUpdate) { self.counter.write().unwrap().channel_updates += 1; let gossip_message = GossipMessage::ChannelUpdate(msg, None); @@ -92,7 +109,9 @@ impl MessageSendEventsProvider for GossipRouter< MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => { self.new_channel_announcement(msg); }, - MessageSendEvent::BroadcastNodeAnnouncement { .. } => {}, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + self.new_node_announcement(msg); + }, MessageSendEvent::BroadcastChannelUpdate { msg } => { self.new_channel_update(msg); }, @@ -105,7 +124,9 @@ impl MessageSendEventsProvider for GossipRouter< impl RoutingMessageHandler for GossipRouter where L::Target: Logger { fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { - self.native_router.handle_node_announcement(msg) + let res = self.native_router.handle_node_announcement(msg)?; + self.new_node_announcement(msg.clone()); + Ok(res) } fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { diff --git a/src/persistence.rs b/src/persistence.rs index a7cfb37..e0c734d 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -69,21 +69,20 @@ impl GossipPersister where L::Target: Logger { panic!("db init error: {}", initialization_error); } - let initialization = client - .execute(config::db_announcement_table_creation_query(), &[]) - .await; - if let Err(initialization_error) = initialization { - panic!("db init error: {}", initialization_error); - } + let table_creation_queries = [ + config::db_announcement_table_creation_query(), + config::db_channel_update_table_creation_query(), + config::db_channel_update_table_creation_query(), + config::db_node_announcement_table_creation_query() + ]; - let initialization = client - .execute( - config::db_channel_update_table_creation_query(), - &[], - ) - .await; - if let Err(initialization_error) = initialization { - panic!("db init error: {}", initialization_error); + for current_table_creation_query in table_creation_queries { + let initialization = client + .execute(current_table_creation_query, &[]) + .await; + if let Err(initialization_error) = initialization { + panic!("db init error: {}", initialization_error); + } } let initialization = client @@ -133,6 +132,9 @@ impl GossipPersister where L::Target: Logger { let connections_cache_ref = Arc::clone(&connections_cache); match gossip_message { + GossipMessage::NodeAnnouncement(_announcement, _seen_override) => { + + }, GossipMessage::ChannelAnnouncement(announcement, seen_override) => { let scid = announcement.contents.short_channel_id as i64; diff --git a/src/types.rs b/src/types.rs index 0c6c9b2..f38a376 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use lightning::sign::KeysManager; -use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate}; +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}; use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager}; use lightning::util::logger::{Logger, Record}; use crate::config; @@ -14,6 +14,7 @@ pub(crate) type GossipPeerManager = Arc), // the second element is an optional override for the seen value ChannelAnnouncement(ChannelAnnouncement, Option), ChannelUpdate(ChannelUpdate, Option), -- 2.39.5