Merge pull request #10 from TheBlueMatt/2022-08-no-pin-cpufeatures
[rapid-gossip-sync-server] / src / downloader.rs
1 use std::sync::{Arc, RwLock};
2
3 use bitcoin::secp256k1::PublicKey;
4 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
5 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
6 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
7 use tokio::sync::mpsc;
8 use tokio::sync::mpsc::error::TrySendError;
9
10 use crate::TestLogger;
11 use crate::types::{GossipMessage, GossipChainAccess};
12 use crate::verifier::ChainVerifier;
13
14 pub(crate) struct GossipCounter {
15         pub(crate) channel_announcements: u64,
16         pub(crate) channel_updates: u64,
17         pub(crate) channel_updates_without_htlc_max_msats: u64,
18         pub(crate) channel_announcements_with_mismatched_scripts: u64
19 }
20
21 impl GossipCounter {
22         pub(crate) fn new() -> Self {
23                 Self {
24                         channel_announcements: 0,
25                         channel_updates: 0,
26                         channel_updates_without_htlc_max_msats: 0,
27                         channel_announcements_with_mismatched_scripts: 0,
28                 }
29         }
30 }
31
32 pub(crate) struct GossipRouter {
33         native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
34         pub(crate) counter: RwLock<GossipCounter>,
35         sender: mpsc::Sender<GossipMessage>,
36 }
37
38 impl GossipRouter {
39         pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
40                 Self {
41                         native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()),
42                         counter: RwLock::new(GossipCounter::new()),
43                         sender
44                 }
45         }
46 }
47
48 impl MessageSendEventsProvider for GossipRouter {
49         fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
50                 self.native_router.get_and_clear_pending_msg_events()
51         }
52 }
53
54 impl RoutingMessageHandler for GossipRouter {
55         fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
56                 self.native_router.handle_node_announcement(msg)
57         }
58
59         fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
60                 let native_result = self.native_router.handle_channel_announcement(msg);
61                 let output_value;
62                 {
63                         let mut counter = self.counter.write().unwrap();
64                         output_value = native_result.map_err(|error| {
65                                 if error.err.contains("didn't match on-chain script") {
66                                         counter.channel_announcements_with_mismatched_scripts += 1;
67                                 }
68                                 error
69                         })?;
70                         counter.channel_announcements += 1;
71                 }
72
73                 let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
74                 if let Err(err) = self.sender.try_send(gossip_message) {
75                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
76                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
77                                 self.sender.send(gossip_message).await.unwrap();
78                         })});
79                 }
80
81                 Ok(output_value)
82         }
83
84         fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
85                 let output_value = self.native_router.handle_channel_update(msg)?;
86
87                 self.counter.write().unwrap().channel_updates += 1;
88                 let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
89
90                 if let Err(err) = self.sender.try_send(gossip_message) {
91                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
92                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
93                                 self.sender.send(gossip_message).await.unwrap();
94                         })});
95                 }
96
97                 Ok(output_value)
98         }
99
100         fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
101                 self.native_router.get_next_channel_announcements(starting_point, batch_amount)
102         }
103
104         fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
105                 self.native_router.get_next_node_announcements(starting_point, batch_amount)
106         }
107
108         fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) {
109                 self.native_router.peer_connected(their_node_id, init)
110         }
111
112         fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
113                 self.native_router.handle_reply_channel_range(their_node_id, msg)
114         }
115
116         fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
117                 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
118         }
119
120         fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
121                 self.native_router.handle_query_channel_range(their_node_id, msg)
122         }
123
124         fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
125                 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
126         }
127 }