Merge pull request #82 from arik-so/gossip-v2-bugfixes
[rapid-gossip-sync-server] / src / downloader.rs
index 8d5cdb0005e7d8e3ea74f5036ee6c20220a9729e..49e3019b858f362b0192ffd409835950ca6826db 100644 (file)
@@ -14,6 +14,7 @@ use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
 use crate::verifier::ChainVerifier;
 
 pub(crate) struct GossipCounter {
+       pub(crate) node_announcements: u64,
        pub(crate) channel_announcements: u64,
        pub(crate) channel_updates: u64,
        pub(crate) channel_updates_without_htlc_max_msats: u64,
@@ -23,6 +24,7 @@ pub(crate) struct GossipCounter {
 impl GossipCounter {
        pub(crate) fn new() -> Self {
                Self {
+                       node_announcements: 0,
                        channel_announcements: 0,
                        channel_updates: 0,
                        channel_updates_without_htlc_max_msats: 0,
@@ -62,7 +64,22 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
                        counter.channel_announcements += 1;
                }
 
-               let gossip_message = GossipMessage::ChannelAnnouncement(msg);
+               let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
+               if let Err(err) = self.sender.try_send(gossip_message) {
+                       let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
+                       tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
+                               self.sender.send(gossip_message).await.unwrap();
+                       })});
+               }
+       }
+
+       fn new_node_announcement(&self, msg: NodeAnnouncement) {
+               {
+                       let mut counter = self.counter.write().unwrap();
+                       counter.node_announcements += 1;
+               }
+
+               let gossip_message = GossipMessage::NodeAnnouncement(msg, None);
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
                        tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
@@ -73,7 +90,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
 
        fn new_channel_update(&self, msg: ChannelUpdate) {
                self.counter.write().unwrap().channel_updates += 1;
-               let gossip_message = GossipMessage::ChannelUpdate(msg);
+               let gossip_message = GossipMessage::ChannelUpdate(msg, None);
 
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
@@ -92,7 +109,9 @@ impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<
                                MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
                                        self.new_channel_announcement(msg);
                                },
-                               MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
+                               MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
+                                       self.new_node_announcement(msg);
+                               },
                                MessageSendEvent::BroadcastChannelUpdate { msg } => {
                                        self.new_channel_update(msg);
                                },
@@ -105,7 +124,9 @@ impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<
 
 impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
        fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
-               self.native_router.handle_node_announcement(msg)
+               let res = self.native_router.handle_node_announcement(msg)?;
+               self.new_node_announcement(msg.clone());
+               Ok(res)
        }
 
        fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {