From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Tue, 15 Dec 2020 19:17:11 +0000 (-0800) Subject: Merge pull request #736 from bmancini55/gossip_queries X-Git-Tag: v0.0.13~50 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=74cd96f972d539b9e19276bef0e9307c6b3299f1;hp=a008464417b7351f23f27fec5e977fccb4b72095;p=rust-lightning Merge pull request #736 from bmancini55/gossip_queries Initiate gossip_queries --- diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 3aeb3d23..6375d076 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -19,6 +19,7 @@ use bitcoin::blockdata::script::{Builder, Script}; use bitcoin::blockdata::opcodes; use bitcoin::consensus::encode::deserialize; use bitcoin::network::constants::Network; +use bitcoin::blockdata::constants::genesis_block; use bitcoin::hashes::Hash as TraitImport; use bitcoin::hashes::HashEngine as TraitImportEngine; @@ -343,7 +344,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { config.peer_channel_config_limits.min_dust_limit_satoshis = 0; let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0)); let our_id = PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()); - let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(None, Arc::clone(&logger))); + let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_block(Network::Bitcoin).header.block_hash(), None, Arc::clone(&logger))); let peers = RefCell::new([false; 256]); let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { @@ -609,7 +610,7 @@ mod tests { // What each byte represents is broken down below, and then everything is concatenated into // one large test at the end (you want %s/ -.*//g %s/\n\| \|\t\|\///g). - // Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length + + // Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length + // 16-byte MAC of the encrypted message length + encrypted Lightning message + 16-byte MAC // of the Lightning message // I.e 2nd inbound read, len 18 : 0006 (encrypted message length) + 03000000000000000000000000000000 (MAC of the encrypted message length) diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index 671adcfd..4eb85714 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -21,6 +21,8 @@ use lightning::util::ser::Readable; use lightning::routing::network_graph::{NetworkGraph, RoutingFees}; use bitcoin::secp256k1::key::PublicKey; +use bitcoin::network::constants::Network; +use bitcoin::blockdata::constants::genesis_block; use utils::test_logger; @@ -155,7 +157,7 @@ pub fn do_test(data: &[u8], out: Out) { let logger: Arc = Arc::new(test_logger::TestLogger::new("".to_owned(), out)); let our_pubkey = get_pubkey!(); - let mut net_graph = NetworkGraph::new(); + let mut net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash()); let mut node_pks = HashSet::new(); let mut scid = 42; diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 36384380..8e5885ca 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -535,7 +535,11 @@ mod tests { fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { } fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { Vec::new() } fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } - fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false } + fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { } + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {} diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 0f5e7f8a..c39eef3e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3404,6 +3404,8 @@ impl true, &events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id, &events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true, + &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, + &events::MessageSendEvent::SendShortIdsQuery { .. } => false, } }); } diff --git a/lightning/src/ln/features.rs b/lightning/src/ln/features.rs index f573ac4c..3ce3d4fa 100644 --- a/lightning/src/ln/features.rs +++ b/lightning/src/ln/features.rs @@ -104,7 +104,7 @@ mod sealed { ], optional_features: [ // Byte 0 - DataLossProtect | InitialRoutingSync | UpfrontShutdownScript, + DataLossProtect | InitialRoutingSync | UpfrontShutdownScript | GossipQueries, // Byte 1 VariableLengthOnion | PaymentSecret, // Byte 2 @@ -122,7 +122,7 @@ mod sealed { ], optional_features: [ // Byte 0 - DataLossProtect | UpfrontShutdownScript, + DataLossProtect | UpfrontShutdownScript | GossipQueries, // Byte 1 VariableLengthOnion | PaymentSecret, // Byte 2 @@ -243,6 +243,8 @@ mod sealed { "Feature flags for `initial_routing_sync`."); define_feature!(5, UpfrontShutdownScript, [InitContext, NodeContext], "Feature flags for `option_upfront_shutdown_script`."); + define_feature!(7, GossipQueries, [InitContext, NodeContext], + "Feature flags for `gossip_queries`."); define_feature!(9, VariableLengthOnion, [InitContext, NodeContext], "Feature flags for `var_onion_optin`."); define_feature!(13, StaticRemoteKey, [InitContext, NodeContext], @@ -473,6 +475,22 @@ impl Features { } } + +impl Features { + #[cfg(test)] + pub(crate) fn requires_gossip_queries(&self) -> bool { + ::requires_feature(&self.flags) + } + pub(crate) fn supports_gossip_queries(&self) -> bool { + ::supports_feature(&self.flags) + } + #[cfg(test)] + pub(crate) fn clear_gossip_queries(mut self) -> Self { + ::clear_bits(&mut self.flags); + self + } +} + impl Features { #[cfg(test)] pub(crate) fn requires_variable_length_onion(&self) -> bool { @@ -497,6 +515,10 @@ impl Features { pub(crate) fn initial_routing_sync(&self) -> bool { ::supports_feature(&self.flags) } + // We are no longer setting initial_routing_sync now that gossip_queries + // is enabled. This feature is ignored by a peer when gossip_queries has + // been negotiated. + #[cfg(test)] pub(crate) fn clear_initial_routing_sync(&mut self) { ::clear_bits(&mut self.flags) } @@ -568,6 +590,11 @@ mod tests { assert!(!InitFeatures::known().requires_upfront_shutdown_script()); assert!(!NodeFeatures::known().requires_upfront_shutdown_script()); + assert!(InitFeatures::known().supports_gossip_queries()); + assert!(NodeFeatures::known().supports_gossip_queries()); + assert!(!InitFeatures::known().requires_gossip_queries()); + assert!(!NodeFeatures::known().requires_gossip_queries()); + assert!(InitFeatures::known().supports_data_loss_protect()); assert!(NodeFeatures::known().supports_data_loss_protect()); assert!(!InitFeatures::known().requires_data_loss_protect()); @@ -620,9 +647,10 @@ mod tests { #[test] fn convert_to_context_with_relevant_flags() { - let init_features = InitFeatures::known().clear_upfront_shutdown_script(); + let init_features = InitFeatures::known().clear_upfront_shutdown_script().clear_gossip_queries(); assert!(init_features.initial_routing_sync()); assert!(!init_features.supports_upfront_shutdown_script()); + assert!(!init_features.supports_gossip_queries()); let node_features: NodeFeatures = init_features.to_context(); { @@ -639,8 +667,10 @@ mod tests { // Check that cleared flags are kept blank when converting back: // - initial_routing_sync was not applicable to NodeContext // - upfront_shutdown_script was cleared before converting + // - gossip_queries was cleared before converting let features: InitFeatures = node_features.to_context_internal(); assert!(!features.initial_routing_sync()); assert!(!features.supports_upfront_shutdown_script()); + assert!(!init_features.supports_gossip_queries()); } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 8ff3d4ea..d54045f8 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -1171,7 +1171,7 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec Result; @@ -825,8 +831,25 @@ pub trait RoutingMessageHandler : Send + Sync { /// immediately higher (as defined by ::cmp) than starting_point. /// If None is provided for starting_point, we start at the first node. fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec; - /// Returns whether a full sync should be requested from a peer. - fn should_request_full_sync(&self, node_id: &PublicKey) -> bool; + /// Called when a connection is established with a peer. This can be used to + /// perform routing table synchronization using a strategy defined by the + /// implementor. + fn sync_routing_table(&self, their_node_id: &PublicKey, init: &Init); + /// Handles the reply of a query we initiated to learn about channels + /// for a given range of blocks. We can expect to receive one or more + /// replies to a single query. + fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError>; + /// Handles the reply of a query we initiated asking for routing gossip + /// messages for a list of channels. We should receive this message when + /// a node has completed its best effort to send us the pertaining routing + /// gossip messages. + fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError>; + /// Handles when a peer asks us to send a list of short_channel_ids + /// for the requested range of blocks. + fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError>; + /// Handles when a peer asks us to send routing gossip messages for a + /// list of short_channel_ids. + fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>; } mod fuzzy_internal_msgs { diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 6e997e53..890df356 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -584,11 +584,7 @@ impl PeerManager PeerManager PeerManager PeerManager { + self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::ReplyShortChannelIdsEnd(msg) => { + self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::QueryChannelRange(msg) => { + self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::ReplyChannelRange(msg) => { + self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::GossipTimestampFilter(_msg) => { + // TODO: handle message + }, // Unknown messages: wire::Message::Unknown(msg_type) if msg_type.is_even() => { @@ -864,6 +874,7 @@ impl PeerManager PeerManager { + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {}); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); + self.do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {}); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); + self.do_attempt_write_data(&mut descriptor, peer); } } } @@ -1302,13 +1323,6 @@ mod tests { (fd_a.clone(), fd_b.clone()) } - fn establish_connection_and_read_events<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { - let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b); - assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); - assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); - (fd_a.clone(), fd_b.clone()) - } - #[test] fn test_disconnect_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and @@ -1377,41 +1391,4 @@ mod tests { assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100); assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50); } - - #[test] - fn limit_initial_routing_sync_requests() { - // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not. - { - let cfgs = create_peermgr_cfgs(2); - cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release); - let peers = create_network(2, &cfgs); - let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]); - - let peer_0 = peers[0].peers.lock().unwrap(); - let peer_1 = peers[1].peers.lock().unwrap(); - - let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref(); - let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref(); - - assert!(peer_0_features.unwrap().initial_routing_sync()); - assert!(!peer_1_features.unwrap().initial_routing_sync()); - } - - // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not. - { - let cfgs = create_peermgr_cfgs(2); - cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release); - let peers = create_network(2, &cfgs); - let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]); - - let peer_0 = peers[0].peers.lock().unwrap(); - let peer_1 = peers[1].peers.lock().unwrap(); - - let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref(); - let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref(); - - assert!(!peer_0_features.unwrap().initial_routing_sync()); - assert!(peer_1_features.unwrap().initial_routing_sync()); - } - } } diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index 86d8bfdd..8197ce15 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -55,6 +55,11 @@ pub enum Message { ChannelAnnouncement(msgs::ChannelAnnouncement), NodeAnnouncement(msgs::NodeAnnouncement), ChannelUpdate(msgs::ChannelUpdate), + QueryShortChannelIds(msgs::QueryShortChannelIds), + ReplyShortChannelIdsEnd(msgs::ReplyShortChannelIdsEnd), + QueryChannelRange(msgs::QueryChannelRange), + ReplyChannelRange(msgs::ReplyChannelRange), + GossipTimestampFilter(msgs::GossipTimestampFilter), /// A message that could not be decoded because its type is unknown. Unknown(MessageType), } @@ -90,6 +95,11 @@ impl Message { &Message::ChannelAnnouncement(ref msg) => msg.type_id(), &Message::NodeAnnouncement(ref msg) => msg.type_id(), &Message::ChannelUpdate(ref msg) => msg.type_id(), + &Message::QueryShortChannelIds(ref msg) => msg.type_id(), + &Message::ReplyShortChannelIdsEnd(ref msg) => msg.type_id(), + &Message::QueryChannelRange(ref msg) => msg.type_id(), + &Message::ReplyChannelRange(ref msg) => msg.type_id(), + &Message::GossipTimestampFilter(ref msg) => msg.type_id(), &Message::Unknown(type_id) => type_id, } } @@ -186,6 +196,21 @@ pub fn read(buffer: &mut R) -> Result { Ok(Message::ChannelUpdate(Readable::read(buffer)?)) }, + msgs::QueryShortChannelIds::TYPE => { + Ok(Message::QueryShortChannelIds(Readable::read(buffer)?)) + }, + msgs::ReplyShortChannelIdsEnd::TYPE => { + Ok(Message::ReplyShortChannelIdsEnd(Readable::read(buffer)?)) + }, + msgs::QueryChannelRange::TYPE => { + Ok(Message::QueryChannelRange(Readable::read(buffer)?)) + }, + msgs::ReplyChannelRange::TYPE => { + Ok(Message::ReplyChannelRange(Readable::read(buffer)?)) + } + msgs::GossipTimestampFilter::TYPE => { + Ok(Message::GossipTimestampFilter(Readable::read(buffer)?)) + }, _ => { Ok(Message::Unknown(MessageType(message_type))) }, @@ -312,6 +337,26 @@ impl Encode for msgs::ChannelUpdate { const TYPE: u16 = 258; } +impl Encode for msgs::QueryShortChannelIds { + const TYPE: u16 = 261; +} + +impl Encode for msgs::ReplyShortChannelIdsEnd { + const TYPE: u16 = 262; +} + +impl Encode for msgs::QueryChannelRange { + const TYPE: u16 = 263; +} + +impl Encode for msgs::ReplyChannelRange { + const TYPE: u16 = 264; +} + +impl Encode for msgs::GossipTimestampFilter { + const TYPE: u16 = 265; +} + #[cfg(test)] mod tests { use super::*; @@ -415,24 +460,25 @@ mod tests { fn read_lnd_init_msg() { // Taken from lnd v0.9.0-beta. let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 162, 161]; - check_init_msg(buffer); + check_init_msg(buffer, false); } #[test] fn read_clightning_init_msg() { // Taken from c-lightning v0.8.0. let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 170, 162, 1, 32, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15]; - check_init_msg(buffer); + check_init_msg(buffer, true); } - fn check_init_msg(buffer: Vec) { + fn check_init_msg(buffer: Vec, expect_unknown: bool) { let mut reader = ::std::io::Cursor::new(buffer); let decoded_msg = read(&mut reader).unwrap(); match decoded_msg { Message::Init(msgs::Init { features }) => { assert!(features.supports_variable_length_onion()); assert!(features.supports_upfront_shutdown_script()); - assert!(features.supports_unknown_bits()); + assert!(features.supports_gossip_queries()); + assert_eq!(expect_unknown, features.supports_unknown_bits()); assert!(!features.requires_unknown_bits()); assert!(!features.initial_routing_sync()); }, @@ -450,7 +496,7 @@ mod tests { Message::NodeAnnouncement(msgs::NodeAnnouncement { contents: msgs::UnsignedNodeAnnouncement { features, ..}, ..}) => { assert!(features.supports_variable_length_onion()); assert!(features.supports_upfront_shutdown_script()); - assert!(features.supports_unknown_bits()); + assert!(features.supports_gossip_queries()); assert!(!features.requires_unknown_bits()); }, _ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id()) diff --git a/lightning/src/routing/network_graph.rs b/lightning/src/routing/network_graph.rs index 54783dd1..8075462c 100644 --- a/lightning/src/routing/network_graph.rs +++ b/lightning/src/routing/network_graph.rs @@ -18,19 +18,23 @@ use bitcoin::hashes::Hash; use bitcoin::blockdata::script::Builder; use bitcoin::blockdata::transaction::TxOut; use bitcoin::blockdata::opcodes; +use bitcoin::hash_types::BlockHash; use chain; use chain::Access; use ln::features::{ChannelFeatures, NodeFeatures}; -use ln::msgs::{DecodeError, ErrorAction, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT}; +use ln::msgs::{DecodeError, ErrorAction, Init, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT}; use ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, OptionalField}; +use ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd}; use ln::msgs; use util::ser::{Writeable, Readable, Writer}; use util::logger::Logger; +use util::events; use std::{cmp, fmt}; use std::sync::{RwLock, RwLockReadGuard}; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex; use std::collections::BTreeMap; use std::collections::btree_map::Entry as BtreeEntry; use std::ops::Deref; @@ -39,6 +43,7 @@ use bitcoin::hashes::hex::ToHex; /// Represents the network as nodes and channels between them #[derive(PartialEq)] pub struct NetworkGraph { + genesis_hash: BlockHash, channels: BTreeMap, nodes: BTreeMap, } @@ -59,6 +64,7 @@ pub struct NetGraphMsgHandler where C::Target: chain::Access pub network_graph: RwLock, chain_access: Option, full_syncs_requested: AtomicUsize, + pending_events: Mutex>, logger: L, } @@ -68,15 +74,13 @@ impl NetGraphMsgHandler where C::Target: chain::Access /// Chain monitor is used to make sure announced channels exist on-chain, /// channel data is correct, and that the announcement is signed with /// channel owners' keys. - pub fn new(chain_access: Option, logger: L) -> Self { + pub fn new(genesis_hash: BlockHash, chain_access: Option, logger: L) -> Self { NetGraphMsgHandler { secp_ctx: Secp256k1::verification_only(), - network_graph: RwLock::new(NetworkGraph { - channels: BTreeMap::new(), - nodes: BTreeMap::new(), - }), + network_graph: RwLock::new(NetworkGraph::new(genesis_hash)), full_syncs_requested: AtomicUsize::new(0), chain_access, + pending_events: Mutex::new(vec![]), logger, } } @@ -89,6 +93,7 @@ impl NetGraphMsgHandler where C::Target: chain::Access network_graph: RwLock::new(network_graph), full_syncs_requested: AtomicUsize::new(0), chain_access, + pending_events: Mutex::new(vec![]), logger, } } @@ -100,6 +105,18 @@ impl NetGraphMsgHandler where C::Target: chain::Access pub fn read_locked_graph<'a>(&'a self) -> LockedNetworkGraph<'a> { LockedNetworkGraph(self.network_graph.read().unwrap()) } + + /// Returns true when a full routing table sync should be performed with a peer. + fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { + //TODO: Determine whether to request a full sync based on the network map. + const FULL_SYNCS_TO_REQUEST: usize = 5; + if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST { + self.full_syncs_requested.fetch_add(1, Ordering::AcqRel); + true + } else { + false + } + } } impl<'a> LockedNetworkGraph<'a> { @@ -202,15 +219,124 @@ impl RoutingMessageHandler for N result } - fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { - //TODO: Determine whether to request a full sync based on the network map. - const FULL_SYNCS_TO_REQUEST: usize = 5; - if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST { - self.full_syncs_requested.fetch_add(1, Ordering::AcqRel); - true - } else { - false + /// Initiates a stateless sync of routing gossip information with a peer + /// using gossip_queries. The default strategy used by this implementation + /// is to sync the full block range with several peers. + /// + /// We should expect one or more reply_channel_range messages in response + /// to our query_channel_range. Each reply will enqueue a query_scid message + /// to request gossip messages for each channel. The sync is considered complete + /// when the final reply_scids_end message is received, though we are not + /// tracking this directly. + fn sync_routing_table(&self, their_node_id: &PublicKey, init_msg: &Init) { + + // We will only perform a sync with peers that support gossip_queries. + if !init_msg.features.supports_gossip_queries() { + return (); + } + + // Check if we need to perform a full synchronization with this peer + if !self.should_request_full_sync(their_node_id) { + return (); } + + let first_blocknum = 0; + let number_of_blocks = 0xffffffff; + log_debug!(self.logger, "Sending query_channel_range peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), first_blocknum, number_of_blocks); + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::MessageSendEvent::SendChannelRangeQuery { + node_id: their_node_id.clone(), + msg: QueryChannelRange { + chain_hash: self.network_graph.read().unwrap().genesis_hash, + first_blocknum, + number_of_blocks, + }, + }); + } + + /// Statelessly processes a reply to a channel range query by immediately + /// sending an SCID query with SCIDs in the reply. To keep this handler + /// stateless, it does not validate the sequencing of replies for multi- + /// reply ranges. It does not validate whether the reply(ies) cover the + /// queried range. It also does not filter SCIDs to only those in the + /// original query range. We also do not validate that the chain_hash + /// matches the chain_hash of the NetworkGraph. Any chan_ann message that + /// does not match our chain_hash will be rejected when the announcement is + /// processed. + fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> { + log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, full_information={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.full_information, msg.short_channel_ids.len(),); + + // Validate that the remote node maintains up-to-date channel + // information for chain_hash. Some nodes use the full_information + // flag to indicate multi-part messages so we must check whether + // we received SCIDs as well. + if !msg.full_information && msg.short_channel_ids.len() == 0 { + return Err(LightningError { + err: String::from("Received reply_channel_range with no information available"), + action: ErrorAction::IgnoreError, + }); + } + + log_debug!(self.logger, "Sending query_short_channel_ids peer={}, batch_size={}", log_pubkey!(their_node_id), msg.short_channel_ids.len()); + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::MessageSendEvent::SendShortIdsQuery { + node_id: their_node_id.clone(), + msg: QueryShortChannelIds { + chain_hash: msg.chain_hash, + short_channel_ids: msg.short_channel_ids, + } + }); + + Ok(()) + } + + /// When an SCID query is initiated the remote peer will begin streaming + /// gossip messages. In the event of a failure, we may have received + /// some channel information. Before trying with another peer, the + /// caller should update its set of SCIDs that need to be queried. + fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { + log_debug!(self.logger, "Handling reply_short_channel_ids_end peer={}, full_information={}", log_pubkey!(their_node_id), msg.full_information); + + // If the remote node does not have up-to-date information for the + // chain_hash they will set full_information=false. We can fail + // the result and try again with a different peer. + if !msg.full_information { + return Err(LightningError { + err: String::from("Received reply_short_channel_ids_end with no information"), + action: ErrorAction::IgnoreError + }); + } + + Ok(()) + } + + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } + + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } +} + +impl events::MessageSendEventsProvider for NetGraphMsgHandler +where + C::Target: chain::Access, + L::Target: Logger, +{ + fn get_and_clear_pending_msg_events(&self) -> Vec { + let mut ret = Vec::new(); + let mut pending_events = self.pending_events.lock().unwrap(); + std::mem::swap(&mut ret, &mut pending_events); + ret } } @@ -448,6 +574,7 @@ impl Readable for NodeInfo { impl Writeable for NetworkGraph { fn write(&self, writer: &mut W) -> Result<(), ::std::io::Error> { + self.genesis_hash.write(writer)?; (self.channels.len() as u64).write(writer)?; for (ref chan_id, ref chan_info) in self.channels.iter() { (*chan_id).write(writer)?; @@ -464,6 +591,7 @@ impl Writeable for NetworkGraph { impl Readable for NetworkGraph { fn read(reader: &mut R) -> Result { + let genesis_hash: BlockHash = Readable::read(reader)?; let channels_count: u64 = Readable::read(reader)?; let mut channels = BTreeMap::new(); for _ in 0..channels_count { @@ -479,6 +607,7 @@ impl Readable for NetworkGraph { nodes.insert(node_id, node_info); } Ok(NetworkGraph { + genesis_hash, channels, nodes, }) @@ -524,8 +653,9 @@ impl NetworkGraph { } /// Creates a new, empty, network graph. - pub fn new() -> NetworkGraph { + pub fn new(genesis_hash: BlockHash) -> NetworkGraph { Self { + genesis_hash, channels: BTreeMap::new(), nodes: BTreeMap::new(), } @@ -882,14 +1012,15 @@ impl NetworkGraph { #[cfg(test)] mod tests { use chain; - use ln::features::{ChannelFeatures, NodeFeatures}; + use ln::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use routing::network_graph::{NetGraphMsgHandler, NetworkGraph}; - use ln::msgs::{OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, + use ln::msgs::{Init, OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, HTLCFailChannelUpdate, - MAX_VALUE_MSAT}; + ReplyChannelRange, ReplyShortChannelIdsEnd, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT}; use util::test_utils; use util::logger::Logger; use util::ser::{Readable, Writeable}; + use util::events::{MessageSendEvent, MessageSendEventsProvider}; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use bitcoin::hashes::Hash; @@ -909,7 +1040,8 @@ mod tests { fn create_net_graph_msg_handler() -> (Secp256k1, NetGraphMsgHandler, Arc>) { let secp_ctx = Secp256k1::new(); let logger = Arc::new(test_utils::TestLogger::new()); - let net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger)); + let genesis_hash = genesis_block(Network::Testnet).header.block_hash(); + let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_hash, None, Arc::clone(&logger)); (secp_ctx, net_graph_msg_handler) } @@ -1070,7 +1202,7 @@ mod tests { }; // Test if the UTXO lookups were not supported - let mut net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger)); + let mut net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), None, Arc::clone(&logger)); match net_graph_msg_handler.handle_channel_announcement(&valid_announcement) { Ok(res) => assert!(res), _ => panic!() @@ -1094,7 +1226,7 @@ mod tests { // Test if an associated transaction were not on-chain (or not confirmed). let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); *chain_source.utxo_ret.lock().unwrap() = Err(chain::AccessError::UnknownTx); - net_graph_msg_handler = NetGraphMsgHandler::new(Some(chain_source.clone()), Arc::clone(&logger)); + net_graph_msg_handler = NetGraphMsgHandler::new(chain_source.clone().genesis_hash, Some(chain_source.clone()), Arc::clone(&logger)); unsigned_announcement.short_channel_id += 1; msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]); @@ -1218,7 +1350,7 @@ mod tests { let secp_ctx = Secp256k1::new(); let logger: Arc = Arc::new(test_utils::TestLogger::new()); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); - let net_graph_msg_handler = NetGraphMsgHandler::new(Some(chain_source.clone()), Arc::clone(&logger)); + let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), Some(chain_source.clone()), Arc::clone(&logger)); let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); @@ -1813,4 +1945,184 @@ mod tests { network.write(&mut w).unwrap(); assert!(::read(&mut ::std::io::Cursor::new(&w.0)).unwrap() == *network); } + + #[test] + fn calling_sync_routing_table() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + let first_blocknum = 0; + let number_of_blocks = 0xffff_ffff; + + // It should ignore if gossip_queries feature is not enabled + { + let init_msg = Init { features: InitFeatures::known().clear_gossip_queries() }; + net_graph_msg_handler.sync_routing_table(&node_id_1, &init_msg); + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 0); + } + + // It should send a query_channel_message with the correct information + { + let init_msg = Init { features: InitFeatures::known() }; + net_graph_msg_handler.sync_routing_table(&node_id_1, &init_msg); + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendChannelRangeQuery{ node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.first_blocknum, first_blocknum); + assert_eq!(msg.number_of_blocks, number_of_blocks); + }, + _ => panic!("Expected MessageSendEvent::SendChannelRangeQuery") + }; + } + + // It should not enqueue a query when should_request_full_sync return false. + // The initial implementation allows syncing with the first 5 peers after + // which should_request_full_sync will return false + { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let init_msg = Init { features: InitFeatures::known() }; + for n in 1..7 { + let node_privkey = &SecretKey::from_slice(&[n; 32]).unwrap(); + let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + net_graph_msg_handler.sync_routing_table(&node_id, &init_msg); + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + if n <= 5 { + assert_eq!(events.len(), 1); + } else { + assert_eq!(events.len(), 0); + } + + } + } + } + + #[test] + fn handling_reply_channel_range() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + // Test receipt of a single reply that should enqueue an SCID query + // matching the SCIDs in the reply + { + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 0, + number_of_blocks: 2000, + short_channel_ids: vec![ + 0x0003e0_000000_0000, // 992x0x0 + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x0003f0_000000_0000, // 1008x0x0 + 0x00044c_000000_0000, // 1100x0x0 + 0x0006e0_000000_0000, // 1760x0x0 + ], + }); + assert!(result.is_ok()); + + // We expect to emit a query_short_channel_ids message with the received scids + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery { node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, vec![ + 0x0003e0_000000_0000, // 992x0x0 + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x0003f0_000000_0000, // 1008x0x0 + 0x00044c_000000_0000, // 1100x0x0 + 0x0006e0_000000_0000, // 1760x0x0 + ]); + }, + _ => panic!("expected MessageSendEvent::SendShortIdsQuery"), + } + } + + // Test receipt of a reply that indicates the remote node does not maintain up-to-date + // information for the chain_hash. Because of discrepancies in implementation we use + // full_information=false and short_channel_ids=[] as the signal. + { + // Handle the reply indicating the peer was unable to fulfill our request. + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange { + chain_hash, + full_information: false, + first_blocknum: 1000, + number_of_blocks: 100, + short_channel_ids: vec![], + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Received reply_channel_range with no information available"); + } + } + + #[test] + fn handling_reply_short_channel_ids() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + // Test receipt of a successful reply + { + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, ReplyShortChannelIdsEnd { + chain_hash, + full_information: true, + }); + assert!(result.is_ok()); + } + + // Test receipt of a reply that indicates the peer does not maintain up-to-date information + // for the chain_hash requested in the query. + { + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, ReplyShortChannelIdsEnd { + chain_hash, + full_information: false, + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Received reply_short_channel_ids_end with no information"); + } + } + + #[test] + fn handling_query_channel_range() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + let result = net_graph_msg_handler.handle_query_channel_range(&node_id, QueryChannelRange { + chain_hash, + first_blocknum: 0, + number_of_blocks: 0xffff_ffff, + }); + assert!(result.is_err()); + } + + #[test] + fn handling_query_short_channel_ids() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + let result = net_graph_msg_handler.handle_query_short_channel_ids(&node_id, QueryShortChannelIds { + chain_hash, + short_channel_ids: vec![0x0003e8_000000_0000], + }); + assert!(result.is_err()); + } } diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index 490b6b40..f73a9a50 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -538,7 +538,7 @@ mod tests { fn build_graph() -> (Secp256k1, NetGraphMsgHandler, std::sync::Arc>, std::sync::Arc) { let secp_ctx = Secp256k1::new(); let logger = Arc::new(test_utils::TestLogger::new()); - let net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger)); + let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), None, Arc::clone(&logger)); // Build network from our_id to node7: // // -1(1)2- node0 -1(3)2- @@ -1258,7 +1258,7 @@ mod tests { inbound_capacity_msat: 100000, is_live: true, }]; - let route = get_route(&source_node_id, &NetworkGraph::new(), &target_node_id, Some(&our_chans.iter().collect::>()), &last_hops.iter().collect::>(), 100, 42, Arc::new(test_utils::TestLogger::new())).unwrap(); + let route = get_route(&source_node_id, &NetworkGraph::new(genesis_block(Network::Testnet).header.block_hash()), &target_node_id, Some(&our_chans.iter().collect::>()), &last_hops.iter().collect::>(), 100, 42, Arc::new(test_utils::TestLogger::new())).unwrap(); assert_eq!(route.paths[0].len(), 2); diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index 3eeacdc7..6f6f32da 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -346,7 +346,22 @@ pub enum MessageSendEvent { PaymentFailureNetworkUpdate { /// The channel/node update which should be sent to NetGraphMsgHandler update: msgs::HTLCFailChannelUpdate, - } + }, + /// Query a peer for channels with funding transaction UTXOs in a block range. + SendChannelRangeQuery { + /// The node_id of this message recipient + node_id: PublicKey, + /// The query_channel_range which should be sent. + msg: msgs::QueryChannelRange, + }, + /// Request routing gossip messages from a peer for a list of channels identified by + /// their short_channel_ids. + SendShortIdsQuery { + /// The node_id of this message recipient + node_id: PublicKey, + /// The query_short_channel_ids which should be sent. + msg: msgs::QueryShortChannelIds, + }, } /// A trait indicating an object may generate message send events diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 6c3552d5..c944e572 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -316,8 +316,28 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { Vec::new() } - fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { - self.request_full_sync.load(Ordering::Acquire) + fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {} + + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), msgs::LightningError> { + Ok(()) + } +} + +impl events::MessageSendEventsProvider for TestRoutingMessageHandler { + fn get_and_clear_pending_msg_events(&self) -> Vec { + vec![] } }