From: bmancini55 Date: Thu, 22 Oct 2020 16:44:53 +0000 (-0400) Subject: Implement gossip_queries sync methods in NetGraphMsgHandler X-Git-Tag: v0.0.13~50^2~7 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=69da2daeae6cad74cbd087e5a6ea3a5376005ee0;p=rust-lightning Implement gossip_queries sync methods in NetGraphMsgHandler To perform a sync of routing gossip messages with a peer requires a two step process where we first initiate a channel range query to discover channels in a block range. Next we request the routing gossip messages for discovered channels. This code implements logic in NetGraphMsgHandler for performing these two tasks while taking into account the specification and variance in implementation. --- diff --git a/lightning/src/routing/network_graph.rs b/lightning/src/routing/network_graph.rs index 9f20219eb..b0cfb982e 100644 --- a/lightning/src/routing/network_graph.rs +++ b/lightning/src/routing/network_graph.rs @@ -37,9 +37,21 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; use std::collections::BTreeMap; use std::collections::btree_map::Entry as BtreeEntry; +use std::collections::HashMap; use std::ops::Deref; use bitcoin::hashes::hex::ToHex; +/// Maximum number of short_channel_id values that can be encoded in a +/// single reply_channel_range or query_short_channel_ids messages when +/// using raw encoding. The maximum value ensures that the 8-byte SCIDs +/// fit inside the maximum size of the Lightning message, 65535-bytes. +const MAX_SHORT_CHANNEL_ID_BATCH_SIZE: usize = 8000; + +/// Maximum number of reply_channel_range messages we will allow in +/// reply to a query_channel_range. This value creates an upper-limit +/// on the number of SCIDs we process in reply to a single query. +const MAX_REPLY_CHANNEL_RANGE_PER_QUERY: usize = 250; + /// Represents the network as nodes and channels between them #[derive(PartialEq)] pub struct NetworkGraph { @@ -64,6 +76,8 @@ pub struct NetGraphMsgHandler where C::Target: chain::Access chain_access: Option, full_syncs_requested: AtomicUsize, pending_events: Mutex>, + chan_range_query_tasks: Mutex>, + scid_query_tasks: Mutex>, logger: L, } @@ -83,6 +97,8 @@ impl NetGraphMsgHandler where C::Target: chain::Access full_syncs_requested: AtomicUsize::new(0), chain_access, pending_events: Mutex::new(vec![]), + chan_range_query_tasks: Mutex::new(HashMap::new()), + scid_query_tasks: Mutex::new(HashMap::new()), logger, } } @@ -96,6 +112,8 @@ impl NetGraphMsgHandler where C::Target: chain::Access full_syncs_requested: AtomicUsize::new(0), chain_access, pending_events: Mutex::new(vec![]), + chan_range_query_tasks: Mutex::new(HashMap::new()), + scid_query_tasks: Mutex::new(HashMap::new()), logger, } } @@ -107,6 +125,28 @@ impl NetGraphMsgHandler where C::Target: chain::Access pub fn read_locked_graph<'a>(&'a self) -> LockedNetworkGraph<'a> { LockedNetworkGraph(self.network_graph.read().unwrap()) } + + /// Enqueues a message send event for a batch of short_channel_ids + /// in a task. + fn finalize_query_short_ids(&self, task: &mut ScidQueryTask) { + let scid_size = std::cmp::min(task.short_channel_ids.len(), MAX_SHORT_CHANNEL_ID_BATCH_SIZE); + let mut short_channel_ids: Vec = Vec::with_capacity(scid_size); + for scid in task.short_channel_ids.drain(..scid_size) { + short_channel_ids.push(scid); + } + + log_debug!(self.logger, "Sending query_short_channel_ids peer={}, batch_size={}", log_pubkey!(task.node_id), scid_size); + + // enqueue the message to the peer + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::MessageSendEvent::SendShortIdsQuery { + node_id: task.node_id.clone(), + msg: QueryShortChannelIds { + chain_hash: task.chain_hash.clone(), + short_channel_ids, + } + }); + } } impl<'a> LockedNetworkGraph<'a> { @@ -220,38 +260,270 @@ impl RoutingMessageHandler for N } } - fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), LightningError> { - // TODO - Err(LightningError { - err: String::from("Not implemented"), - action: ErrorAction::IgnoreError, - }) + fn query_channel_range(&self, their_node_id: &PublicKey, chain_hash: BlockHash, first_blocknum: u32, number_of_blocks: u32) -> Result<(), LightningError> { + // We must ensure that we only have a single in-flight query + // to the remote peer. If we already have a query, then we fail + let mut query_range_tasks_lock = self.chan_range_query_tasks.lock().unwrap(); + let query_range_tasks = &mut *query_range_tasks_lock; + if query_range_tasks.contains_key(their_node_id) { + return Err(LightningError { + err: String::from("query_channel_range already in-flight"), + action: ErrorAction::IgnoreError, + }); + } + + // Construct a new task to keep track of the query until the full + // range query has been completed + let task = ChanRangeQueryTask::new(their_node_id, chain_hash, first_blocknum, number_of_blocks); + query_range_tasks.insert(their_node_id.clone(), task); + + // Enqueue the message send event + log_debug!(self.logger, "Sending query_channel_range peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), first_blocknum, number_of_blocks); + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::MessageSendEvent::SendChannelRangeQuery { + node_id: their_node_id.clone(), + msg: QueryChannelRange { + chain_hash, + first_blocknum, + number_of_blocks, + }, + }); + Ok(()) } - fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec) -> Result<(), LightningError> { - // TODO - Err(LightningError { - err: String::from("Not implemented"), - action: ErrorAction::IgnoreError, - }) + /// A query should only request channels referring to unspent outputs. + /// This method does not validate this requirement and expects the + /// caller to ensure SCIDs are unspent. + fn query_short_channel_ids(&self, their_node_id: &PublicKey, chain_hash: BlockHash, short_channel_ids: Vec) -> Result<(), LightningError> { + // Create a new task or add to the existing task + let mut query_scids_tasks_lock = self.scid_query_tasks.lock().unwrap(); + let query_scids_tasks = &mut *query_scids_tasks_lock; + + // For an existing task we append the short_channel_ids which will be sent when the + // current in-flight batch completes. + if let Some(task) = query_scids_tasks.get_mut(their_node_id) { + task.add(short_channel_ids); + return Ok(()); + } + + // For a new task we create the task with short_channel_ids and send the first + // batch immediately. + query_scids_tasks.insert(their_node_id.clone(), ScidQueryTask::new( + their_node_id, + chain_hash.clone(), + short_channel_ids, + )); + let task = query_scids_tasks.get_mut(their_node_id).unwrap(); + self.finalize_query_short_ids(task); + return Ok(()); } - fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> { - // TODO - Err(LightningError { - err: String::from("Not implemented"), - action: ErrorAction::IgnoreError, - }) + fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: &ReplyChannelRange) -> Result<(), LightningError> { + log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, full_information={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.full_information, msg.short_channel_ids.len(),); + + // First we obtain a lock on the task hashmap. In order to avoid borrowing issues + // we will access the task as needed. + let mut query_range_tasks = self.chan_range_query_tasks.lock().unwrap(); + + // If there is no currently executing task then we have received + // an invalid message and will return an error + if query_range_tasks.get(their_node_id).is_none() { + return Err(LightningError { + err: String::from("Received unknown reply_channel_range message"), + action: ErrorAction::IgnoreError, + }); + } + + // Now that we know we have a task, we can extract a few values for use + // in validations without having to access the task repeatedly + let (task_chain_hash, task_first_blocknum, task_number_of_blocks, task_received_first_block, task_received_last_block, task_number_of_replies) = { + let task = query_range_tasks.get(their_node_id).unwrap(); + (task.chain_hash, task.first_blocknum, task.number_of_blocks, task.received_first_block, task.received_last_block, task.number_of_replies) + }; + + // Validate the chain_hash matches the chain_hash we used in the query. + // If it does not, then the message is malformed and we return an error + if msg.chain_hash != task_chain_hash { + query_range_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Received reply_channel_range with invalid chain_hash"), + action: ErrorAction::IgnoreError, + }); + } + + // Validate that the remote node maintains up-to-date channel + // information for chain_hash. Some nodes use the full_information + // flag to indicate multi-part messages so we must check whether + // we received information as well. + if !msg.full_information && msg.short_channel_ids.len() == 0 { + query_range_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Received reply_channel_range with no information available"), + action: ErrorAction::IgnoreError, + }); + } + + // Calculate the last block for the message and the task + let msg_last_block = last_blocknum(msg.first_blocknum, msg.number_of_blocks); + let task_last_block = last_blocknum(task_first_blocknum, task_number_of_blocks); + + // On the first message... + if task_received_first_block.is_none() { + // The replies can be a superset of the queried block range, but the + // replies must include our requested query range. We check if the + // start of the replies is greater than the start of our query. If + // so, the start of our query is excluded and the message is malformed. + if msg.first_blocknum > task_first_blocknum { + query_range_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Failing reply_channel_range with invalid first_blocknum"), + action: ErrorAction::IgnoreError, + }); + } + + // Next, we ensure the reply has at least some information matching + // our query. If the received last_blocknum is less than our query's + // first_blocknum then the reply does not encompass the query range + // and the message is malformed. + if msg_last_block < task_first_blocknum { + query_range_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Failing reply_channel_range with non-overlapping first reply"), + action: ErrorAction::IgnoreError, + }); + } + + // Capture the first block and last block so that subsequent messages + // can be validated. + let task = query_range_tasks.get_mut(their_node_id).unwrap(); + task.received_first_block = Some(msg.first_blocknum); + task.received_last_block = Some(msg_last_block); + } + // On subsequent message(s)... + else { + // We need to validate the sequence of the reply message is expected. + // Subsequent messages must set the first_blocknum to the previous + // message's first_blocknum plus number_of_blocks. There is discrepancy + // in implementation where some resume on the last sent block. We will + // loosen the restriction and accept either, and otherwise consider the + // message malformed and return an error. + let task_received_last_block = task_received_last_block.unwrap(); + if msg.first_blocknum != task_received_last_block && msg.first_blocknum != task_received_last_block + 1 { + query_range_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Failing reply_channel_range with invalid sequence"), + action: ErrorAction::IgnoreError, + }); + } + + // Next we check to see that we have received a realistic number of + // reply messages for a query. This caps the allocation exposure + // for short_channel_ids that will be batched and sent in query channels. + if task_number_of_replies + 1 > MAX_REPLY_CHANNEL_RANGE_PER_QUERY { + query_range_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Failing reply_channel_range due to excessive messages"), + action: ErrorAction::IgnoreError, + }); + } + + // Capture the last_block in our task so that subsequent messages + // can be validated. + let task = query_range_tasks.get_mut(their_node_id).unwrap(); + task.number_of_replies += 1; + task.received_last_block = Some(msg_last_block); + } + + // We filter the short_channel_ids to those inside the query range. + // The most significant 3-bytes of the short_channel_id are the block. + { + let mut filtered_short_channel_ids: Vec = msg.short_channel_ids.clone().into_iter().filter(|short_channel_id| { + let block = short_channel_id >> 40; + return block >= query_range_tasks.get(their_node_id).unwrap().first_blocknum as u64 && block <= task_last_block as u64; + }).collect(); + let task = query_range_tasks.get_mut(their_node_id).unwrap(); + task.short_channel_ids.append(&mut filtered_short_channel_ids); + } + + // The final message is indicated by a last_blocknum that is equal to + // or greater than the query's last_blocknum. + if msg_last_block >= task_last_block { + log_debug!(self.logger, "Completed query_channel_range: peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), task_first_blocknum, task_number_of_blocks); + + // We can now fire off a query to obtain routing messages for the + // accumulated short_channel_ids. + { + let task = query_range_tasks.get_mut(their_node_id).unwrap(); + let mut short_channel_ids = Vec::new(); + std::mem::swap(&mut short_channel_ids, &mut task.short_channel_ids); + self.query_short_channel_ids(their_node_id, task.chain_hash, short_channel_ids)?; + } + + // We can remove the query range task now that the query is complete. + query_range_tasks.remove(their_node_id); + } + Ok(()) } - fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> { - // TODO - Err(LightningError { - err: String::from("Not implemented"), - action: ErrorAction::IgnoreError, - }) + /// When a query is initiated the remote peer will begin streaming + /// gossip messages. In the event of a failure, we may have received + /// some channel information. Before trying with another peer, the + /// caller should update its set of SCIDs that need to be queried. + fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> { + log_debug!(self.logger, "Handling reply_short_channel_ids_end peer={}, full_information={}", log_pubkey!(their_node_id), msg.full_information); + + // First we obtain a lock on the task hashmap. In order to avoid borrowing issues + // we will access the task as needed. + let mut query_short_channel_ids_tasks = self.scid_query_tasks.lock().unwrap(); + + // If there is no existing task then we have received an unknown + // message and should return an error. + if query_short_channel_ids_tasks.get(their_node_id).is_none() { + return Err(LightningError { + err: String::from("Unknown reply_short_channel_ids_end message"), + action: ErrorAction::IgnoreError, + }); + } + + // If the reply's chain_hash does not match the task's chain_hash then + // the reply is malformed and we should return an error. + if msg.chain_hash != query_short_channel_ids_tasks.get(their_node_id).unwrap().chain_hash { + query_short_channel_ids_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Received reply_short_channel_ids_end with incorrect chain_hash"), + action: ErrorAction::IgnoreError + }); + } + + // If the remote node does not have up-to-date information for the + // chain_hash they will set full_information=false. We can fail + // the result and try again with a different peer. + if !msg.full_information { + query_short_channel_ids_tasks.remove(their_node_id); + return Err(LightningError { + err: String::from("Received reply_short_channel_ids_end with no information"), + action: ErrorAction::IgnoreError + }); + } + + // If we have more scids to process we send the next batch in the task + { + let task = query_short_channel_ids_tasks.get_mut(their_node_id).unwrap(); + if task.short_channel_ids.len() > 0 { + self.finalize_query_short_ids(task); + return Ok(()); + } + } + + // Otherwise the task is complete and we can remove it + log_debug!(self.logger, "Completed query_short_channel_ids peer={}", log_pubkey!(their_node_id)); + query_short_channel_ids_tasks.remove(their_node_id); + Ok(()) } + /// There are potential DoS vectors when handling inbound queries. + /// Handling requests with first_blocknum very far away may trigger repeated + /// disk I/O if the NetworkGraph is not fully in-memory. fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> { // TODO Err(LightningError { @@ -260,6 +532,9 @@ impl RoutingMessageHandler for N }) } + /// There are potential DoS vectors when handling inbound queries. + /// Handling requests with first_blocknum very far away may trigger repeated + /// disk I/O if the NetworkGraph is not fully in-memory. fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> { // TODO Err(LightningError { @@ -282,6 +557,118 @@ where } } +/// Safely calculates the last_blocknum given a first_blocknum and +/// number_of_blocks by returning the u32::MAX-1 if there is an overflow +fn last_blocknum(first_blocknum: u32, number_of_blocks: u32) -> u32 { + match first_blocknum.checked_add(number_of_blocks) { + Some(val) => val - 1, + None => 0xffff_ffff - 1, + } +} + +/// Maintains state for a channel range query that we initiated. +/// The query may result in one or more reply_channel_range messages +/// being received. This struct helps determine the status of the query +/// when there are multiple replies. It also collects results for initiating +/// SCID queries. +/// +/// The task is complete and can be cleaned up when a reply meets or +/// exceeds the last block in the query. The collected SCIDs in the task +/// can be used to generate an ScidQueryTask. +/// +/// A query may fail if the recipient does not maintain up-to-date +/// information for the chain or if the recipient fails to reply within +/// a reasonable amount of time. In either event, the query can be +/// re-initiated with a different peer. +pub struct ChanRangeQueryTask { + /// The public key of the node we will be sending queries to + pub node_id: PublicKey, + /// The genesis hash of the blockchain being queried + pub chain_hash: BlockHash, + /// The height of the first block for the channel UTXOs being queried + pub first_blocknum: u32, + /// The number of blocks to include in the query results + pub number_of_blocks: u32, + /// Tracks the number of reply messages we have received + pub number_of_replies: usize, + /// The height of the first block received in a reply. This value + /// should be less than or equal to the first_blocknum requested in + /// the query_channel_range. This allows the range of the replies to + /// contain, but not necessarily strictly, the queried range. + pub received_first_block: Option, + /// The height of the last block received in a reply. This value + /// will get incrementally closer to the target of + /// first_blocknum plus number_of_blocks from the query_channel_range. + pub received_last_block: Option, + /// Contains short_channel_ids received in one or more reply messages. + /// These will be sent in one ore more query_short_channel_ids messages + /// when the task is complete. + pub short_channel_ids: Vec, +} + +impl ChanRangeQueryTask { + /// Constructs a new GossipQueryRangeTask + pub fn new(their_node_id: &PublicKey, chain_hash: BlockHash, first_blocknum: u32, number_of_blocks: u32) -> Self { + ChanRangeQueryTask { + node_id: their_node_id.clone(), + chain_hash, + first_blocknum, + number_of_blocks, + number_of_replies: 0, + received_first_block: None, + received_last_block: None, + short_channel_ids: vec![], + } + } +} + +/// Maintains state when sending one or more short_channel_ids messages +/// to a peer. Only a single SCID query can be in-flight with a peer. The +/// number of SCIDs per query is limited by the size of a Lightning message +/// payload. When querying a large number of SCIDs (results of a large +/// channel range query for instance), multiple query_short_channel_ids +/// messages need to be sent. This task maintains the list of awaiting +/// SCIDs to be queried. +/// +/// When a successful reply_short_channel_ids_end message is received, the +/// next batch of SCIDs can be sent. When no remaining SCIDs exist in the +/// task, the task is complete and can be cleaned up. +/// +/// The recipient may reply indicating that up-to-date information for the +/// chain is not maintained. A query may also fail to complete within a +/// reasonable amount of time. In either event, the short_channel_ids +/// can be queried from a different peer after validating the set of +/// SCIDs that still need to be queried. +pub struct ScidQueryTask { + /// The public key of the node we will be sending queries to + pub node_id: PublicKey, + /// The genesis hash of the blockchain being queried + pub chain_hash: BlockHash, + /// A vector of short_channel_ids that we would like routing gossip + /// information for. This list will be chunked and sent to the peer + /// in one or more query_short_channel_ids messages. + pub short_channel_ids: Vec, +} + +impl ScidQueryTask { + /// Constructs a new GossipQueryShortChannelIdsTask + pub fn new(their_node_id: &PublicKey, chain_hash: BlockHash, short_channel_ids: Vec) -> Self { + ScidQueryTask { + node_id: their_node_id.clone(), + chain_hash, + short_channel_ids, + } + } + + /// Adds short_channel_ids to the pending list of short_channel_ids + /// to be sent in the next request. You can add additional values + /// while a query is in-flight. These new values will be sent once + /// the active query has completed. + pub fn add(&mut self, mut short_channel_ids: Vec) { + self.short_channel_ids.append(&mut short_channel_ids); + } +} + #[derive(PartialEq, Debug)] /// Details about one direction of a channel. Received /// within a channel update. @@ -954,10 +1341,11 @@ mod tests { use routing::network_graph::{NetGraphMsgHandler, NetworkGraph}; use ln::msgs::{OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, HTLCFailChannelUpdate, - MAX_VALUE_MSAT}; + ReplyChannelRange, ReplyShortChannelIdsEnd, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT}; use util::test_utils; use util::logger::Logger; use util::ser::{Readable, Writeable}; + use util::events::{MessageSendEvent, MessageSendEventsProvider}; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use bitcoin::hashes::Hash; @@ -1881,4 +2269,742 @@ mod tests { network.write(&mut w).unwrap(); assert!(::read(&mut ::std::io::Cursor::new(&w.0)).unwrap() == *network); } + + #[test] + fn sending_query_channel_range() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_privkey_2 = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1); + let node_id_2 = PublicKey::from_secret_key(&secp_ctx, node_privkey_2); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + let first_blocknum = 0; + let number_of_blocks = 0xffff_ffff; + + // When no active query exists for the node, it should send a query message and generate a task + { + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, first_blocknum, number_of_blocks); + assert!(result.is_ok()); + + // It should create a task for the query + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().contains_key(&node_id_1)); + + // It should send a query_channel_range message with the correct information + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendChannelRangeQuery{ node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.first_blocknum, first_blocknum); + assert_eq!(msg.number_of_blocks, number_of_blocks); + }, + _ => panic!("Expected MessageSendEvent::SendChannelRangeQuery") + }; + } + + // When an active query exists for the node, when there is a subsequent query request, it + // should fail to initiate a new query + { + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, first_blocknum, number_of_blocks); + assert_eq!(result.is_err(), true); + } + + // When no active query exists for a different node, it should send a query message + { + let result = net_graph_msg_handler.query_channel_range(&node_id_2, chain_hash, first_blocknum, number_of_blocks); + assert_eq!(result.is_ok(), true); + + // It should create a task for the query + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().contains_key(&node_id_2)); + + // It should send a query_channel_message with the correct information + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendChannelRangeQuery{ node_id, msg } => { + assert_eq!(node_id, &node_id_2); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.first_blocknum, first_blocknum); + assert_eq!(msg.number_of_blocks, number_of_blocks); + }, + _ => panic!("Expected MessageSendEvent::SendChannelRangeQuery") + }; + } + } + + #[test] + fn sending_query_short_channel_ids() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + // The first query should send the batch of scids to the peer + { + let short_channel_ids: Vec = vec![0, 1, 2]; + let result = net_graph_msg_handler.query_short_channel_ids(&node_id_1, chain_hash, short_channel_ids.clone()); + assert!(result.is_ok()); + + // Validate that we have enqueued a send message event and that it contains the correct information + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery{ node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, short_channel_ids); + }, + _ => panic!("Expected MessageSendEvent::SendShortIdsQuery") + }; + } + + // Subsequent queries for scids should enqueue them to be sent in the next batch which will + // be sent when a reply_short_channel_ids_end message is handled. + { + let short_channel_ids: Vec = vec![3, 4, 5]; + let result = net_graph_msg_handler.query_short_channel_ids(&node_id_1, chain_hash, short_channel_ids.clone()); + assert!(result.is_ok()); + + // Validate that we have not enqueued another send message event yet + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 0); + + // Validate the task has the queued scids + assert_eq!( + net_graph_msg_handler.scid_query_tasks.lock().unwrap().get(&node_id_1).unwrap().short_channel_ids, + short_channel_ids + ); + } + } + + #[test] + fn handling_reply_channel_range() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + // Test receipt of an unknown reply message. We expect an error + { + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1000, + number_of_blocks: 1050, + short_channel_ids: vec![ + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x0003f0_000000_0000 // 1008x0x0 + ], + }); + assert!(result.is_err()); + } + + // Test receipt of a single reply_channel_range that exactly matches the queried range. + // It sends a query_short_channel_ids with the returned scids and removes the pending task + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle a single successful reply that matches the queried channel range + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1000, + number_of_blocks: 100, + short_channel_ids: vec![ + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x0003f0_000000_0000 // 1008x0x0 + ], + }); + assert!(result.is_ok()); + + // The query is now complete, so we expect the task to be removed + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + + // We expect to emit a query_short_channel_ids message with scids in our query range + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery { node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, vec![0x0003e8_000000_0000,0x0003e9_000000_0000,0x0003f0_000000_0000]); + }, + _ => panic!("expected MessageSendEvent::SendShortIdsQuery"), + } + + // Clean up scid_task + net_graph_msg_handler.scid_query_tasks.lock().unwrap().clear(); + } + + // Test receipt of a single reply_channel_range for a query that has a u32 overflow. We expect + // it sends a query_short_channel_ids with the returned scids and removes the pending task. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 0xffff_ffff); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle a single successful reply that matches the queried channel range + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1000, + number_of_blocks: 0xffff_ffff, + short_channel_ids: vec![ + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x0003f0_000000_0000 // 1008x0x0 + ], + }); + assert!(result.is_ok()); + + // The query is now complete, so we expect the task to be removed + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + + // We expect to emit a query_short_channel_ids message with scids in our query range + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery { node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, vec![0x0003e8_000000_0000,0x0003e9_000000_0000,0x0003f0_000000_0000]); + }, + _ => panic!("expected MessageSendEvent::SendShortIdsQuery"), + } + + // Clean up scid_task + net_graph_msg_handler.scid_query_tasks.lock().unwrap().clear(); + } + + // Test receipt of a single reply that encompasses the queried channel range. This is allowed + // since a reply must contain at least part of the query range. Receipt of the reply should + // send a query_short_channel_ids message with scids filtered to the query range and remove + // the pending task. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle a single successful reply that encompasses the queried channel range + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 0, + number_of_blocks: 2000, + short_channel_ids: vec![ + 0x0003e0_000000_0000, // 992x0x0 + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x0003f0_000000_0000, // 1008x0x0 + 0x00044c_000000_0000, // 1100x0x0 + 0x0006e0_000000_0000, // 1760x0x0 + ], + }); + assert!(result.is_ok()); + + // The query is now complete, so we expect the task to be removed + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + + // We expect to emit a query_short_channel_ids message with scids filtered to those + // within the original query range. + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery { node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, vec![0x0003e8_000000_0000,0x0003e9_000000_0000,0x0003f0_000000_0000]); + }, + _ => panic!("expected MessageSendEvent::SendShortIdsQuery"), + } + + // Clean up scid_task + net_graph_msg_handler.scid_query_tasks.lock().unwrap().clear(); + } + + // Test receipt of multiple reply messages for a single query. This happens when the number + // of scids in the query range exceeds the size limits of a single reply message. We expect + // to initiate a query_short_channel_ids for the first batch of scids and we enqueue the + // remaining scids for later processing. We remove the range query task after receipt of all + // reply messages. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle the first reply message + let reply_1_scids = vec![ + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x000419_000000_0000, // 1049x0x0 + ]; + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1000, + number_of_blocks: 50, + short_channel_ids: reply_1_scids.clone(), + }); + assert!(result.is_ok()); + + // Handle the next reply in the sequence, which must start at the previous message's + // first_blocknum plus number_of_blocks. The scids in this reply will be queued. + let reply_2_scids = vec![ + 0x00041a_000000_0000, // 1050x0x0 + 0x000432_000000_0000, // 1074x0x0 + ]; + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1050, + number_of_blocks: 25, + short_channel_ids: reply_2_scids.clone(), + }); + assert!(result.is_ok()); + + // Handle the final reply in the sequence, which must meet or exceed the initial query's + // first_blocknum plus number_of_blocks. The scids in this reply will be queued. + let reply_3_scids = vec![ + 0x000433_000000_0000, // 1075x0x0 + 0x00044b_000000_0000, // 1099x0x0 + ]; + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1075, + number_of_blocks: 25, + short_channel_ids: reply_3_scids.clone(), + }); + assert!(result.is_ok()); + + // After the final reply we expect the query task to be removed + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + + // We expect to emit a query_short_channel_ids message with the accumulated scids that + // match the queried channel range. + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery { node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, [reply_1_scids, reply_2_scids, reply_3_scids].concat()); + }, + _ => panic!("expected MessageSendEvent::SendShortIdsQuery"), + } + + // Clean up scid_task + net_graph_msg_handler.scid_query_tasks.lock().unwrap().clear(); + } + + // Test receipt of a sequence of replies with a valid first reply and a second reply that + // resumes on the same block as the first reply. The spec requires a subsequent + // first_blocknum to equal the prior first_blocknum plus number_of_blocks, however + // due to discrepancies in implementation we must loosen this restriction. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle the first reply message + let reply_1_scids = vec![ + 0x0003e8_000000_0000, // 1000x0x0 + 0x0003e9_000000_0000, // 1001x0x0 + 0x000419_000000_0000, // 1049x0x0 + ]; + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1000, + number_of_blocks: 50, + short_channel_ids: reply_1_scids.clone(), + }); + assert!(result.is_ok()); + + // Handle the next reply in the sequence, which is non-spec but resumes on the last block + // of the first message. + let reply_2_scids = vec![ + 0x000419_000001_0000, // 1049x1x0 + 0x00041a_000000_0000, // 1050x0x0 + 0x000432_000000_0000, // 1074x0x0 + ]; + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1049, + number_of_blocks: 51, + short_channel_ids: reply_2_scids.clone(), + }); + assert!(result.is_ok()); + + // After the final reply we expect the query task to be removed + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + + // We expect to emit a query_short_channel_ids message with the accumulated scids that + // match the queried channel range + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery { node_id, msg } => { + assert_eq!(node_id, &node_id_1); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, [reply_1_scids, reply_2_scids].concat()); + }, + _ => panic!("expected MessageSendEvent::SendShortIdsQuery"), + } + + // Clean up scid_task + net_graph_msg_handler.scid_query_tasks.lock().unwrap().clear(); + } + + // Test receipt of reply with a chain_hash that does not match the query. We expect to return + // an error and to remove the query task. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle the reply with a mismatched chain_hash. We expect IgnoreError result and the + // task should be removed. + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash: genesis_block(Network::Bitcoin).header.block_hash(), + full_information: true, + first_blocknum: 1000, + number_of_blocks: 1050, + short_channel_ids: vec![0x0003e8_000000_0000,0x0003e9_000000_0000,0x0003f0_000000_0000], + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Received reply_channel_range with invalid chain_hash"); + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of a reply that indicates the remote node does not maintain up-to-date + // information for the chain_hash. Because of discrepancies in implementation we use + // full_information=false and short_channel_ids=[] as the signal. We should expect an error + // and the task should be removed. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle the reply indicating the peer was unable to fulfill our request. + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: false, + first_blocknum: 1000, + number_of_blocks: 100, + short_channel_ids: vec![], + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Received reply_channel_range with no information available"); + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of a reply that has a first_blocknum that is above the first_blocknum + // requested in our query. The reply must contain the queried block range. We expect an + // error result and the task should be removed. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle the reply that has a first_blocknum above the query's first_blocknum + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1001, + number_of_blocks: 100, + short_channel_ids: vec![], + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Failing reply_channel_range with invalid first_blocknum"); + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of a first reply that does not overlap the query range at all. The first message + // must have some overlap with the query. We expect an error result and the task should + // be removed. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle a reply that contains a block range that precedes the queried block range + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 0, + number_of_blocks: 1000, + short_channel_ids: vec![], + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Failing reply_channel_range with non-overlapping first reply"); + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of a sequence of replies with a valid first reply and a second reply that is + // non-sequential. The spec requires a subsequent first_blocknum to equal the prior + // first_blocknum plus number_of_blocks. We expect an IgnoreError result and the task should + // be removed. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 100); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle the first reply + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1000, + number_of_blocks: 50, + short_channel_ids: vec![0x0003e8_000000_0000,0x0003e9_000000_0000,0x0003f0_000000_0000], + }); + assert!(result.is_ok()); + + // Handle the second reply which does not start at the proper first_blocknum. We expect + // to return an error and remove the task. + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: 1051, + number_of_blocks: 50, + short_channel_ids: vec![0x0003f1_000000_0000,0x0003f2_000000_0000], + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Failing reply_channel_range with invalid sequence"); + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of too many reply messages. We expect an IgnoreError result and the task should + // be removed. + { + // Initiate a channel range query to create a query task + let result = net_graph_msg_handler.query_channel_range(&node_id_1, chain_hash, 1000, 0xffff_ffff); + assert!(result.is_ok()); + + // Clear the SendRangeQuery event + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle a sequence of replies that will fail once the max number of reply has been exceeded. + for block in 1000..=1000 + super::MAX_REPLY_CHANNEL_RANGE_PER_QUERY + 10 { + let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, &ReplyChannelRange { + chain_hash, + full_information: true, + first_blocknum: block as u32, + number_of_blocks: 1, + short_channel_ids: vec![(block as u64) << 40], + }); + if block <= 1000 + super::MAX_REPLY_CHANNEL_RANGE_PER_QUERY { + assert!(result.is_ok()); + } else if block == 1001 + super::MAX_REPLY_CHANNEL_RANGE_PER_QUERY { + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Failing reply_channel_range due to excessive messages"); + } else { + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Received unknown reply_channel_range message"); + } + } + + // Expect the task to be removed + assert!(net_graph_msg_handler.chan_range_query_tasks.lock().unwrap().is_empty()); + } + } + + #[test] + fn handling_reply_short_channel_ids() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + // Test receipt of a reply when no query exists. We expect an error to be returned + { + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, &ReplyShortChannelIdsEnd { + chain_hash, + full_information: true, + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Unknown reply_short_channel_ids_end message"); + } + + // Test receipt of a reply that is for a different chain_hash. We expect an error and the task + // should be removed. + { + // Initiate a query to create a pending query task + let result = net_graph_msg_handler.query_short_channel_ids(&node_id, chain_hash, vec![0x0003e8_000000_0000]); + assert!(result.is_ok()); + + // Process reply with incorrect chain_hash + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, &ReplyShortChannelIdsEnd { + chain_hash: genesis_block(Network::Bitcoin).header.block_hash(), + full_information: true, + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Received reply_short_channel_ids_end with incorrect chain_hash"); + + // Expect the task to be removed + assert!(net_graph_msg_handler.scid_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of a reply that indicates the peer does not maintain up-to-date information + // for the chain_hash requested in the query. We expect an error and task should be removed. + { + // Initiate a query to create a pending query task + let result = net_graph_msg_handler.query_short_channel_ids(&node_id, chain_hash, vec![0x0003e8_000000_0000]); + assert!(result.is_ok()); + + // Process failed reply + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, &ReplyShortChannelIdsEnd { + chain_hash, + full_information: false, + }); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().err, "Received reply_short_channel_ids_end with no information"); + + // Expect the task to be removed + assert!(net_graph_msg_handler.scid_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of a successful reply when there are no additional scids to query. We expect + // the task to be removed. + { + // Initiate a query to create a pending query task + let result = net_graph_msg_handler.query_short_channel_ids(&node_id, chain_hash, vec![0x0003e8_000000_0000]); + assert!(result.is_ok()); + + // Process success reply + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, &ReplyShortChannelIdsEnd { + chain_hash, + full_information: true, + }); + assert!(result.is_ok()); + + // Expect the task to be removed + assert!(net_graph_msg_handler.scid_query_tasks.lock().unwrap().is_empty()); + } + + // Test receipt of a successful reply when there are additional scids to query. We expect + // additional queries to be sent until the task can be removed. + { + // Initiate a query to create a pending query task + let result = net_graph_msg_handler.query_short_channel_ids(&node_id, chain_hash, vec![0x0003e8_000000_0000]); + assert!(result.is_ok()); + + // Initiate a second query to add pending scids to the task + let result = net_graph_msg_handler.query_short_channel_ids(&node_id, chain_hash, vec![0x0003e9_000000_0000]); + assert!(result.is_ok()); + assert_eq!(net_graph_msg_handler.scid_query_tasks.lock().unwrap().get(&node_id).unwrap().short_channel_ids, vec![0x0003e9_000000_0000]); + + // Initiate a third query to add pending scids to the task + let result = net_graph_msg_handler.query_short_channel_ids(&node_id, chain_hash, vec![0x0003f0_000000_0000]); + assert!(result.is_ok()); + assert_eq!(net_graph_msg_handler.scid_query_tasks.lock().unwrap().get(&node_id).unwrap().short_channel_ids, vec![0x0003e9_000000_0000, 0x0003f0_000000_0000]); + + // Clear all of the pending send events + net_graph_msg_handler.get_and_clear_pending_msg_events(); + + // Handle the first successful reply, which will send the next batch of scids in a new query + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, &ReplyShortChannelIdsEnd { + chain_hash, + full_information: true, + }); + assert!(result.is_ok()); + + // We expect the second batch to be sent in an event + let expected_node_id = &node_id; + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), 1); + match &events[0] { + MessageSendEvent::SendShortIdsQuery { node_id, msg } => { + assert_eq!(node_id, expected_node_id); + assert_eq!(msg.chain_hash, chain_hash); + assert_eq!(msg.short_channel_ids, vec![0x0003e9_000000_0000, 0x0003f0_000000_0000]); + }, + _ => panic!("expected MessageSendEvent::SendShortIdsQuery"), + } + + // We expect the scids to be cleared from the task + assert_eq!(net_graph_msg_handler.scid_query_tasks.lock().unwrap().get(&node_id).unwrap().short_channel_ids.len(), 0); + + // Handle the second successful reply + let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, &ReplyShortChannelIdsEnd { + chain_hash, + full_information: true, + }); + assert!(result.is_ok()); + + // We expect the task should be removed + assert!(net_graph_msg_handler.scid_query_tasks.lock().unwrap().is_empty()); + } + } + + #[test] + fn handling_query_channel_range() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + let result = net_graph_msg_handler.handle_query_channel_range(&node_id, &QueryChannelRange { + chain_hash, + first_blocknum: 0, + number_of_blocks: 0xffff_ffff, + }); + assert!(result.is_err()); + } + + #[test] + fn handling_query_short_channel_ids() { + let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); + let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + + let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + + let result = net_graph_msg_handler.handle_query_short_channel_ids(&node_id, &QueryShortChannelIds { + chain_hash, + short_channel_ids: vec![0x0003e8_000000_0000], + }); + assert!(result.is_err()); + } }