Initial commit.
[rapid-gossip-sync-server] / src / downloader.rs
1 use std::sync::{Arc, RwLock};
2 use std::time::{SystemTime, UNIX_EPOCH};
3
4 use bitcoin::secp256k1::PublicKey;
5 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
6 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
7 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
8 use tokio::sync::mpsc;
9
10 use crate::{GossipChainAccess, TestLogger};
11 use crate::types::{DetectedGossipMessage, GossipMessage};
12
13 pub(crate) struct GossipCounter {
14         pub(crate) channel_announcements: u64,
15         pub(crate) channel_updates: u64,
16         pub(crate) channel_updates_without_htlc_max_msats: u64,
17         pub(crate) channel_announcements_with_mismatched_scripts: u64
18 }
19
20 impl GossipCounter {
21         pub(crate) fn new() -> Self {
22                 Self {
23                         channel_announcements: 0,
24                         channel_updates: 0,
25                         channel_updates_without_htlc_max_msats: 0,
26                         channel_announcements_with_mismatched_scripts: 0,
27                 }
28         }
29 }
30
31 pub(crate) struct GossipRouter {
32         pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
33         pub(crate) counter: RwLock<GossipCounter>,
34         pub(crate) sender: mpsc::Sender<DetectedGossipMessage>,
35 }
36
37 impl MessageSendEventsProvider for GossipRouter {
38         fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
39                 self.native_router.get_and_clear_pending_msg_events()
40         }
41 }
42
43 impl RoutingMessageHandler for GossipRouter {
44         fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
45                 self.native_router.handle_node_announcement(msg)
46         }
47
48         fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
49                 let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
50
51                 let mut counter = self.counter.write().unwrap();
52
53                 let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
54                         let error_string = format!("{:?}", error);
55                         if error_string.contains("announced on an unknown chain"){
56                                 return error;
57                         }
58                         counter.channel_announcements_with_mismatched_scripts += 1;
59                         error
60                 })?;
61
62                 counter.channel_announcements += 1;
63                 let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
64                 let detected_gossip_message = DetectedGossipMessage {
65                         message: gossip_message,
66                         timestamp_seen: timestamp_seen as u32,
67                 };
68                 let sender = self.sender.clone();
69                 tokio::spawn(async move {
70                         let _ = sender.send(detected_gossip_message).await;
71                 });
72
73                 Ok(output_value)
74         }
75
76         fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
77                 let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
78                 let output_value = self.native_router.handle_channel_update(msg)?;
79
80                 let mut counter = self.counter.write().unwrap();
81                 counter.channel_updates += 1;
82                 let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
83                 let detected_gossip_message = DetectedGossipMessage {
84                         message: gossip_message,
85                         timestamp_seen: timestamp_seen as u32,
86                 };
87                 let sender = self.sender.clone();
88                 tokio::spawn(async move {
89                         let _ = sender.send(detected_gossip_message).await;
90                 });
91
92                 Ok(output_value)
93         }
94
95         fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
96                 self.native_router.get_next_channel_announcements(starting_point, batch_amount)
97         }
98
99         fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
100                 self.native_router.get_next_node_announcements(starting_point, batch_amount)
101         }
102
103         fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) {
104                 self.native_router.peer_connected(their_node_id, init)
105         }
106
107         fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
108                 self.native_router.handle_reply_channel_range(their_node_id, msg)
109         }
110
111         fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
112                 self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
113         }
114
115         fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
116                 self.native_router.handle_query_channel_range(their_node_id, msg)
117         }
118
119         fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
120                 self.native_router.handle_query_short_channel_ids(their_node_id, msg)
121         }
122 }