Allow setting custom gossip seen timestamps.
[rapid-gossip-sync-server] / src / downloader.rs
1 use std::ops::Deref;
2 use std::sync::{Arc, RwLock};
3
4 use bitcoin::secp256k1::PublicKey;
5 use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
6 use lightning::ln::features::{InitFeatures, NodeFeatures};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
8 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
9 use lightning::util::logger::Logger;
10 use tokio::sync::mpsc;
11 use tokio::sync::mpsc::error::TrySendError;
12
13 use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
14 use crate::verifier::ChainVerifier;
15
/// Running tallies of the gossip messages seen by the downloader.
///
/// Every tally starts at zero; see [`GossipCounter::new`].
#[derive(Debug, Default)]
pub(crate) struct GossipCounter {
	/// Number of channel announcements handed to the persistence queue.
	pub(crate) channel_announcements: u64,
	/// Number of channel updates handed to the persistence queue.
	pub(crate) channel_updates: u64,
	/// Channel updates lacking an `htlc_maximum_msat` value.
	/// NOTE(review): not modified in this file — presumably maintained by another module.
	pub(crate) channel_updates_without_htlc_max_msats: u64,
	/// Channel announcements whose scripts did not match.
	/// NOTE(review): not modified in this file — presumably maintained by another module.
	pub(crate) channel_announcements_with_mismatched_scripts: u64
}

impl GossipCounter {
	/// Creates a counter with every tally set to zero.
	///
	/// Equivalent to [`GossipCounter::default`]; kept so existing callers of
	/// `new()` continue to work.
	pub(crate) fn new() -> Self {
		Self::default()
	}
}
33
34 pub(crate) struct GossipRouter<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
35         native_router: P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>,
36         pub(crate) counter: RwLock<GossipCounter>,
37         sender: mpsc::Sender<GossipMessage>,
38         verifier: Arc<ChainVerifier<L>>,
39         outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>>,
40 }
41
42 impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
43         pub(crate) fn new(network_graph: Arc<NetworkGraph<L>>, sender: mpsc::Sender<GossipMessage>, logger: L) -> Self {
44                 let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
45                 let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone()));
46                 Self {
47                         native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
48                         outbound_gossiper,
49                         counter: RwLock::new(GossipCounter::new()),
50                         sender,
51                         verifier
52                 }
53         }
54
55         pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
56                 self.verifier.set_ph(peer_handler);
57         }
58
59         fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
60                 {
61                         let mut counter = self.counter.write().unwrap();
62                         counter.channel_announcements += 1;
63                 }
64
65                 let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
66                 if let Err(err) = self.sender.try_send(gossip_message) {
67                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
68                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
69                                 self.sender.send(gossip_message).await.unwrap();
70                         })});
71                 }
72         }
73
74         fn new_channel_update(&self, msg: ChannelUpdate) {
75                 self.counter.write().unwrap().channel_updates += 1;
76                 let gossip_message = GossipMessage::ChannelUpdate(msg, None);
77
78                 if let Err(err) = self.sender.try_send(gossip_message) {
79                         let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
80                         tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
81                                 self.sender.send(gossip_message).await.unwrap();
82                         })});
83                 }
84         }
85 }
86
87 impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<L> where L::Target: Logger {
88         fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
89                 let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
90                 for ev in gossip_evs {
91                         match ev {
92                                 MessageSendEvent::BroadcastChannelAnnouncement { msg, .. } => {
93                                         self.new_channel_announcement(msg);
94                                 },
95                                 MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
96                                 MessageSendEvent::BroadcastChannelUpdate { msg } => {
97                                         self.new_channel_update(msg);
98                                 },
99                                 _ => { unreachable!() },
100                         }
101                 }
102                 self.native_router.get_and_clear_pending_msg_events()
103         }
104 }
105
106 impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
107         fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
108                 self.native_router.handle_node_announcement(msg)
109         }
110
111         fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
112                 let res = self.native_router.handle_channel_announcement(msg)?;
113                 self.new_channel_announcement(msg.clone());
114                 Ok(res)
115         }
116
117         fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
118                 let res = self.native_router.handle_channel_update(msg)?;
119                 self.new_channel_update(msg.clone());
120                 Ok(res)
121         }
122
123         fn processing_queue_high(&self) -> bool {
124                 self.native_router.processing_queue_high()
125         }
126
127         fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
128                 self.native_router.get_next_channel_announcement(starting_point)
129         }
130
131         fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
132                 self.native_router.get_next_node_announcement(starting_point)
133         }
134
135         fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
136                 self.native_router.peer_connected(their_node_id, init, inbound)
137         }
138
139         fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
140                 self.native_router.handle_reply_channel_range(their_node_id, msg)
141         }
142
143         fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
144                 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
145         }
146
147         fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
148                 self.native_router.handle_query_channel_range(their_node_id, msg)
149         }
150
151         fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
152                 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
153         }
154
155         fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures {
156                 self.native_router.provided_init_features(their_node_id)
157         }
158
159         fn provided_node_features(&self) -> NodeFeatures {
160                 self.native_router.provided_node_features()
161         }
162 }