Update to 0.0.115.
[rapid-gossip-sync-server] / src / downloader.rs
1 use std::sync::{Arc, RwLock};
2
3 use bitcoin::secp256k1::PublicKey;
4 use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
5 use lightning::ln::features::{InitFeatures, NodeFeatures};
6 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
7 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
8 use tokio::sync::mpsc;
9 use tokio::sync::mpsc::error::TrySendError;
10
11 use crate::TestLogger;
12 use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
13 use crate::verifier::ChainVerifier;
14
/// Running totals of the gossip traffic handled by this process.
///
/// Every field starts at zero; see [`GossipCounter::new`].
#[derive(Debug, Default)]
pub(crate) struct GossipCounter {
        /// Incremented once per message passed to `GossipRouter::new_channel_announcement`.
        pub(crate) channel_announcements: u64,
        /// Incremented once per message passed to `GossipRouter::new_channel_update`.
        pub(crate) channel_updates: u64,
        /// Not written anywhere in this file. Presumably counts channel updates that carry no
        /// `htlc_maximum_msat` -- TODO confirm against the code that increments it.
        pub(crate) channel_updates_without_htlc_max_msats: u64,
        /// Not written anywhere in this file. Presumably counts announcements whose funding
        /// script did not match on-chain data -- TODO confirm against the code that increments it.
        pub(crate) channel_announcements_with_mismatched_scripts: u64,
}

impl GossipCounter {
        /// Creates a counter with every total set to zero.
        ///
        /// Delegates to the derived `Default` so that adding a field never requires touching
        /// this constructor.
        pub(crate) fn new() -> Self {
                Self::default()
        }
}
32
33 pub(crate) struct GossipRouter {
34         native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
35         pub(crate) counter: RwLock<GossipCounter>,
36         sender: mpsc::Sender<GossipMessage>,
37         verifier: Arc<ChainVerifier>,
38         outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
39 }
40
41 impl GossipRouter {
42         pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
43                 let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
44                 let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
45                 Self {
46                         native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
47                         outbound_gossiper,
48                         counter: RwLock::new(GossipCounter::new()),
49                         sender,
50                         verifier,
51                 }
52         }
53
54         pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
55                 self.verifier.set_ph(peer_handler);
56         }
57
58         fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
59                 {
60                         let mut counter = self.counter.write().unwrap();
61                         counter.channel_announcements += 1;
62                 }
63
64                 let gossip_message = GossipMessage::ChannelAnnouncement(msg);
65                 if let Err(err) = self.sender.try_send(gossip_message) {
66                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
67                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
68                                 self.sender.send(gossip_message).await.unwrap();
69                         })});
70                 }
71         }
72
73         fn new_channel_update(&self, msg: ChannelUpdate) {
74                 self.counter.write().unwrap().channel_updates += 1;
75                 let gossip_message = GossipMessage::ChannelUpdate(msg);
76
77                 if let Err(err) = self.sender.try_send(gossip_message) {
78                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
79                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
80                                 self.sender.send(gossip_message).await.unwrap();
81                         })});
82                 }
83         }
84 }
85
86 impl MessageSendEventsProvider for GossipRouter {
87         fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
88                 let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
89                 for ev in gossip_evs {
90                         match ev {
91                                 MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
92                                         self.new_channel_announcement(msg);
93                                 },
94                                 MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
95                                 MessageSendEvent::BroadcastChannelUpdate { msg } => {
96                                         self.new_channel_update(msg);
97                                 },
98                                 _ => { unreachable!() },
99                         }
100                 }
101                 self.native_router.get_and_clear_pending_msg_events()
102         }
103 }
104
105 impl RoutingMessageHandler for GossipRouter {
106         fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
107                 self.native_router.handle_node_announcement(msg)
108         }
109
110         fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
111                 let res = self.native_router.handle_channel_announcement(msg)?;
112                 self.new_channel_announcement(msg.clone());
113                 Ok(res)
114         }
115
116         fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
117                 let res = self.native_router.handle_channel_update(msg)?;
118                 self.new_channel_update(msg.clone());
119                 Ok(res)
120         }
121
122         fn processing_queue_high(&self) -> bool {
123                 self.native_router.processing_queue_high()
124         }
125
126         fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
127                 self.native_router.get_next_channel_announcement(starting_point)
128         }
129
130         fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
131                 self.native_router.get_next_node_announcement(starting_point)
132         }
133
134         fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
135                 self.native_router.peer_connected(their_node_id, init, inbound)
136         }
137
138         fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
139                 self.native_router.handle_reply_channel_range(their_node_id, msg)
140         }
141
142         fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
143                 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
144         }
145
146         fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
147                 self.native_router.handle_query_channel_range(their_node_id, msg)
148         }
149
150         fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
151                 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
152         }
153
154         fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
155                 self.native_router.provided_init_features(their_node_id)
156         }
157
158         fn provided_node_features(&self) -> NodeFeatures {
159                 self.native_router.provided_node_features()
160         }
161 }