X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fdownloader.rs;h=af854c8317d059444b9ee2df6598bb1edbeeccb7;hb=1c181eeae185d576c70592571550b3e4ba004446;hp=4fc3d51c6c005f610af7680af999068747c54a04;hpb=df908c814c701f5a19e55706e7716dc0647dcda0;p=rapid-gossip-sync-server diff --git a/src/downloader.rs b/src/downloader.rs index 4fc3d51..af854c8 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -1,15 +1,16 @@ +use std::ops::Deref; use std::sync::{Arc, RwLock}; use bitcoin::secp256k1::PublicKey; +use lightning::events::{MessageSendEvent, MessageSendEventsProvider}; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; -use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; -use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider}; +use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use lightning::util::logger::Logger; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use crate::TestLogger; -use crate::types::{GossipMessage, GossipChainAccess}; +use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; pub(crate) struct GossipCounter { @@ -30,63 +31,49 @@ impl GossipCounter { } } -pub(crate) struct GossipRouter { - native_router: P2PGossipSync>, GossipChainAccess, TestLogger>, +pub(crate) struct GossipRouter where L::Target: Logger { + native_router: P2PGossipSync>, GossipChainAccess, L>, pub(crate) counter: RwLock, sender: mpsc::Sender, + verifier: Arc>, + outbound_gossiper: Arc>, GossipChainAccess, L>>, } -impl GossipRouter { - pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender) -> Self { +impl GossipRouter where L::Target: Logger { + pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender, logger: L) -> Self { + let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone())); + let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone())); Self { - native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()), + native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()), + outbound_gossiper, counter: RwLock::new(GossipCounter::new()), - sender + sender, + verifier } } -} - -impl MessageSendEventsProvider for GossipRouter { - fn get_and_clear_pending_msg_events(&self) -> Vec { - self.native_router.get_and_clear_pending_msg_events() - } -} -impl RoutingMessageHandler for GossipRouter { - fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { - self.native_router.handle_node_announcement(msg) + pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { + self.verifier.set_ph(peer_handler); } - fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { - let native_result = self.native_router.handle_channel_announcement(msg); - let output_value; + fn new_channel_announcement(&self, msg: ChannelAnnouncement) { { let mut counter = self.counter.write().unwrap(); - output_value = native_result.map_err(|error| { - if error.err.contains("didn't match on-chain script") { - counter.channel_announcements_with_mismatched_scripts += 1; - } - error - })?; counter.channel_announcements += 1; } - let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone()); + let gossip_message = GossipMessage::ChannelAnnouncement(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { self.sender.send(gossip_message).await.unwrap(); })}); } - - Ok(output_value) } - fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result { - let output_value = self.native_router.handle_channel_update(msg)?; - + fn new_channel_update(&self, msg: ChannelUpdate) { self.counter.write().unwrap().channel_updates += 1; - let gossip_message = GossipMessage::ChannelUpdate(msg.clone()); + let gossip_message = GossipMessage::ChannelUpdate(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; @@ -94,20 +81,59 @@ impl RoutingMessageHandler for GossipRouter { self.sender.send(gossip_message).await.unwrap(); })}); } + } +} + +impl MessageSendEventsProvider for GossipRouter where L::Target: Logger { + fn get_and_clear_pending_msg_events(&self) -> Vec { + let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events(); + for ev in gossip_evs { + match ev { + MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => { + self.new_channel_announcement(msg); + }, + MessageSendEvent::BroadcastNodeAnnouncement { .. } => {}, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + self.new_channel_update(msg); + }, + _ => { unreachable!() }, + } + } + self.native_router.get_and_clear_pending_msg_events() + } +} + +impl RoutingMessageHandler for GossipRouter where L::Target: Logger { + fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { + self.native_router.handle_node_announcement(msg) + } + + fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { + let res = self.native_router.handle_channel_announcement(msg)?; + self.new_channel_announcement(msg.clone()); + Ok(res) + } + + fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result { + let res = self.native_router.handle_channel_update(msg)?; + self.new_channel_update(msg.clone()); + Ok(res) + } - Ok(output_value) + fn processing_queue_high(&self) -> bool { + self.native_router.processing_queue_high() } fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option, Option)> { self.native_router.get_next_channel_announcement(starting_point) } - fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option { + fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option { self.native_router.get_next_node_announcement(starting_point) } - fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) -> Result<(), ()> { - self.native_router.peer_connected(their_node_id, init) + fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> { + self.native_router.peer_connected(their_node_id, init, inbound) } fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {