Allow custom logger types.
[rapid-gossip-sync-server] / src / downloader.rs
index 691a9e678667a7df2ab494079e8d976818784e53..0c672e36446c25d05d6dcf48905f67a47feb7d35 100644 (file)
@@ -1,14 +1,16 @@
 use std::sync::{Arc, RwLock};
 
 use bitcoin::secp256k1::PublicKey;
+use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
+use lightning::ln::features::{InitFeatures, NodeFeatures};
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
-use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
+use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
+use lightning::util::logger::Logger;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 
-use crate::{GossipChainAccess, TestLogger};
-use crate::types::GossipMessage;
+use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
+use crate::verifier::ChainVerifier;
 
 pub(crate) struct GossipCounter {
        pub(crate) channel_announcements: u64,
@@ -28,53 +30,49 @@ impl GossipCounter {
        }
 }
 
-pub(crate) struct GossipRouter {
-       pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
+pub(crate) struct GossipRouter<L: Logger + Send + Sync + 'static> {
+       native_router: P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, GossipChainAccess<L>, Arc<L>>,
        pub(crate) counter: RwLock<GossipCounter>,
-       pub(crate) sender: mpsc::Sender<GossipMessage>,
+       sender: mpsc::Sender<GossipMessage>,
+       verifier: Arc<ChainVerifier<L>>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, GossipChainAccess<L>, Arc<L>>>,
 }
 
-impl MessageSendEventsProvider for GossipRouter {
-       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
-               self.native_router.get_and_clear_pending_msg_events()
+impl<L: Logger + Send + Sync> GossipRouter<L> {
+       pub(crate) fn new(network_graph: Arc<NetworkGraph<Arc<L>>>, sender: mpsc::Sender<GossipMessage>, logger: Arc<L>) -> Self {
+               let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
+               let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
+               Self {
+                       native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
+                       outbound_gossiper,
+                       counter: RwLock::new(GossipCounter::new()),
+                       sender,
+                       verifier
+               }
        }
-}
 
-impl RoutingMessageHandler for GossipRouter {
-       fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
-               self.native_router.handle_node_announcement(msg)
+       pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
+               self.verifier.set_ph(peer_handler);
        }
 
-       fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
-               let mut counter = self.counter.write().unwrap();
-
-               let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
-                       let error_string = format!("{:?}", error);
-                       if error_string.contains("announced on an unknown chain"){
-                               return error;
-                       }
-                       counter.channel_announcements_with_mismatched_scripts += 1;
-                       error
-               })?;
+       fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
+               {
+                       let mut counter = self.counter.write().unwrap();
+                       counter.channel_announcements += 1;
+               }
 
-               counter.channel_announcements += 1;
-               let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
+               let gossip_message = GossipMessage::ChannelAnnouncement(msg);
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
                        tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
                                self.sender.send(gossip_message).await.unwrap();
                        })});
                }
-
-               Ok(output_value)
        }
 
-       fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
-               let output_value = self.native_router.handle_channel_update(msg)?;
-
-               let mut counter = self.counter.write().unwrap();
-               counter.channel_updates += 1;
-               let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
+       fn new_channel_update(&self, msg: ChannelUpdate) {
+               self.counter.write().unwrap().channel_updates += 1;
+               let gossip_message = GossipMessage::ChannelUpdate(msg);
 
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
@@ -82,20 +80,59 @@ impl RoutingMessageHandler for GossipRouter {
                                self.sender.send(gossip_message).await.unwrap();
                        })});
                }
+       }
+}
 
-               Ok(output_value)
+impl<L: Logger + Send + Sync> MessageSendEventsProvider for GossipRouter<L> {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+               let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
+               for ev in gossip_evs {
+                       match ev {
+                               MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
+                                       self.new_channel_announcement(msg);
+                               },
+                               MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
+                               MessageSendEvent::BroadcastChannelUpdate { msg } => {
+                                       self.new_channel_update(msg);
+                               },
+                               _ => { unreachable!() },
+                       }
+               }
+               self.native_router.get_and_clear_pending_msg_events()
+       }
+}
+
+impl<L: Logger + Send + Sync> RoutingMessageHandler for GossipRouter<L> {
+       fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
+               self.native_router.handle_node_announcement(msg)
+       }
+
+       fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
+               let res = self.native_router.handle_channel_announcement(msg)?;
+               self.new_channel_announcement(msg.clone());
+               Ok(res)
        }
 
-       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
-               self.native_router.get_next_channel_announcements(starting_point, batch_amount)
+       fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
+               let res = self.native_router.handle_channel_update(msg)?;
+               self.new_channel_update(msg.clone());
+               Ok(res)
+       }
+
+       fn processing_queue_high(&self) -> bool {
+               self.native_router.processing_queue_high()
+       }
+
+       fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
+               self.native_router.get_next_channel_announcement(starting_point)
        }
 
-       fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
-               self.native_router.get_next_node_announcements(starting_point, batch_amount)
+       fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
+               self.native_router.get_next_node_announcement(starting_point)
        }
 
-       fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) {
-               self.native_router.peer_connected(their_node_id, init)
+       fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
+               self.native_router.peer_connected(their_node_id, init, inbound)
        }
 
        fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
@@ -113,4 +150,12 @@ impl RoutingMessageHandler for GossipRouter {
        fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
                self.native_router.handle_query_short_channel_ids(their_node_id, msg)
        }
+
+       fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
+               self.native_router.provided_init_features(their_node_id)
+       }
+
+       fn provided_node_features(&self) -> NodeFeatures {
+               self.native_router.provided_node_features()
+       }
 }