Correct v2 symlink paths
[rapid-gossip-sync-server] / src / downloader.rs
1 use std::ops::Deref;
2 use std::sync::{Arc, RwLock};
3
4 use bitcoin::secp256k1::PublicKey;
5 use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
6 use lightning::ln::features::{InitFeatures, NodeFeatures};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
8 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
9 use lightning::util::logger::Logger;
10 use tokio::sync::mpsc;
11 use tokio::sync::mpsc::error::TrySendError;
12
13 use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
14 use crate::verifier::ChainVerifier;
15
/// Running tallies of the gossip traffic seen by this server.
///
/// Every tally starts at zero; [`GossipCounter::new`] and `GossipCounter::default()` are
/// interchangeable.
#[derive(Debug, Default)]
pub(crate) struct GossipCounter {
        /// Bumped by `GossipRouter` each time it queues a node announcement.
        pub(crate) node_announcements: u64,
        /// Bumped by `GossipRouter` each time it queues a channel announcement.
        pub(crate) channel_announcements: u64,
        /// Bumped by `GossipRouter` each time it queues a channel update.
        pub(crate) channel_updates: u64,
        /// Not written anywhere in this file.
        /// NOTE(review): presumably maintained by the persistence side — confirm.
        pub(crate) channel_updates_without_htlc_max_msats: u64,
        /// Not written anywhere in this file.
        /// NOTE(review): presumably maintained by the persistence side — confirm.
        pub(crate) channel_announcements_with_mismatched_scripts: u64,
}

impl GossipCounter {
        /// Returns a counter with every tally set to zero.
        pub(crate) fn new() -> Self {
                Self::default()
        }
}
35
/// Routing-message handler that wraps a `P2PGossipSync` and additionally forwards every
/// gossip message it handles successfully over an mpsc channel as a `GossipMessage`,
/// counting messages as it goes.
pub(crate) struct GossipRouter<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
        /// The gossip sync that all `RoutingMessageHandler` calls are delegated to. It is
        /// constructed with `verifier` attached.
        native_router: P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>,
        /// Tallies of queued messages; each update takes the write lock briefly.
        pub(crate) counter: RwLock<GossipCounter>,
        /// Sending half of the channel that receives every queued `GossipMessage`.
        sender: mpsc::Sender<GossipMessage>,
        /// Verifier shared with `native_router`; `set_pm` passes the peer manager on to it.
        verifier: Arc<ChainVerifier<L>>,
        /// Second gossip sync over the same network graph, constructed with no verifier and
        /// handed to `verifier` at construction. Its pending broadcast events are drained in
        /// `get_and_clear_pending_msg_events` and queued on `sender`.
        outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>>,
}
43
44 impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
45         pub(crate) fn new(network_graph: Arc<NetworkGraph<L>>, sender: mpsc::Sender<GossipMessage>, logger: L) -> Self {
46                 let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
47                 let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone()));
48                 Self {
49                         native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
50                         outbound_gossiper,
51                         counter: RwLock::new(GossipCounter::new()),
52                         sender,
53                         verifier
54                 }
55         }
56
57         pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
58                 self.verifier.set_ph(peer_handler);
59         }
60
61         fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
62                 {
63                         let mut counter = self.counter.write().unwrap();
64                         counter.channel_announcements += 1;
65                 }
66
67                 let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
68                 if let Err(err) = self.sender.try_send(gossip_message) {
69                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
70                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
71                                 self.sender.send(gossip_message).await.unwrap();
72                         })});
73                 }
74         }
75
76         fn new_node_announcement(&self, msg: NodeAnnouncement) {
77                 {
78                         let mut counter = self.counter.write().unwrap();
79                         counter.node_announcements += 1;
80                 }
81
82                 let gossip_message = GossipMessage::NodeAnnouncement(msg, None);
83                 if let Err(err) = self.sender.try_send(gossip_message) {
84                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
85                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
86                                 self.sender.send(gossip_message).await.unwrap();
87                         })});
88                 }
89         }
90
91         fn new_channel_update(&self, msg: ChannelUpdate) {
92                 self.counter.write().unwrap().channel_updates += 1;
93                 let gossip_message = GossipMessage::ChannelUpdate(msg, None);
94
95                 if let Err(err) = self.sender.try_send(gossip_message) {
96                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
97                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
98                                 self.sender.send(gossip_message).await.unwrap();
99                         })});
100                 }
101         }
102 }
103
104 impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<L> where L::Target: Logger {
105         fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
106                 let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
107                 for ev in gossip_evs {
108                         match ev {
109                                 MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
110                                         self.new_channel_announcement(msg);
111                                 },
112                                 MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
113                                         self.new_node_announcement(msg);
114                                 },
115                                 MessageSendEvent::BroadcastChannelUpdate { msg } => {
116                                         self.new_channel_update(msg);
117                                 },
118                                 _ => { unreachable!() },
119                         }
120                 }
121                 self.native_router.get_and_clear_pending_msg_events()
122         }
123 }
124
125 impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
126         fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
127                 let res = self.native_router.handle_node_announcement(msg)?;
128                 self.new_node_announcement(msg.clone());
129                 Ok(res)
130         }
131
132         fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
133                 let res = self.native_router.handle_channel_announcement(msg)?;
134                 self.new_channel_announcement(msg.clone());
135                 Ok(res)
136         }
137
138         fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
139                 let res = self.native_router.handle_channel_update(msg)?;
140                 self.new_channel_update(msg.clone());
141                 Ok(res)
142         }
143
144         fn processing_queue_high(&self) -> bool {
145                 self.native_router.processing_queue_high()
146         }
147
148         fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
149                 self.native_router.get_next_channel_announcement(starting_point)
150         }
151
152         fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
153                 self.native_router.get_next_node_announcement(starting_point)
154         }
155
156         fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
157                 self.native_router.peer_connected(their_node_id, init, inbound)
158         }
159
160         fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
161                 self.native_router.handle_reply_channel_range(their_node_id, msg)
162         }
163
164         fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
165                 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
166         }
167
168         fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
169                 self.native_router.handle_query_channel_range(their_node_id, msg)
170         }
171
172         fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
173                 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
174         }
175
176         fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
177                 self.native_router.provided_init_features(their_node_id)
178         }
179
180         fn provided_node_features(&self) -> NodeFeatures {
181                 self.native_router.provided_node_features()
182         }
183 }