X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fdownloader.rs;h=49e3019b858f362b0192ffd409835950ca6826db;hb=529c6666501812eb7b23778f02b6a61940634c88;hp=9256100b581748100147d523fbe9f1a9ae0cee0b;hpb=b1d015229e3e6dc97e9bd24b892e2005e3b90779;p=rapid-gossip-sync-server diff --git a/src/downloader.rs b/src/downloader.rs index 9256100..49e3019 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -1,18 +1,20 @@ +use std::ops::Deref; use std::sync::{Arc, RwLock}; use bitcoin::secp256k1::PublicKey; +use lightning::events::{MessageSendEvent, MessageSendEventsProvider}; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; -use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider}; +use lightning::util::logger::Logger; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use crate::TestLogger; use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; pub(crate) struct GossipCounter { + pub(crate) node_announcements: u64, pub(crate) channel_announcements: u64, pub(crate) channel_updates: u64, pub(crate) channel_updates_without_htlc_max_msats: u64, @@ -22,6 +24,7 @@ pub(crate) struct GossipCounter { impl GossipCounter { pub(crate) fn new() -> Self { Self { + node_announcements: 0, channel_announcements: 0, channel_updates: 0, channel_updates_without_htlc_max_msats: 0, @@ -30,28 +33,28 @@ impl GossipCounter { } } -pub(crate) struct GossipRouter { - native_router: P2PGossipSync>, GossipChainAccess, TestLogger>, +pub(crate) struct GossipRouter where L::Target: Logger { + native_router: P2PGossipSync>, GossipChainAccess, L>, pub(crate) counter: RwLock, sender: mpsc::Sender, - verifier: Arc, - outbound_gossiper: Arc>, GossipChainAccess, TestLogger>>, + verifier: Arc>, + outbound_gossiper: Arc>, GossipChainAccess, L>>, } -impl GossipRouter { - pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender) -> Self { - let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new())); - let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper))); +impl GossipRouter where L::Target: Logger { + pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender, logger: L) -> Self { + let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone())); + let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone())); Self { - native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()), + native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()), outbound_gossiper, counter: RwLock::new(GossipCounter::new()), sender, - verifier, + verifier } } - pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { + pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { self.verifier.set_ph(peer_handler); } @@ -61,7 +64,22 @@ impl GossipRouter { counter.channel_announcements += 1; } - let gossip_message = GossipMessage::ChannelAnnouncement(msg); + let gossip_message = GossipMessage::ChannelAnnouncement(msg, None); + if let Err(err) = self.sender.try_send(gossip_message) { + let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; + tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { + self.sender.send(gossip_message).await.unwrap(); + })}); + } + } + + fn new_node_announcement(&self, msg: NodeAnnouncement) { + { + let mut counter = self.counter.write().unwrap(); + counter.node_announcements += 1; + } + + let gossip_message = GossipMessage::NodeAnnouncement(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { @@ -72,7 +90,7 @@ impl GossipRouter { fn new_channel_update(&self, msg: ChannelUpdate) { self.counter.write().unwrap().channel_updates += 1; - let gossip_message = GossipMessage::ChannelUpdate(msg); + let gossip_message = GossipMessage::ChannelUpdate(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; @@ -83,15 +101,17 @@ impl GossipRouter { } } -impl MessageSendEventsProvider for GossipRouter { +impl MessageSendEventsProvider for GossipRouter where L::Target: Logger { fn get_and_clear_pending_msg_events(&self) -> Vec { let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events(); for ev in gossip_evs { match ev { - MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None } => { + MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => { self.new_channel_announcement(msg); }, - MessageSendEvent::BroadcastNodeAnnouncement { .. } => {}, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + self.new_node_announcement(msg); + }, MessageSendEvent::BroadcastChannelUpdate { msg } => { self.new_channel_update(msg); }, @@ -102,9 +122,11 @@ impl MessageSendEventsProvider for GossipRouter { } } -impl RoutingMessageHandler for GossipRouter { +impl RoutingMessageHandler for GossipRouter where L::Target: Logger { fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { - self.native_router.handle_node_announcement(msg) + let res = self.native_router.handle_node_announcement(msg)?; + self.new_node_announcement(msg.clone()); + Ok(res) } fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result {