Initiate sync only after receiving `GossipTimestampFilter`.
authorElias Rohrer <ero@tnull.de>
Tue, 3 May 2022 07:13:09 +0000 (09:13 +0200)
committerElias Rohrer <ero@tnull.de>
Tue, 3 May 2022 07:13:09 +0000 (09:13 +0200)
lightning/src/ln/peer_handler.rs
lightning/src/util/test_utils.rs

index d12f8c06eed0409630864359e54c20d3b714930d..3207f3f179be91359d24387d4ea89a53a66392bd 100644 (file)
@@ -339,6 +339,7 @@ struct Peer {
        msgs_sent_since_pong: usize,
        awaiting_pong_timer_tick_intervals: i8,
        received_message_since_timer_tick: bool,
+       sent_gossip_timestamp_filter: bool,
 }
 
 impl Peer {
@@ -348,7 +349,11 @@ impl Peer {
        /// announcements/updates for the given channel_id then we will send it when we get to that
        /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
        /// sent the old versions, we should send the update, and so return true here.
-       fn should_forward_channel_announcement(&self, channel_id: u64)->bool{
+       fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
+               if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
+                       !self.sent_gossip_timestamp_filter {
+                               return false;
+                       }
                match self.sync_status {
                        InitSyncTracker::NoSyncRequested => true,
                        InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
@@ -358,6 +363,10 @@ impl Peer {
 
        /// Similar to the above, but for node announcements indexed by node_id.
        fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool {
+               if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
+                       !self.sent_gossip_timestamp_filter {
+                               return false;
+                       }
                match self.sync_status {
                        InitSyncTracker::NoSyncRequested => true,
                        InitSyncTracker::ChannelsSyncing(_) => false,
@@ -619,6 +628,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        msgs_sent_since_pong: 0,
                        awaiting_pong_timer_tick_intervals: 0,
                        received_message_since_timer_tick: false,
+                       sent_gossip_timestamp_filter: false,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -665,6 +675,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        msgs_sent_since_pong: 0,
                        awaiting_pong_timer_tick_intervals: 0,
                        received_message_since_timer_tick: false,
+                       sent_gossip_timestamp_filter: false,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -1058,7 +1069,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                                log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features);
 
-                               if msg.features.initial_routing_sync() {
+                               // For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter.
+                               if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() {
                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
                                }
                                if !msg.features.supports_static_remote_key() {
@@ -1205,7 +1217,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
                        },
                        wire::Message::GossipTimestampFilter(_msg) => {
-                               // TODO: handle message
+                               // When supporting gossip messages, start inital gossip sync only after we receive
+                               // a GossipTimestampFilter
+                               if peer.their_features.as_ref().unwrap().supports_gossip_queries() &&
+                                       !peer.sent_gossip_timestamp_filter {
+                                       peer.sent_gossip_timestamp_filter = true;
+                                       peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
+                               }
                        },
 
                        // Unknown messages:
@@ -1799,6 +1817,8 @@ mod tests {
                assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                peer_b.process_events();
                assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               peer_a.process_events();
+               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                (fd_a.clone(), fd_b.clone())
        }
 
@@ -1862,21 +1882,21 @@ mod tests {
                let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
 
                // Make each peer to read the messages that the other peer just wrote to them. Note that
-               // due to the max-messagse-before-ping limits this may take a few iterations to complete.
+               // due to the max-message-before-ping limits this may take a few iterations to complete.
                for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
-                       peers[0].process_events();
-                       let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-                       assert!(!b_read_data.is_empty());
-
-                       peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
                        peers[1].process_events();
-
                        let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
                        assert!(!a_read_data.is_empty());
+
                        peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+                       peers[0].process_events();
 
-                       peers[1].process_events();
-                       assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
+                       let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+                       assert!(!b_read_data.is_empty());
+                       peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
+
+                       peers[0].process_events();
+                       assert_eq!(fd_a.outbound_data.lock().unwrap().len(), 0, "Until A receives data, it shouldn't send more messages");
                }
 
                // Check that each peer has received the expected number of channel updates and channel
index 3c36cdf066a5048bbb1b6fe112c2906ac824e6cc..f68724309c0ae0195da8b3e3dfb98cd8f115dc64 100644 (file)
@@ -49,6 +49,9 @@ use core::{cmp, mem};
 use bitcoin::bech32::u5;
 use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
 
+#[cfg(feature = "std")]
+use std::time::{SystemTime, UNIX_EPOCH};
+
 pub struct TestVecWriter(pub Vec<u8>);
 impl Writer for TestVecWriter {
        fn write_all(&mut self, buf: &[u8]) -> Result<(), io::Error> {
@@ -341,6 +344,7 @@ fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
 pub struct TestRoutingMessageHandler {
        pub chan_upds_recvd: AtomicUsize,
        pub chan_anns_recvd: AtomicUsize,
+       pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
        pub request_full_sync: AtomicBool,
 }
 
@@ -349,6 +353,7 @@ impl TestRoutingMessageHandler {
                TestRoutingMessageHandler {
                        chan_upds_recvd: AtomicUsize::new(0),
                        chan_anns_recvd: AtomicUsize::new(0),
+                       pending_events: Mutex::new(vec![]),
                        request_full_sync: AtomicBool::new(false),
                }
        }
@@ -384,7 +389,35 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
                Vec::new()
        }
 
-       fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {}
+       fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {
+               if !init_msg.features.supports_gossip_queries() {
+                       return ();
+               }
+
+               let should_request_full_sync = self.request_full_sync.load(Ordering::Acquire);
+
+               #[allow(unused_mut, unused_assignments)]
+               let mut gossip_start_time = 0;
+               #[cfg(feature = "std")]
+               {
+                       gossip_start_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs();
+                       if should_request_full_sync {
+                               gossip_start_time -= 60 * 60 * 24 * 7 * 2; // 2 weeks ago
+                       } else {
+                               gossip_start_time -= 60 * 60; // an hour ago
+                       }
+               }
+
+               let mut pending_events = self.pending_events.lock().unwrap();
+               pending_events.push(events::MessageSendEvent::SendGossipTimestampFilter {
+                       node_id: their_node_id.clone(),
+                       msg: msgs::GossipTimestampFilter {
+                               chain_hash: genesis_block(Network::Testnet).header.block_hash(),
+                               first_timestamp: gossip_start_time as u32,
+                               timestamp_range: u32::max_value(),
+                       },
+               });
+       }
 
        fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
                Ok(())
@@ -405,7 +438,10 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
 
 impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
        fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
-               vec![]
+               let mut ret = Vec::new();
+               let mut pending_events = self.pending_events.lock().unwrap();
+               core::mem::swap(&mut ret, &mut pending_events);
+               ret
        }
 }