From: Elias Rohrer Date: Tue, 3 May 2022 07:13:09 +0000 (+0200) Subject: Initiate sync only after receiving `GossipTimestampFilter`. X-Git-Tag: v0.0.107~45^2 X-Git-Url: http://git.bitcoin.ninja/index.cgi?p=rust-lightning;a=commitdiff_plain;h=f8a196c8e32584a25d3f898f8b9e034f126e40c3 Initiate sync only after receiving `GossipTimestampFilter`. --- diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index d12f8c06..3207f3f1 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -339,6 +339,7 @@ struct Peer { msgs_sent_since_pong: usize, awaiting_pong_timer_tick_intervals: i8, received_message_since_timer_tick: bool, + sent_gossip_timestamp_filter: bool, } impl Peer { @@ -348,7 +349,11 @@ impl Peer { /// announcements/updates for the given channel_id then we will send it when we get to that /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already /// sent the old versions, we should send the update, and so return true here. - fn should_forward_channel_announcement(&self, channel_id: u64)->bool{ + fn should_forward_channel_announcement(&self, channel_id: u64) -> bool { + if self.their_features.as_ref().unwrap().supports_gossip_queries() && + !self.sent_gossip_timestamp_filter { + return false; + } match self.sync_status { InitSyncTracker::NoSyncRequested => true, InitSyncTracker::ChannelsSyncing(i) => i < channel_id, @@ -358,6 +363,10 @@ impl Peer { /// Similar to the above, but for node announcements indexed by node_id. fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool { + if self.their_features.as_ref().unwrap().supports_gossip_queries() && + !self.sent_gossip_timestamp_filter { + return false; + } match self.sync_status { InitSyncTracker::NoSyncRequested => true, InitSyncTracker::ChannelsSyncing(_) => false, @@ -619,6 +628,7 @@ impl P msgs_sent_since_pong: 0, awaiting_pong_timer_tick_intervals: 0, received_message_since_timer_tick: false, + sent_gossip_timestamp_filter: false, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -665,6 +675,7 @@ impl P msgs_sent_since_pong: 0, awaiting_pong_timer_tick_intervals: 0, received_message_since_timer_tick: false, + sent_gossip_timestamp_filter: false, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -1058,7 +1069,8 @@ impl P log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features); - if msg.features.initial_routing_sync() { + // For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter. + if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() { peer.sync_status = InitSyncTracker::ChannelsSyncing(0); } if !msg.features.supports_static_remote_key() { @@ -1205,7 +1217,13 @@ impl P self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?; }, wire::Message::GossipTimestampFilter(_msg) => { - // TODO: handle message + // When supporting gossip messages, start inital gossip sync only after we receive + // a GossipTimestampFilter + if peer.their_features.as_ref().unwrap().supports_gossip_queries() && + !peer.sent_gossip_timestamp_filter { + peer.sent_gossip_timestamp_filter = true; + peer.sync_status = InitSyncTracker::ChannelsSyncing(0); + } }, // Unknown messages: @@ -1799,6 +1817,8 @@ mod tests { assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); peer_b.process_events(); assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); + peer_a.process_events(); + assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); (fd_a.clone(), fd_b.clone()) } @@ -1862,21 +1882,21 @@ mod tests { let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); // Make each peer to read the messages that the other peer just wrote to them. Note that - // due to the max-messagse-before-ping limits this may take a few iterations to complete. + // due to the max-message-before-ping limits this may take a few iterations to complete. for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 { - peers[0].process_events(); - let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0); - assert!(!b_read_data.is_empty()); - - peers[1].read_event(&mut fd_b, &b_read_data).unwrap(); peers[1].process_events(); - let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0); assert!(!a_read_data.is_empty()); + peers[0].read_event(&mut fd_a, &a_read_data).unwrap(); + peers[0].process_events(); - peers[1].process_events(); - assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages"); + let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert!(!b_read_data.is_empty()); + peers[1].read_event(&mut fd_b, &b_read_data).unwrap(); + + peers[0].process_events(); + assert_eq!(fd_a.outbound_data.lock().unwrap().len(), 0, "Until A receives data, it shouldn't send more messages"); } // Check that each peer has received the expected number of channel updates and channel diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 3c36cdf0..f6872430 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -49,6 +49,9 @@ use core::{cmp, mem}; use bitcoin::bech32::u5; use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial}; +#[cfg(feature = "std")] +use std::time::{SystemTime, UNIX_EPOCH}; + pub struct TestVecWriter(pub Vec); impl Writer for TestVecWriter { fn write_all(&mut self, buf: &[u8]) -> Result<(), io::Error> { @@ -341,6 +344,7 @@ fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate { pub struct TestRoutingMessageHandler { pub chan_upds_recvd: AtomicUsize, pub chan_anns_recvd: AtomicUsize, + pub pending_events: Mutex>, pub request_full_sync: AtomicBool, } @@ -349,6 +353,7 @@ impl TestRoutingMessageHandler { TestRoutingMessageHandler { chan_upds_recvd: AtomicUsize::new(0), chan_anns_recvd: AtomicUsize::new(0), + pending_events: Mutex::new(vec![]), request_full_sync: AtomicBool::new(false), } } @@ -384,7 +389,35 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { Vec::new() } - fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {} + fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) { + if !init_msg.features.supports_gossip_queries() { + return (); + } + + let should_request_full_sync = self.request_full_sync.load(Ordering::Acquire); + + #[allow(unused_mut, unused_assignments)] + let mut gossip_start_time = 0; + #[cfg(feature = "std")] + { + gossip_start_time = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs(); + if should_request_full_sync { + gossip_start_time -= 60 * 60 * 24 * 7 * 2; // 2 weeks ago + } else { + gossip_start_time -= 60 * 60; // an hour ago + } + } + + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::MessageSendEvent::SendGossipTimestampFilter { + node_id: their_node_id.clone(), + msg: msgs::GossipTimestampFilter { + chain_hash: genesis_block(Network::Testnet).header.block_hash(), + first_timestamp: gossip_start_time as u32, + timestamp_range: u32::max_value(), + }, + }); + } fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> { Ok(()) @@ -405,7 +438,10 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { impl events::MessageSendEventsProvider for TestRoutingMessageHandler { fn get_and_clear_pending_msg_events(&self) -> Vec { - vec![] + let mut ret = Vec::new(); + let mut pending_events = self.pending_events.lock().unwrap(); + core::mem::swap(&mut ret, &mut pending_events); + ret } }