Merge pull request #736 from bmancini55/gossip_queries
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Tue, 15 Dec 2020 19:17:11 +0000 (11:17 -0800)
committerGitHub <noreply@github.com>
Tue, 15 Dec 2020 19:17:11 +0000 (11:17 -0800)
Initiate gossip_queries

13 files changed:
fuzz/src/full_stack.rs
fuzz/src/router.rs
lightning-net-tokio/src/lib.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/features.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/wire.rs
lightning/src/routing/network_graph.rs
lightning/src/routing/router.rs
lightning/src/util/events.rs
lightning/src/util/test_utils.rs

index 3aeb3d233b0b19d62196466c3de3f2a23dfe98dd..6375d0765d76a3d4a7940d4781ef2b574b90b564 100644 (file)
@@ -19,6 +19,7 @@ use bitcoin::blockdata::script::{Builder, Script};
 use bitcoin::blockdata::opcodes;
 use bitcoin::consensus::encode::deserialize;
 use bitcoin::network::constants::Network;
+use bitcoin::blockdata::constants::genesis_block;
 
 use bitcoin::hashes::Hash as TraitImport;
 use bitcoin::hashes::HashEngine as TraitImportEngine;
@@ -343,7 +344,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
        config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
        let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0));
        let our_id = PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret());
-       let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(None, Arc::clone(&logger)));
+       let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_block(Network::Bitcoin).header.block_hash(), None, Arc::clone(&logger)));
 
        let peers = RefCell::new([false; 256]);
        let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
@@ -609,7 +610,7 @@ mod tests {
                // What each byte represents is broken down below, and then everything is concatenated into
                // one large test at the end (you want %s/ -.*//g %s/\n\| \|\t\|\///g).
 
-               // Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length + 
+               // Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length +
                // 16-byte MAC of the encrypted message length + encrypted Lightning message + 16-byte MAC
                // of the Lightning message
                // I.e 2nd inbound read, len 18 : 0006 (encrypted message length) + 03000000000000000000000000000000 (MAC of the encrypted message length)
index 671adcfda51ff48d3d1f6f18db8f5cda01426a14..4eb85714f3e22a813393a6c47a1e50bab7caaaf9 100644 (file)
@@ -21,6 +21,8 @@ use lightning::util::ser::Readable;
 use lightning::routing::network_graph::{NetworkGraph, RoutingFees};
 
 use bitcoin::secp256k1::key::PublicKey;
+use bitcoin::network::constants::Network;
+use bitcoin::blockdata::constants::genesis_block;
 
 use utils::test_logger;
 
@@ -155,7 +157,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
        let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new("".to_owned(), out));
 
        let our_pubkey = get_pubkey!();
-       let mut net_graph = NetworkGraph::new();
+       let mut net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash());
 
        let mut node_pks = HashSet::new();
        let mut scid = 42;
index 36384380fb14a458b31e132446ae374ffaf0b51d..8e5885ca9bf2d7b67b017d8575a4059e9ecc5ce2 100644 (file)
@@ -535,7 +535,11 @@ mod tests {
                fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
                fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
                fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
-               fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
+               fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
+               fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
+               fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
+               fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
+               fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
        }
        impl ChannelMessageHandler for MsgHandler {
                fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
index 0f5e7f8ad63736dcf5f567f3cd5a9af920a57735..c39eef3ea477a9d7558b69629601c4be14d6dbfc 100644 (file)
@@ -3404,6 +3404,8 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                        &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
                                        &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id,
                                        &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
+                                       &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
+                                       &events::MessageSendEvent::SendShortIdsQuery { .. } => false,
                                }
                        });
                }
index f573ac4c43abd910c96764e123913901a3af1f2e..3ce3d4fa7e604b9e0fb979dd8f4da0911adbdda2 100644 (file)
@@ -104,7 +104,7 @@ mod sealed {
                ],
                optional_features: [
                        // Byte 0
-                       DataLossProtect | InitialRoutingSync | UpfrontShutdownScript,
+                       DataLossProtect | InitialRoutingSync | UpfrontShutdownScript | GossipQueries,
                        // Byte 1
                        VariableLengthOnion | PaymentSecret,
                        // Byte 2
@@ -122,7 +122,7 @@ mod sealed {
                ],
                optional_features: [
                        // Byte 0
-                       DataLossProtect | UpfrontShutdownScript,
+                       DataLossProtect | UpfrontShutdownScript | GossipQueries,
                        // Byte 1
                        VariableLengthOnion | PaymentSecret,
                        // Byte 2
@@ -243,6 +243,8 @@ mod sealed {
                "Feature flags for `initial_routing_sync`.");
        define_feature!(5, UpfrontShutdownScript, [InitContext, NodeContext],
                "Feature flags for `option_upfront_shutdown_script`.");
+       define_feature!(7, GossipQueries, [InitContext, NodeContext],
+               "Feature flags for `gossip_queries`.");
        define_feature!(9, VariableLengthOnion, [InitContext, NodeContext],
                "Feature flags for `var_onion_optin`.");
        define_feature!(13, StaticRemoteKey, [InitContext, NodeContext],
@@ -473,6 +475,22 @@ impl<T: sealed::UpfrontShutdownScript> Features<T> {
        }
 }
 
+
+impl<T: sealed::GossipQueries> Features<T> {
+       #[cfg(test)]
+       pub(crate) fn requires_gossip_queries(&self) -> bool {
+               <T as sealed::GossipQueries>::requires_feature(&self.flags)
+       }
+       pub(crate) fn supports_gossip_queries(&self) -> bool {
+               <T as sealed::GossipQueries>::supports_feature(&self.flags)
+       }
+       #[cfg(test)]
+       pub(crate) fn clear_gossip_queries(mut self) -> Self {
+               <T as sealed::GossipQueries>::clear_bits(&mut self.flags);
+               self
+       }
+}
+
 impl<T: sealed::VariableLengthOnion> Features<T> {
        #[cfg(test)]
        pub(crate) fn requires_variable_length_onion(&self) -> bool {
@@ -497,6 +515,10 @@ impl<T: sealed::InitialRoutingSync> Features<T> {
        pub(crate) fn initial_routing_sync(&self) -> bool {
                <T as sealed::InitialRoutingSync>::supports_feature(&self.flags)
        }
+       // We are no longer setting initial_routing_sync now that gossip_queries
+       // is enabled. This feature is ignored by a peer when gossip_queries has 
+       // been negotiated.
+       #[cfg(test)]
        pub(crate) fn clear_initial_routing_sync(&mut self) {
                <T as sealed::InitialRoutingSync>::clear_bits(&mut self.flags)
        }
@@ -568,6 +590,11 @@ mod tests {
                assert!(!InitFeatures::known().requires_upfront_shutdown_script());
                assert!(!NodeFeatures::known().requires_upfront_shutdown_script());
 
+               assert!(InitFeatures::known().supports_gossip_queries());
+               assert!(NodeFeatures::known().supports_gossip_queries());
+               assert!(!InitFeatures::known().requires_gossip_queries());
+               assert!(!NodeFeatures::known().requires_gossip_queries());
+
                assert!(InitFeatures::known().supports_data_loss_protect());
                assert!(NodeFeatures::known().supports_data_loss_protect());
                assert!(!InitFeatures::known().requires_data_loss_protect());
@@ -620,9 +647,10 @@ mod tests {
 
        #[test]
        fn convert_to_context_with_relevant_flags() {
-               let init_features = InitFeatures::known().clear_upfront_shutdown_script();
+               let init_features = InitFeatures::known().clear_upfront_shutdown_script().clear_gossip_queries();
                assert!(init_features.initial_routing_sync());
                assert!(!init_features.supports_upfront_shutdown_script());
+               assert!(!init_features.supports_gossip_queries());
 
                let node_features: NodeFeatures = init_features.to_context();
                {
@@ -639,8 +667,10 @@ mod tests {
                // Check that cleared flags are kept blank when converting back:
                // - initial_routing_sync was not applicable to NodeContext
                // - upfront_shutdown_script was cleared before converting
+               // - gossip_queries was cleared before converting
                let features: InitFeatures = node_features.to_context_internal();
                assert!(!features.initial_routing_sync());
                assert!(!features.supports_upfront_shutdown_script());
+               assert!(!init_features.supports_gossip_queries());
        }
 }
index 8ff3d4eacf85c3972d7dd322747570b9c66779fc..d54045f8e619d7d3075482d7e893279ca04f7686 100644 (file)
@@ -1171,7 +1171,7 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
        let payment_count = Rc::new(RefCell::new(0));
 
        for i in 0..node_count {
-               let net_graph_msg_handler = NetGraphMsgHandler::new(None, cfgs[i].logger);
+               let net_graph_msg_handler = NetGraphMsgHandler::new(cfgs[i].chain_source.genesis_hash, None, cfgs[i].logger);
                nodes.push(Node{ chain_source: cfgs[i].chain_source,
                                 tx_broadcaster: cfgs[i].tx_broadcaster, chain_monitor: &cfgs[i].chain_monitor,
                                 keys_manager: &cfgs[i].keys_manager, node: &chan_mgrs[i], net_graph_msg_handler,
index f57cf0fd1fb9304fac06f34fad5164e6e8a1dfe4..25553c7080fdaa46e9268978d8348e5be00140cb 100644 (file)
@@ -804,7 +804,13 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
 }
 
 /// A trait to describe an object which can receive routing messages.
-pub trait RoutingMessageHandler : Send + Sync {
+///
+/// # Implementor DoS Warnings
+///
+/// For `gossip_queries` messages there are potential DoS vectors when handling
+/// inbound queries. Implementors using an on-disk network graph should be aware of
+/// repeated disk I/O for queries accessing different parts of the network graph.
+pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
        /// Handle an incoming node_announcement message, returning true if it should be forwarded on,
        /// false or returning an Err otherwise.
        fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError>;
@@ -825,8 +831,25 @@ pub trait RoutingMessageHandler : Send + Sync {
        /// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
        /// If None is provided for starting_point, we start at the first node.
        fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
-       /// Returns whether a full sync should be requested from a peer.
-       fn should_request_full_sync(&self, node_id: &PublicKey) -> bool;
+       /// Called when a connection is established with a peer. This can be used to
+       /// perform routing table synchronization using a strategy defined by the
+       /// implementor.
+       fn sync_routing_table(&self, their_node_id: &PublicKey, init: &Init);
+       /// Handles the reply of a query we initiated to learn about channels
+       /// for a given range of blocks. We can expect to receive one or more
+       /// replies to a single query.
+       fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError>;
+       /// Handles the reply of a query we initiated asking for routing gossip
+       /// messages for a list of channels. We should receive this message when
+       /// a node has completed its best effort to send us the pertaining routing
+       /// gossip messages.
+       fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
+       /// Handles when a peer asks us to send a list of short_channel_ids
+       /// for the requested range of blocks.
+       fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError>;
+       /// Handles when a peer asks us to send routing gossip messages for a
+       /// list of short_channel_ids.
+       fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
 }
 
 mod fuzzy_internal_msgs {
index 6e997e53b156770d237bfbe615eff0096b77c17a..890df356cc0e5eb1314ff2a17358364cabf9a265 100644 (file)
@@ -584,11 +584,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
 
                                                                        peer.their_node_id = Some(their_node_id);
                                                                        insert_node_id!();
-                                                                       let mut features = InitFeatures::known();
-                                                                       if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
-                                                                               features.clear_initial_routing_sync();
-                                                                       }
-
+                                                                       let features = InitFeatures::known();
                                                                        let resp = msgs::Init { features };
                                                                        self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
                                                                },
@@ -694,10 +690,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                }
 
                                log_info!(
-                                       self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
+                                       self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
                                        if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
                                        if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
                                        if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+                                       if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },
                                        if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
                                        if msg.features.supports_unknown_bits() { "present" } else { "none" }
                                );
@@ -712,15 +709,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                }
 
                                if !peer.outbound {
-                                       let mut features = InitFeatures::known();
-                                       if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
-                                               features.clear_initial_routing_sync();
-                                       }
-
+                                       let features = InitFeatures::known();
                                        let resp = msgs::Init { features };
                                        self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
                                }
 
+                               self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg);
+
                                self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
                                peer.their_features = Some(msg.features);
                        },
@@ -840,6 +835,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                        // TODO: forward msg along to all our other peers!
                                }
                        },
+                       wire::Message::QueryShortChannelIds(msg) => {
+                               self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::ReplyShortChannelIdsEnd(msg) => {
+                               self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::QueryChannelRange(msg) => {
+                               self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::ReplyChannelRange(msg) => {
+                               self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::GossipTimestampFilter(_msg) => {
+                               // TODO: handle message
+                       },
 
                        // Unknown messages:
                        wire::Message::Unknown(msg_type) if msg_type.is_even() => {
@@ -864,6 +874,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        // drop optional-ish messages when send buffers get full!
 
                        let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+                       events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
                        let mut peers_lock = self.peers.lock().unwrap();
                        let peers = &mut *peers_lock;
                        for event in events_generated.drain(..) {
@@ -1115,6 +1126,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                self.do_attempt_write_data(&mut descriptor, peer);
                                                        },
                                                }
+                                       },
+                                       MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+                                               self.do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        }
                                }
                        }
@@ -1302,13 +1323,6 @@ mod tests {
                (fd_a.clone(), fd_b.clone())
        }
 
-       fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
-               let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
-               assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
-               (fd_a.clone(), fd_b.clone())
-       }
-
        #[test]
        fn test_disconnect_peer() {
                // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -1377,41 +1391,4 @@ mod tests {
                assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
                assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
        }
-
-       #[test]
-       fn limit_initial_routing_sync_requests() {
-               // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
-               {
-                       let cfgs = create_peermgr_cfgs(2);
-                       cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
-                       let peers = create_network(2, &cfgs);
-                       let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
-                       let peer_0 = peers[0].peers.lock().unwrap();
-                       let peer_1 = peers[1].peers.lock().unwrap();
-
-                       let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
-                       let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
-                       assert!(peer_0_features.unwrap().initial_routing_sync());
-                       assert!(!peer_1_features.unwrap().initial_routing_sync());
-               }
-
-               // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
-               {
-                       let cfgs = create_peermgr_cfgs(2);
-                       cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
-                       let peers = create_network(2, &cfgs);
-                       let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
-                       let peer_0 = peers[0].peers.lock().unwrap();
-                       let peer_1 = peers[1].peers.lock().unwrap();
-
-                       let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
-                       let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
-                       assert!(!peer_0_features.unwrap().initial_routing_sync());
-                       assert!(peer_1_features.unwrap().initial_routing_sync());
-               }
-       }
 }
index 86d8bfdd494e5ffad0bd7a67ff361009f4aa5bb5..8197ce151ef9deceb8fa17591c04d642e29385d2 100644 (file)
@@ -55,6 +55,11 @@ pub enum Message {
        ChannelAnnouncement(msgs::ChannelAnnouncement),
        NodeAnnouncement(msgs::NodeAnnouncement),
        ChannelUpdate(msgs::ChannelUpdate),
+       QueryShortChannelIds(msgs::QueryShortChannelIds),
+       ReplyShortChannelIdsEnd(msgs::ReplyShortChannelIdsEnd),
+       QueryChannelRange(msgs::QueryChannelRange),
+       ReplyChannelRange(msgs::ReplyChannelRange),
+       GossipTimestampFilter(msgs::GossipTimestampFilter),
        /// A message that could not be decoded because its type is unknown.
        Unknown(MessageType),
 }
@@ -90,6 +95,11 @@ impl Message {
                        &Message::ChannelAnnouncement(ref msg) => msg.type_id(),
                        &Message::NodeAnnouncement(ref msg) => msg.type_id(),
                        &Message::ChannelUpdate(ref msg) => msg.type_id(),
+                       &Message::QueryShortChannelIds(ref msg) => msg.type_id(),
+                       &Message::ReplyShortChannelIdsEnd(ref msg) => msg.type_id(),
+                       &Message::QueryChannelRange(ref msg) => msg.type_id(),
+                       &Message::ReplyChannelRange(ref msg) => msg.type_id(),
+                       &Message::GossipTimestampFilter(ref msg) => msg.type_id(),
                        &Message::Unknown(type_id) => type_id,
                }
        }
@@ -186,6 +196,21 @@ pub fn read<R: ::std::io::Read>(buffer: &mut R) -> Result<Message, msgs::DecodeE
                msgs::ChannelUpdate::TYPE => {
                        Ok(Message::ChannelUpdate(Readable::read(buffer)?))
                },
+               msgs::QueryShortChannelIds::TYPE => {
+                       Ok(Message::QueryShortChannelIds(Readable::read(buffer)?))
+               },
+               msgs::ReplyShortChannelIdsEnd::TYPE => {
+                       Ok(Message::ReplyShortChannelIdsEnd(Readable::read(buffer)?))
+               },
+               msgs::QueryChannelRange::TYPE => {
+                       Ok(Message::QueryChannelRange(Readable::read(buffer)?))
+               },
+               msgs::ReplyChannelRange::TYPE => {
+                       Ok(Message::ReplyChannelRange(Readable::read(buffer)?))
+               }
+               msgs::GossipTimestampFilter::TYPE => {
+                       Ok(Message::GossipTimestampFilter(Readable::read(buffer)?))
+               },
                _ => {
                        Ok(Message::Unknown(MessageType(message_type)))
                },
@@ -312,6 +337,26 @@ impl Encode for msgs::ChannelUpdate {
        const TYPE: u16 = 258;
 }
 
+impl Encode for msgs::QueryShortChannelIds {
+       const TYPE: u16 = 261;
+}
+
+impl Encode for msgs::ReplyShortChannelIdsEnd {
+       const TYPE: u16 = 262;
+}
+
+impl Encode for msgs::QueryChannelRange {
+       const TYPE: u16 = 263;
+}
+
+impl Encode for msgs::ReplyChannelRange {
+       const TYPE: u16 = 264;
+}
+
+impl Encode for msgs::GossipTimestampFilter {
+       const TYPE: u16 = 265;
+}
+
 #[cfg(test)]
 mod tests {
        use super::*;
@@ -415,24 +460,25 @@ mod tests {
        fn read_lnd_init_msg() {
                // Taken from lnd v0.9.0-beta.
                let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 162, 161];
-               check_init_msg(buffer);
+               check_init_msg(buffer, false);
        }
 
        #[test]
        fn read_clightning_init_msg() {
                // Taken from c-lightning v0.8.0.
                let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 170, 162, 1, 32, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15];
-               check_init_msg(buffer);
+               check_init_msg(buffer, true);
        }
 
-       fn check_init_msg(buffer: Vec<u8>) {
+       fn check_init_msg(buffer: Vec<u8>, expect_unknown: bool) {
                let mut reader = ::std::io::Cursor::new(buffer);
                let decoded_msg = read(&mut reader).unwrap();
                match decoded_msg {
                        Message::Init(msgs::Init { features }) => {
                                assert!(features.supports_variable_length_onion());
                                assert!(features.supports_upfront_shutdown_script());
-                               assert!(features.supports_unknown_bits());
+                               assert!(features.supports_gossip_queries());
+                               assert_eq!(expect_unknown, features.supports_unknown_bits());
                                assert!(!features.requires_unknown_bits());
                                assert!(!features.initial_routing_sync());
                        },
@@ -450,7 +496,7 @@ mod tests {
                        Message::NodeAnnouncement(msgs::NodeAnnouncement { contents: msgs::UnsignedNodeAnnouncement { features, ..}, ..}) => {
                                assert!(features.supports_variable_length_onion());
                                assert!(features.supports_upfront_shutdown_script());
-                               assert!(features.supports_unknown_bits());
+                               assert!(features.supports_gossip_queries());
                                assert!(!features.requires_unknown_bits());
                        },
                        _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id())
index 54783dd1dc3cdf633efcf19d49092c03ec0a7f78..8075462c938b797d0f8d923d786be9239533317c 100644 (file)
@@ -18,19 +18,23 @@ use bitcoin::hashes::Hash;
 use bitcoin::blockdata::script::Builder;
 use bitcoin::blockdata::transaction::TxOut;
 use bitcoin::blockdata::opcodes;
+use bitcoin::hash_types::BlockHash;
 
 use chain;
 use chain::Access;
 use ln::features::{ChannelFeatures, NodeFeatures};
-use ln::msgs::{DecodeError, ErrorAction, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
+use ln::msgs::{DecodeError, ErrorAction, Init, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
 use ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, OptionalField};
+use ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd};
 use ln::msgs;
 use util::ser::{Writeable, Readable, Writer};
 use util::logger::Logger;
+use util::events;
 
 use std::{cmp, fmt};
 use std::sync::{RwLock, RwLockReadGuard};
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Mutex;
 use std::collections::BTreeMap;
 use std::collections::btree_map::Entry as BtreeEntry;
 use std::ops::Deref;
@@ -39,6 +43,7 @@ use bitcoin::hashes::hex::ToHex;
 /// Represents the network as nodes and channels between them
 #[derive(PartialEq)]
 pub struct NetworkGraph {
+       genesis_hash: BlockHash,
        channels: BTreeMap<u64, ChannelInfo>,
        nodes: BTreeMap<PublicKey, NodeInfo>,
 }
@@ -59,6 +64,7 @@ pub struct NetGraphMsgHandler<C: Deref, L: Deref> where C::Target: chain::Access
        pub network_graph: RwLock<NetworkGraph>,
        chain_access: Option<C>,
        full_syncs_requested: AtomicUsize,
+       pending_events: Mutex<Vec<events::MessageSendEvent>>,
        logger: L,
 }
 
@@ -68,15 +74,13 @@ impl<C: Deref, L: Deref> NetGraphMsgHandler<C, L> where C::Target: chain::Access
        /// Chain monitor is used to make sure announced channels exist on-chain,
        /// channel data is correct, and that the announcement is signed with
        /// channel owners' keys.
-       pub fn new(chain_access: Option<C>, logger: L) -> Self {
+       pub fn new(genesis_hash: BlockHash, chain_access: Option<C>, logger: L) -> Self {
                NetGraphMsgHandler {
                        secp_ctx: Secp256k1::verification_only(),
-                       network_graph: RwLock::new(NetworkGraph {
-                               channels: BTreeMap::new(),
-                               nodes: BTreeMap::new(),
-                       }),
+                       network_graph: RwLock::new(NetworkGraph::new(genesis_hash)),
                        full_syncs_requested: AtomicUsize::new(0),
                        chain_access,
+                       pending_events: Mutex::new(vec![]),
                        logger,
                }
        }
@@ -89,6 +93,7 @@ impl<C: Deref, L: Deref> NetGraphMsgHandler<C, L> where C::Target: chain::Access
                        network_graph: RwLock::new(network_graph),
                        full_syncs_requested: AtomicUsize::new(0),
                        chain_access,
+                       pending_events: Mutex::new(vec![]),
                        logger,
                }
        }
@@ -100,6 +105,18 @@ impl<C: Deref, L: Deref> NetGraphMsgHandler<C, L> where C::Target: chain::Access
        pub fn read_locked_graph<'a>(&'a self) -> LockedNetworkGraph<'a> {
                LockedNetworkGraph(self.network_graph.read().unwrap())
        }
+
+       /// Returns true when a full routing table sync should be performed with a peer.
+       fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
+               //TODO: Determine whether to request a full sync based on the network map.
+               const FULL_SYNCS_TO_REQUEST: usize = 5;
+               if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST {
+                       self.full_syncs_requested.fetch_add(1, Ordering::AcqRel);
+                       true
+               } else {
+                       false
+               }
+       }
 }
 
 impl<'a> LockedNetworkGraph<'a> {
@@ -202,15 +219,124 @@ impl<C: Deref + Sync + Send, L: Deref + Sync + Send> RoutingMessageHandler for N
                result
        }
 
-       fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
-               //TODO: Determine whether to request a full sync based on the network map.
-               const FULL_SYNCS_TO_REQUEST: usize = 5;
-               if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST {
-                       self.full_syncs_requested.fetch_add(1, Ordering::AcqRel);
-                       true
-               } else {
-                       false
+       /// Initiates a stateless sync of routing gossip information with a peer
+       /// using gossip_queries. The default strategy used by this implementation
+       /// is to sync the full block range with several peers.
+       ///
+       /// We should expect one or more reply_channel_range messages in response
+       /// to our query_channel_range. Each reply will enqueue a query_scid message
+       /// to request gossip messages for each channel. The sync is considered complete
+       /// when the final reply_scids_end message is received, though we are not
+       /// tracking this directly.
+       fn sync_routing_table(&self, their_node_id: &PublicKey, init_msg: &Init) {
+
+               // We will only perform a sync with peers that support gossip_queries.
+               if !init_msg.features.supports_gossip_queries() {
+                       return ();
+               }
+
+               // Check if we need to perform a full synchronization with this peer
+               if !self.should_request_full_sync(their_node_id) {
+                       return ();
                }
+
+               let first_blocknum = 0;
+               let number_of_blocks = 0xffffffff;
+               log_debug!(self.logger, "Sending query_channel_range peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), first_blocknum, number_of_blocks);
+               let mut pending_events = self.pending_events.lock().unwrap();
+               pending_events.push(events::MessageSendEvent::SendChannelRangeQuery {
+                       node_id: their_node_id.clone(),
+                       msg: QueryChannelRange {
+                               chain_hash: self.network_graph.read().unwrap().genesis_hash,
+                               first_blocknum,
+                               number_of_blocks,
+                       },
+               });
+       }
+
+       /// Statelessly processes a reply to a channel range query by immediately
+       /// sending an SCID query with SCIDs in the reply. To keep this handler
+       /// stateless, it does not validate the sequencing of replies for multi-
+       /// reply ranges. It does not validate whether the reply(ies) cover the
+       /// queried range. It also does not filter SCIDs to only those in the
+       /// original query range. We also do not validate that the chain_hash
+       /// matches the chain_hash of the NetworkGraph. Any chan_ann message that
+       /// does not match our chain_hash will be rejected when the announcement is
+       /// processed.
+       fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
+               log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, full_information={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.full_information, msg.short_channel_ids.len(),);
+
+               // Validate that the remote node maintains up-to-date channel
+               // information for chain_hash. Some nodes use the full_information
+               // flag to indicate multi-part messages so we must check whether
+               // we received SCIDs as well.
+               if !msg.full_information && msg.short_channel_ids.len() == 0 {
+                       return Err(LightningError {
+                               err: String::from("Received reply_channel_range with no information available"),
+                               action: ErrorAction::IgnoreError,
+                       });
+               }
+
+               log_debug!(self.logger, "Sending query_short_channel_ids peer={}, batch_size={}", log_pubkey!(their_node_id), msg.short_channel_ids.len());
+               let mut pending_events = self.pending_events.lock().unwrap();
+               pending_events.push(events::MessageSendEvent::SendShortIdsQuery {
+                       node_id: their_node_id.clone(),
+                       msg: QueryShortChannelIds {
+                               chain_hash: msg.chain_hash,
+                               short_channel_ids: msg.short_channel_ids,
+                       }
+               });
+
+               Ok(())
+       }
+
+       /// When an SCID query is initiated the remote peer will begin streaming
+       /// gossip messages. In the event of a failure, we may have received
+       /// some channel information. Before trying with another peer, the
+       /// caller should update its set of SCIDs that need to be queried.
+       fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
+               log_debug!(self.logger, "Handling reply_short_channel_ids_end peer={}, full_information={}", log_pubkey!(their_node_id), msg.full_information);
+
+               // If the remote node does not have up-to-date information for the
+               // chain_hash they will set full_information=false. We can fail
+               // the result and try again with a different peer.
+               if !msg.full_information {
+                       return Err(LightningError {
+                               err: String::from("Received reply_short_channel_ids_end with no information"),
+                               action: ErrorAction::IgnoreError
+                       });
+               }
+
+               Ok(())
+       }
+
+       fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+
+       fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+}
+
+impl<C: Deref, L: Deref> events::MessageSendEventsProvider for NetGraphMsgHandler<C, L>
+where
+       C::Target: chain::Access,
+       L::Target: Logger,
+{
+       fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+               let mut ret = Vec::new();
+               let mut pending_events = self.pending_events.lock().unwrap();
+               std::mem::swap(&mut ret, &mut pending_events);
+               ret
        }
 }
 
@@ -448,6 +574,7 @@ impl Readable for NodeInfo {
 
 impl Writeable for NetworkGraph {
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+               self.genesis_hash.write(writer)?;
                (self.channels.len() as u64).write(writer)?;
                for (ref chan_id, ref chan_info) in self.channels.iter() {
                        (*chan_id).write(writer)?;
@@ -464,6 +591,7 @@ impl Writeable for NetworkGraph {
 
 impl Readable for NetworkGraph {
        fn read<R: ::std::io::Read>(reader: &mut R) -> Result<NetworkGraph, DecodeError> {
+               let genesis_hash: BlockHash = Readable::read(reader)?;
                let channels_count: u64 = Readable::read(reader)?;
                let mut channels = BTreeMap::new();
                for _ in 0..channels_count {
@@ -479,6 +607,7 @@ impl Readable for NetworkGraph {
                        nodes.insert(node_id, node_info);
                }
                Ok(NetworkGraph {
+                       genesis_hash,
                        channels,
                        nodes,
                })
@@ -524,8 +653,9 @@ impl NetworkGraph {
        }
 
        /// Creates a new, empty, network graph.
-       pub fn new() -> NetworkGraph {
+       pub fn new(genesis_hash: BlockHash) -> NetworkGraph {
                Self {
+                       genesis_hash,
                        channels: BTreeMap::new(),
                        nodes: BTreeMap::new(),
                }
@@ -882,14 +1012,15 @@ impl NetworkGraph {
 #[cfg(test)]
 mod tests {
        use chain;
-       use ln::features::{ChannelFeatures, NodeFeatures};
+       use ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
        use routing::network_graph::{NetGraphMsgHandler, NetworkGraph};
-       use ln::msgs::{OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement,
+       use ln::msgs::{Init, OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement,
                UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, HTLCFailChannelUpdate,
-               MAX_VALUE_MSAT};
+               ReplyChannelRange, ReplyShortChannelIdsEnd, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT};
        use util::test_utils;
        use util::logger::Logger;
        use util::ser::{Readable, Writeable};
+       use util::events::{MessageSendEvent, MessageSendEventsProvider};
 
        use bitcoin::hashes::sha256d::Hash as Sha256dHash;
        use bitcoin::hashes::Hash;
@@ -909,7 +1040,8 @@ mod tests {
        fn create_net_graph_msg_handler() -> (Secp256k1<All>, NetGraphMsgHandler<Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>) {
                let secp_ctx = Secp256k1::new();
                let logger = Arc::new(test_utils::TestLogger::new());
-               let net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger));
+               let genesis_hash = genesis_block(Network::Testnet).header.block_hash();
+               let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_hash, None, Arc::clone(&logger));
                (secp_ctx, net_graph_msg_handler)
        }
 
@@ -1070,7 +1202,7 @@ mod tests {
                };
 
                // Test if the UTXO lookups were not supported
-               let mut net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger));
+               let mut net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), None, Arc::clone(&logger));
                match net_graph_msg_handler.handle_channel_announcement(&valid_announcement) {
                        Ok(res) => assert!(res),
                        _ => panic!()
@@ -1094,7 +1226,7 @@ mod tests {
                // Test if an associated transaction were not on-chain (or not confirmed).
                let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
                *chain_source.utxo_ret.lock().unwrap() = Err(chain::AccessError::UnknownTx);
-               net_graph_msg_handler = NetGraphMsgHandler::new(Some(chain_source.clone()), Arc::clone(&logger));
+               net_graph_msg_handler = NetGraphMsgHandler::new(chain_source.clone().genesis_hash, Some(chain_source.clone()), Arc::clone(&logger));
                unsigned_announcement.short_channel_id += 1;
 
                msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
@@ -1218,7 +1350,7 @@ mod tests {
                let secp_ctx = Secp256k1::new();
                let logger: Arc<Logger> = Arc::new(test_utils::TestLogger::new());
                let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
-               let net_graph_msg_handler = NetGraphMsgHandler::new(Some(chain_source.clone()), Arc::clone(&logger));
+               let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), Some(chain_source.clone()), Arc::clone(&logger));
 
                let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
                let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
@@ -1813,4 +1945,184 @@ mod tests {
                network.write(&mut w).unwrap();
                assert!(<NetworkGraph>::read(&mut ::std::io::Cursor::new(&w.0)).unwrap() == *network);
        }
+
+       #[test]
+       fn calling_sync_routing_table() {
+               let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+               let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap();
+               let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1);
+
+               let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+               let first_blocknum = 0;
+               let number_of_blocks = 0xffff_ffff;
+
+               // It should ignore if gossip_queries feature is not enabled
+               {
+                       let init_msg = Init { features: InitFeatures::known().clear_gossip_queries() };
+                       net_graph_msg_handler.sync_routing_table(&node_id_1, &init_msg);
+                       let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+                       assert_eq!(events.len(), 0);
+               }
+
+               // It should send a query_channel_message with the correct information
+               {
+                       let init_msg = Init { features: InitFeatures::known() };
+                       net_graph_msg_handler.sync_routing_table(&node_id_1, &init_msg);
+                       let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+                       assert_eq!(events.len(), 1);
+                       match &events[0] {
+                               MessageSendEvent::SendChannelRangeQuery{ node_id, msg } => {
+                                       assert_eq!(node_id, &node_id_1);
+                                       assert_eq!(msg.chain_hash, chain_hash);
+                                       assert_eq!(msg.first_blocknum, first_blocknum);
+                                       assert_eq!(msg.number_of_blocks, number_of_blocks);
+                               },
+                               _ => panic!("Expected MessageSendEvent::SendChannelRangeQuery")
+                       };
+               }
+
+               // It should not enqueue a query when should_request_full_sync return false.
+               // The initial implementation allows syncing with the first 5 peers after
+               // which should_request_full_sync will return false
+               {
+                       let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+                       let init_msg = Init { features: InitFeatures::known() };
+                       for n in 1..7 {
+                               let node_privkey = &SecretKey::from_slice(&[n; 32]).unwrap();
+                               let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+                               net_graph_msg_handler.sync_routing_table(&node_id, &init_msg);
+                               let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+                               if n <= 5 {
+                                       assert_eq!(events.len(), 1);
+                               } else {
+                                       assert_eq!(events.len(), 0);
+                               }
+
+                       }
+               }
+       }
+
+       #[test]
+       fn handling_reply_channel_range() {
+               let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+               let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap();
+               let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1);
+
+               let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+               // Test receipt of a single reply that should enqueue an SCID query
+               // matching the SCIDs in the reply
+               {
+                       let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
+                               chain_hash,
+                               full_information: true,
+                               first_blocknum: 0,
+                               number_of_blocks: 2000,
+                               short_channel_ids: vec![
+                                       0x0003e0_000000_0000, // 992x0x0
+                                       0x0003e8_000000_0000, // 1000x0x0
+                                       0x0003e9_000000_0000, // 1001x0x0
+                                       0x0003f0_000000_0000, // 1008x0x0
+                                       0x00044c_000000_0000, // 1100x0x0
+                                       0x0006e0_000000_0000, // 1760x0x0
+                               ],
+                       });
+                       assert!(result.is_ok());
+
+                       // We expect to emit a query_short_channel_ids message with the received scids
+                       let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+                       assert_eq!(events.len(), 1);
+                       match &events[0] {
+                               MessageSendEvent::SendShortIdsQuery { node_id, msg } => {
+                                       assert_eq!(node_id, &node_id_1);
+                                       assert_eq!(msg.chain_hash, chain_hash);
+                                       assert_eq!(msg.short_channel_ids, vec![
+                                               0x0003e0_000000_0000, // 992x0x0
+                                               0x0003e8_000000_0000, // 1000x0x0
+                                               0x0003e9_000000_0000, // 1001x0x0
+                                               0x0003f0_000000_0000, // 1008x0x0
+                                               0x00044c_000000_0000, // 1100x0x0
+                                               0x0006e0_000000_0000, // 1760x0x0
+                                       ]);
+                               },
+                               _ => panic!("expected MessageSendEvent::SendShortIdsQuery"),
+                       }
+               }
+
+               // Test receipt of a reply that indicates the remote node does not maintain up-to-date
+               // information for the chain_hash. Because of discrepancies in implementation we use
+               // full_information=false and short_channel_ids=[] as the signal.
+               {
+                       // Handle the reply indicating the peer was unable to fulfill our request.
+                       let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
+                               chain_hash,
+                               full_information: false,
+                               first_blocknum: 1000,
+                               number_of_blocks: 100,
+                               short_channel_ids: vec![],
+                       });
+                       assert!(result.is_err());
+                       assert_eq!(result.err().unwrap().err, "Received reply_channel_range with no information available");
+               }
+       }
+
+       #[test]
+       fn handling_reply_short_channel_ids() {
+               let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+               let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+               let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+
+               let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+               // Test receipt of a successful reply
+               {
+                       let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, ReplyShortChannelIdsEnd {
+                               chain_hash,
+                               full_information: true,
+                       });
+                       assert!(result.is_ok());
+               }
+
+               // Test receipt of a reply that indicates the peer does not maintain up-to-date information
+               // for the chain_hash requested in the query.
+               {
+                       let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, ReplyShortChannelIdsEnd {
+                               chain_hash,
+                               full_information: false,
+                       });
+                       assert!(result.is_err());
+                       assert_eq!(result.err().unwrap().err, "Received reply_short_channel_ids_end with no information");
+               }
+       }
+
+       #[test]
+       fn handling_query_channel_range() {
+               let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+               let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+               let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+
+               let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+               let result = net_graph_msg_handler.handle_query_channel_range(&node_id, QueryChannelRange {
+                       chain_hash,
+                       first_blocknum: 0,
+                       number_of_blocks: 0xffff_ffff,
+               });
+               assert!(result.is_err());
+       }
+
+       #[test]
+       fn handling_query_short_channel_ids() {
+               let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+               let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+               let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+
+               let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+               let result = net_graph_msg_handler.handle_query_short_channel_ids(&node_id, QueryShortChannelIds {
+                       chain_hash,
+                       short_channel_ids: vec![0x0003e8_000000_0000],
+               });
+               assert!(result.is_err());
+       }
 }
index 490b6b4048cbd33a7ba60e8b578b6dddbfcd4e17..f73a9a501bfbb75cf3bbafcb3f2193093bfba969 100644 (file)
@@ -538,7 +538,7 @@ mod tests {
        fn build_graph() -> (Secp256k1<All>, NetGraphMsgHandler<std::sync::Arc<crate::util::test_utils::TestChainSource>, std::sync::Arc<crate::util::test_utils::TestLogger>>, std::sync::Arc<test_utils::TestLogger>) {
                let secp_ctx = Secp256k1::new();
                let logger = Arc::new(test_utils::TestLogger::new());
-               let net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger));
+               let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), None, Arc::clone(&logger));
                // Build network from our_id to node7:
                //
                //        -1(1)2-  node0  -1(3)2-
@@ -1258,7 +1258,7 @@ mod tests {
                        inbound_capacity_msat: 100000,
                        is_live: true,
                }];
-               let route = get_route(&source_node_id, &NetworkGraph::new(), &target_node_id, Some(&our_chans.iter().collect::<Vec<_>>()), &last_hops.iter().collect::<Vec<_>>(), 100, 42, Arc::new(test_utils::TestLogger::new())).unwrap();
+               let route = get_route(&source_node_id, &NetworkGraph::new(genesis_block(Network::Testnet).header.block_hash()), &target_node_id, Some(&our_chans.iter().collect::<Vec<_>>()), &last_hops.iter().collect::<Vec<_>>(), 100, 42, Arc::new(test_utils::TestLogger::new())).unwrap();
 
                assert_eq!(route.paths[0].len(), 2);
 
index 3eeacdc732cb12969f0cddd69aa329745dce5297..6f6f32daeab18226313872d1b5ae15fbd3bfa038 100644 (file)
@@ -346,7 +346,22 @@ pub enum MessageSendEvent {
        PaymentFailureNetworkUpdate {
                /// The channel/node update which should be sent to NetGraphMsgHandler
                update: msgs::HTLCFailChannelUpdate,
-       }
+       },
+       /// Query a peer for channels with funding transaction UTXOs in a block range.
+       SendChannelRangeQuery {
+               /// The node_id of this message recipient
+               node_id: PublicKey,
+               /// The query_channel_range which should be sent.
+               msg: msgs::QueryChannelRange,
+       },
+       /// Request routing gossip messages from a peer for a list of channels identified by
+       /// their short_channel_ids.
+       SendShortIdsQuery {
+               /// The node_id of this message recipient
+               node_id: PublicKey,
+               /// The query_short_channel_ids which should be sent.
+               msg: msgs::QueryShortChannelIds,
+       },
 }
 
 /// A trait indicating an object may generate message send events
index 6c3552d5df0853f1b8b2e79592573373c25f7c99..c944e572cd4569deb44ee8d06180f64d2440ef41 100644 (file)
@@ -316,8 +316,28 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
                Vec::new()
        }
 
-       fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
-               self.request_full_sync.load(Ordering::Acquire)
+       fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {}
+
+       fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+}
+
+impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+               vec![]
        }
 }