2 use std::sync::{Arc, RwLock};
4 use bitcoin::secp256k1::PublicKey;
5 use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
6 use lightning::ln::features::{InitFeatures, NodeFeatures};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
8 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
9 use lightning::util::logger::Logger;
10 use tokio::sync::mpsc;
11 use tokio::sync::mpsc::error::TrySendError;
13 use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
14 use crate::verifier::ChainVerifier;
16 pub(crate) struct GossipCounter {
17 pub(crate) channel_announcements: u64,
18 pub(crate) channel_updates: u64,
19 pub(crate) channel_updates_without_htlc_max_msats: u64,
20 pub(crate) channel_announcements_with_mismatched_scripts: u64
24 pub(crate) fn new() -> Self {
26 channel_announcements: 0,
28 channel_updates_without_htlc_max_msats: 0,
29 channel_announcements_with_mismatched_scripts: 0,
34 pub(crate) struct GossipRouter<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
35 native_router: P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>,
36 pub(crate) counter: RwLock<GossipCounter>,
37 sender: mpsc::Sender<GossipMessage>,
38 verifier: Arc<ChainVerifier<L>>,
39 outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>>,
42 impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
43 pub(crate) fn new(network_graph: Arc<NetworkGraph<L>>, sender: mpsc::Sender<GossipMessage>, logger: L) -> Self {
44 let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
45 let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone()));
47 native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
49 counter: RwLock::new(GossipCounter::new()),
55 pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
56 self.verifier.set_ph(peer_handler);
59 fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
61 let mut counter = self.counter.write().unwrap();
62 counter.channel_announcements += 1;
65 let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
66 if let Err(err) = self.sender.try_send(gossip_message) {
67 let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
68 tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
69 self.sender.send(gossip_message).await.unwrap();
74 fn new_channel_update(&self, msg: ChannelUpdate) {
75 self.counter.write().unwrap().channel_updates += 1;
76 let gossip_message = GossipMessage::ChannelUpdate(msg, None);
78 if let Err(err) = self.sender.try_send(gossip_message) {
79 let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
80 tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
81 self.sender.send(gossip_message).await.unwrap();
87 impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<L> where L::Target: Logger {
88 fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
89 let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
90 for ev in gossip_evs {
92 MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
93 self.new_channel_announcement(msg);
95 MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
96 MessageSendEvent::BroadcastChannelUpdate { msg } => {
97 self.new_channel_update(msg);
99 _ => { unreachable!() },
102 self.native_router.get_and_clear_pending_msg_events()
106 impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
107 fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
108 self.native_router.handle_node_announcement(msg)
111 fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
112 let res = self.native_router.handle_channel_announcement(msg)?;
113 self.new_channel_announcement(msg.clone());
117 fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
118 let res = self.native_router.handle_channel_update(msg)?;
119 self.new_channel_update(msg.clone());
123 fn processing_queue_high(&self) -> bool {
124 self.native_router.processing_queue_high()
127 fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
128 self.native_router.get_next_channel_announcement(starting_point)
131 fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
132 self.native_router.get_next_node_announcement(starting_point)
135 fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
136 self.native_router.peer_connected(their_node_id, init, inbound)
139 fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
140 self.native_router.handle_reply_channel_range(their_node_id, msg)
143 fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
144 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
147 fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
148 self.native_router.handle_query_channel_range(their_node_id, msg)
151 fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
152 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
155 fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
156 self.native_router.provided_init_features(their_node_id)
159 fn provided_node_features(&self) -> NodeFeatures {
160 self.native_router.provided_node_features()