Update LDK to 0.0.112
[rapid-gossip-sync-server] / src / downloader.rs
1 use std::sync::{Arc, RwLock};
2
3 use bitcoin::secp256k1::PublicKey;
4 use lightning::ln::features::{InitFeatures, NodeFeatures};
5 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
6 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
7 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
8 use tokio::sync::mpsc;
9 use tokio::sync::mpsc::error::TrySendError;
10
11 use crate::TestLogger;
12 use crate::types::{GossipMessage, GossipChainAccess};
13 use crate::verifier::ChainVerifier;
14
/// Running tallies of gossip traffic observed by the router.
///
/// All counters start at zero; `Default` and [`GossipCounter::new`] are
/// equivalent ways of obtaining a fresh instance.
#[derive(Debug, Default)]
pub(crate) struct GossipCounter {
        /// Channel announcements that the native router accepted.
        pub(crate) channel_announcements: u64,
        /// Channel updates that the native router accepted.
        pub(crate) channel_updates: u64,
        /// NOTE(review): not modified anywhere in this file; presumably counts
        /// channel updates lacking an `htlc_maximum_msat` — confirm against the
        /// code that writes it.
        pub(crate) channel_updates_without_htlc_max_msats: u64,
        /// Channel announcements rejected with an error whose text contains
        /// "didn't match on-chain script".
        pub(crate) channel_announcements_with_mismatched_scripts: u64,
}

impl GossipCounter {
        /// Creates a counter with every tally set to zero.
        pub(crate) fn new() -> Self {
                Self::default()
        }
}
32
33 pub(crate) struct GossipRouter {
34         native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
35         pub(crate) counter: RwLock<GossipCounter>,
36         sender: mpsc::Sender<GossipMessage>,
37 }
38
39 impl GossipRouter {
40         pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
41                 Self {
42                         native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()),
43                         counter: RwLock::new(GossipCounter::new()),
44                         sender
45                 }
46         }
47 }
48
49 impl MessageSendEventsProvider for GossipRouter {
50         fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
51                 self.native_router.get_and_clear_pending_msg_events()
52         }
53 }
54
55 impl RoutingMessageHandler for GossipRouter {
56         fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
57                 self.native_router.handle_node_announcement(msg)
58         }
59
60         fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
61                 let native_result = self.native_router.handle_channel_announcement(msg);
62                 let output_value;
63                 {
64                         let mut counter = self.counter.write().unwrap();
65                         output_value = native_result.map_err(|error| {
66                                 if error.err.contains("didn't match on-chain script") {
67                                         counter.channel_announcements_with_mismatched_scripts += 1;
68                                 }
69                                 error
70                         })?;
71                         counter.channel_announcements += 1;
72                 }
73
74                 let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
75                 if let Err(err) = self.sender.try_send(gossip_message) {
76                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
77                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
78                                 self.sender.send(gossip_message).await.unwrap();
79                         })});
80                 }
81
82                 Ok(output_value)
83         }
84
85         fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
86                 let output_value = self.native_router.handle_channel_update(msg)?;
87
88                 self.counter.write().unwrap().channel_updates += 1;
89                 let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
90
91                 if let Err(err) = self.sender.try_send(gossip_message) {
92                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
93                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
94                                 self.sender.send(gossip_message).await.unwrap();
95                         })});
96                 }
97
98                 Ok(output_value)
99         }
100
101         fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
102                 self.native_router.get_next_channel_announcement(starting_point)
103         }
104
105         fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
106                 self.native_router.get_next_node_announcement(starting_point)
107         }
108
109         fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) -> Result<(), ()> {
110                 self.native_router.peer_connected(their_node_id, init)
111         }
112
113         fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
114                 self.native_router.handle_reply_channel_range(their_node_id, msg)
115         }
116
117         fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
118                 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
119         }
120
121         fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
122                 self.native_router.handle_query_channel_range(their_node_id, msg)
123         }
124
125         fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
126                 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
127         }
128
129         fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
130                 self.native_router.provided_init_features(their_node_id)
131         }
132
133         fn provided_node_features(&self) -> NodeFeatures {
134                 self.native_router.provided_node_features()
135         }
136 }