1 use std::sync::{Arc, RwLock};
3 use bitcoin::secp256k1::PublicKey;
4 use lightning::ln::features::{InitFeatures, NodeFeatures};
5 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
6 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
7 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
9 use tokio::sync::mpsc::error::TrySendError;
11 use crate::TestLogger;
12 use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
13 use crate::verifier::ChainVerifier;
/// Running tallies of gossip messages observed by the router.
///
/// All counters start at zero (see [`GossipCounter::new`]).
pub(crate) struct GossipCounter {
    /// Incremented once per call to `GossipRouter::new_channel_announcement`.
    pub(crate) channel_announcements: u64,
    /// Incremented once per call to `GossipRouter::new_channel_update`.
    pub(crate) channel_updates: u64,
    /// Not modified anywhere in this file; presumably maintained by a
    /// downstream consumer of the gossip stream — TODO confirm against callers.
    pub(crate) channel_updates_without_htlc_max_msats: u64,
    /// Not modified anywhere in this file; presumably maintained by a
    /// downstream consumer of the gossip stream — TODO confirm against callers.
    pub(crate) channel_announcements_with_mismatched_scripts: u64,
}

impl GossipCounter {
    /// Creates a counter set with every tally initialised to zero.
    pub(crate) fn new() -> Self {
        // Every field is listed explicitly so that adding a counter to the
        // struct without initialising it here is a compile error.
        Self {
            channel_announcements: 0,
            channel_updates: 0,
            channel_updates_without_htlc_max_msats: 0,
            channel_announcements_with_mismatched_scripts: 0,
        }
    }
}
33 pub(crate) struct GossipRouter {
34 native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
35 pub(crate) counter: RwLock<GossipCounter>,
36 sender: mpsc::Sender<GossipMessage>,
37 verifier: Arc<ChainVerifier>,
38 outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
42 pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
43 let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
44 let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
46 native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
48 counter: RwLock::new(GossipCounter::new()),
54 pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
55 self.verifier.set_ph(peer_handler);
58 fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
60 let mut counter = self.counter.write().unwrap();
61 counter.channel_announcements += 1;
64 let gossip_message = GossipMessage::ChannelAnnouncement(msg);
65 if let Err(err) = self.sender.try_send(gossip_message) {
66 let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
67 tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
68 self.sender.send(gossip_message).await.unwrap();
73 fn new_channel_update(&self, msg: ChannelUpdate) {
74 self.counter.write().unwrap().channel_updates += 1;
75 let gossip_message = GossipMessage::ChannelUpdate(msg);
77 if let Err(err) = self.sender.try_send(gossip_message) {
78 let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
79 tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
80 self.sender.send(gossip_message).await.unwrap();
86 impl MessageSendEventsProvider for GossipRouter {
87 fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
88 let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
89 for ev in gossip_evs {
91 MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None } => {
92 self.new_channel_announcement(msg);
94 MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
95 MessageSendEvent::BroadcastChannelUpdate { msg } => {
96 self.new_channel_update(msg);
98 _ => { unreachable!() },
101 self.native_router.get_and_clear_pending_msg_events()
105 impl RoutingMessageHandler for GossipRouter {
106 fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
107 self.native_router.handle_node_announcement(msg)
110 fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
111 let res = self.native_router.handle_channel_announcement(msg)?;
112 self.new_channel_announcement(msg.clone());
116 fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
117 let res = self.native_router.handle_channel_update(msg)?;
118 self.new_channel_update(msg.clone());
122 fn processing_queue_high(&self) -> bool {
123 self.native_router.processing_queue_high()
126 fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
127 self.native_router.get_next_channel_announcement(starting_point)
130 fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
131 self.native_router.get_next_node_announcement(starting_point)
134 fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
135 self.native_router.peer_connected(their_node_id, init, inbound)
138 fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
139 self.native_router.handle_reply_channel_range(their_node_id, msg)
142 fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
143 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
146 fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
147 self.native_router.handle_query_channel_range(their_node_id, msg)
150 fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
151 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
154 fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
155 self.native_router.provided_init_features(their_node_id)
158 fn provided_node_features(&self) -> NodeFeatures {
159 self.native_router.provided_node_features()