From 44ba52ccf10bb0362ed2964b66ec2ae51e388161 Mon Sep 17 00:00:00 2001 From: bmancini55 Date: Wed, 3 Mar 2021 16:48:19 -0500 Subject: [PATCH] Handle query_channel_range message from peer Initial implementation of handling query_channel_range messages in NetGraphMsgHandler. Enqueues a sequence of reply message in the pending message events buffer. --- lightning/src/routing/network_graph.rs | 459 ++++++++++++++++++++++++- 1 file changed, 444 insertions(+), 15 deletions(-) diff --git a/lightning/src/routing/network_graph.rs b/lightning/src/routing/network_graph.rs index 13dc6626..34859cf0 100644 --- a/lightning/src/routing/network_graph.rs +++ b/lightning/src/routing/network_graph.rs @@ -30,6 +30,7 @@ use ln::msgs; use util::ser::{Writeable, Readable, Writer}; use util::logger::Logger; use util::events::{MessageSendEvent, MessageSendEventsProvider}; +use util::scid_utils::{block_from_scid, scid_from_parts}; use std::{cmp, fmt}; use std::sync::{RwLock, RwLockReadGuard}; @@ -70,6 +71,11 @@ pub struct NetGraphMsgHandler where C::Target: chain::Access full_syncs_requested: AtomicUsize, pending_events: Mutex>, logger: L, + + /// Maximum number of short_channel_ids that will be encoded in one gossip reply message. + /// Default is 8000 which ensures a reply fits within the 65k payload limit and is + /// consistent with other implementations. + max_reply_scids: usize, } impl NetGraphMsgHandler where C::Target: chain::Access, L::Target: Logger { @@ -86,6 +92,7 @@ impl NetGraphMsgHandler where C::Target: chain::Access chain_access, pending_events: Mutex::new(vec![]), logger, + max_reply_scids: 8000, } } @@ -99,6 +106,7 @@ impl NetGraphMsgHandler where C::Target: chain::Access chain_access, pending_events: Mutex::new(vec![]), logger, + max_reply_scids: 8000, } } @@ -312,12 +320,100 @@ impl RoutingMessageHandler for N Ok(()) } - fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { - // TODO - Err(LightningError { - err: String::from("Not implemented"), - action: ErrorAction::IgnoreError, - }) + /// Processes a query from a peer by finding channels whose funding UTXOs + /// are in the specified block range. Due to message size limits, large range + /// queries may result in several reply messages. This implementation enqueues + /// all reply messages into pending events. + fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> { + log_debug!(self.logger, "Handling query_channel_range peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks); + + let network_graph = self.network_graph.read().unwrap(); + + let start_scid = scid_from_parts(msg.first_blocknum, 0, 0); + + // We receive valid queries with end_blocknum that would overflow SCID conversion. + // Manually cap the ending block to avoid this overflow. + let exclusive_end_scid = scid_from_parts(cmp::min(msg.end_blocknum(), 0xffffff), 0, 0); + + // Per spec, we must reply to a query. Send an empty message when things are invalid. + if msg.chain_hash != network_graph.genesis_hash || start_scid.is_err() || exclusive_end_scid.is_err() { + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(MessageSendEvent::SendReplyChannelRange { + node_id: their_node_id.clone(), + msg: ReplyChannelRange { + chain_hash: msg.chain_hash.clone(), + first_blocknum: msg.first_blocknum, + number_of_blocks: msg.number_of_blocks, + sync_complete: true, + short_channel_ids: vec![], + } + }); + return Ok(()); + } + + // Creates channel batches. We are not checking if the channel is routable + // (has at least one update). A peer may still want to know the channel + // exists even if its not yet routable. + let mut batches: Vec> = vec![Vec::with_capacity(self.max_reply_scids)]; + for (_, ref chan) in network_graph.get_channels().range(start_scid.unwrap()..exclusive_end_scid.unwrap()) { + if let Some(chan_announcement) = &chan.announcement_message { + // Construct a new batch if last one is full + if batches.last().unwrap().len() == batches.last().unwrap().capacity() { + batches.push(Vec::with_capacity(self.max_reply_scids)); + } + + let batch = batches.last_mut().unwrap(); + batch.push(chan_announcement.contents.short_channel_id); + } + } + drop(network_graph); + + let mut pending_events = self.pending_events.lock().unwrap(); + let mut batch_index = 0; + let batch_count = batches.len(); + for batch in batches.into_iter() { + // Per spec, the initial first_blocknum needs to be <= the query's first_blocknum. + // Use the query's values since we don't use pre-processed reply ranges. + let first_blocknum = if batch_index == 0 { + msg.first_blocknum + } + // Subsequent replies must be >= the last sent first_blocknum. Use the first block + // in the new batch. + else { + block_from_scid(batch.first().unwrap()) + }; + + // Per spec, the last end_block needs to be >= the query's end_block. Last + // reply calculates difference between the query's end_blocknum and the start of the reply. + // Overflow safe since end_blocknum=msg.first_block_num+msg.number_of_blocks and first_blocknum + // will be either msg.first_blocknum or a higher block height. + let number_of_blocks = if batch_index == batch_count-1 { + msg.end_blocknum() - first_blocknum + } + // Prior replies should use the number of blocks that fit into the reply. Overflow + // safe since first_blocknum is always <= last SCID's block. + else { + block_from_scid(batch.last().unwrap()) - first_blocknum + 1 + }; + + // Only true for the last message in a sequence + let sync_complete = batch_index == batch_count - 1; + + pending_events.push(MessageSendEvent::SendReplyChannelRange { + node_id: their_node_id.clone(), + msg: ReplyChannelRange { + chain_hash: msg.chain_hash.clone(), + first_blocknum, + number_of_blocks, + sync_complete, + short_channel_ids: batch, + } + }); + + batch_index += 1; + } + + Ok(()) } fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { @@ -2088,18 +2184,351 @@ mod tests { #[test] fn handling_query_channel_range() { - let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler(); - let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); - let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey); + let (secp_ctx, mut net_graph_msg_handler) = create_net_graph_msg_handler(); let chain_hash = genesis_block(Network::Testnet).header.block_hash(); + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let node_1_btckey = &SecretKey::from_slice(&[40; 32]).unwrap(); + let node_2_btckey = &SecretKey::from_slice(&[39; 32]).unwrap(); + let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); + let node_id_2 = PublicKey::from_secret_key(&secp_ctx, node_2_privkey); + let bitcoin_key_1 = PublicKey::from_secret_key(&secp_ctx, node_1_btckey); + let bitcoin_key_2 = PublicKey::from_secret_key(&secp_ctx, node_2_btckey); + + let scids: Vec = vec![ + 0x000000_000000_0000, // 0x0x0 + 0x000001_000000_0000, // 1x0x0 + 0x000002_000000_0000, // 2x0x0 + 0x000002_000001_0000, // 2x1x0 + 0x000100_000000_0000, // 256x0x0 + 0x000101_000000_0000, // 257x0x0 + 0xfffffe_ffffff_ffff, // max + 0xffffff_ffffff_ffff, // never + ]; + + for scid in scids { + let unsigned_announcement = UnsignedChannelAnnouncement { + features: ChannelFeatures::known(), + chain_hash: chain_hash.clone(), + short_channel_id: scid, + node_id_1, + node_id_2, + bitcoin_key_1, + bitcoin_key_2, + excess_data: Vec::new(), + }; - let result = net_graph_msg_handler.handle_query_channel_range(&node_id, QueryChannelRange { - chain_hash, - first_blocknum: 0, - number_of_blocks: 0xffff_ffff, - }); - assert!(result.is_err()); + let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]); + let valid_announcement = ChannelAnnouncement { + node_signature_1: secp_ctx.sign(&msghash, node_1_privkey), + node_signature_2: secp_ctx.sign(&msghash, node_2_privkey), + bitcoin_signature_1: secp_ctx.sign(&msghash, node_1_btckey), + bitcoin_signature_2: secp_ctx.sign(&msghash, node_2_btckey), + contents: unsigned_announcement.clone(), + }; + match net_graph_msg_handler.handle_channel_announcement(&valid_announcement) { + Ok(_) => (), + _ => panic!() + }; + } + + // Empty reply when number_of_blocks=0 + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0, + number_of_blocks: 0, + }, + vec![ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0, + number_of_blocks: 0, + sync_complete: true, + short_channel_ids: vec![] + }] + ); + + // Empty when wrong chain + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: genesis_block(Network::Bitcoin).header.block_hash(), + first_blocknum: 0, + number_of_blocks: 0xffff_ffff, + }, + vec![ReplyChannelRange { + chain_hash: genesis_block(Network::Bitcoin).header.block_hash(), + first_blocknum: 0, + number_of_blocks: 0xffff_ffff, + sync_complete: true, + short_channel_ids: vec![], + }] + ); + + // Empty reply when first_blocknum > 0xffffff + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0x01000000, + number_of_blocks: 0xffffffff, + }, + vec![ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0x01000000, + number_of_blocks: 0xffffffff, + sync_complete: true, + short_channel_ids: vec![] + }] + ); + + // Empty reply when max valid SCID block num. + // Unlike prior test this is a valid but no results are found + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0xffffff, + number_of_blocks: 1, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0xffffff, + number_of_blocks: 1, + sync_complete: true, + short_channel_ids: vec![] + }, + ] + ); + + // No results in valid query range + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0x00100000, + number_of_blocks: 1000, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0x00100000, + number_of_blocks: 1000, + sync_complete: true, + short_channel_ids: vec![], + } + ] + ); + + // Single reply - all blocks + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0, + number_of_blocks: 0xffffffff, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0, + number_of_blocks: 0xffffffff, + sync_complete: true, + short_channel_ids: vec![ + 0x000000_000000_0000, // 0x0x0 + 0x000001_000000_0000, // 1x0x0 + 0x000002_000000_0000, // 2x0x0 + 0x000002_000001_0000, // 2x1x0 + 0x000100_000000_0000, // 256x0x0 + 0x000101_000000_0000, // 257x0x0 + 0xfffffe_ffffff_ffff, // max + ] + } + ] + ); + + // Single reply - overflow of first_blocknum + number_of_blocks + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 1, + number_of_blocks: 0xffffffff, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 1, + number_of_blocks: 0xfffffffe, + sync_complete: true, + short_channel_ids: vec![ + 0x000001_000000_0000, // 1x0x0 + 0x000002_000000_0000, // 2x0x0 + 0x000002_000001_0000, // 2x1x0 + 0x000100_000000_0000, // 256x0x0 + 0x000101_000000_0000, // 257x0x0 + 0xfffffe_ffffff_ffff, // max + ] + } + ] + ); + + // Single reply - query larger than found results + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 100, + number_of_blocks: 1000, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 100, + number_of_blocks: 1000, + sync_complete: true, + short_channel_ids: vec![ + 0x000100_000000_0000, // 256x0x0 + 0x000101_000000_0000, // 257x0x0 + ] + } + ] + ); + + // Tests below here will chunk replies + net_graph_msg_handler.max_reply_scids = 1; + + // Multipart - new block per messages + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0, + number_of_blocks: 2, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 0, + number_of_blocks: 1, + sync_complete: false, + short_channel_ids: vec![ + 0x000000_000000_0000, // 0x0x0 + ] + }, + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 1, + number_of_blocks: 1, + sync_complete: true, + short_channel_ids: vec![ + 0x000001_000000_0000, // 1x0x0 + ] + }, + ] + ); + + // Multiplart - resumption of same block + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 2, + number_of_blocks: 1, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 2, + number_of_blocks: 1, + sync_complete: false, + short_channel_ids: vec![ + 0x000002_000000_0000, // 2x0x0 + ] + }, + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 2, + number_of_blocks: 1, + sync_complete: true, + short_channel_ids: vec![ + 0x000002_000001_0000, // 2x1x0 + ] + } + ] + ); + + // Multipart - query larger than found results, similar to single reply + test_handling_query_channel_range( + &net_graph_msg_handler, + &node_id_2, + QueryChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 100, + number_of_blocks: 1000, + }, + vec![ + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 100, // <= query first_blocknum + number_of_blocks: 157, + sync_complete: false, + short_channel_ids: vec![ + 0x000100_000000_0000, // 256x0x0 + ] + }, + ReplyChannelRange { + chain_hash: chain_hash.clone(), + first_blocknum: 257, + number_of_blocks: 843, // >= query first_blocknum+number_of_blocks + sync_complete: true, + short_channel_ids: vec![ + 0x000101_000000_0000, // 257x0x0 + ] + } + ] + ); + } + + fn test_handling_query_channel_range( + net_graph_msg_handler: &NetGraphMsgHandler, Arc>, + test_node_id: &PublicKey, + msg: QueryChannelRange, + expected_replies: Vec + ) { + let result = net_graph_msg_handler.handle_query_channel_range(test_node_id, msg); + assert!(result.is_ok()); + + let events = net_graph_msg_handler.get_and_clear_pending_msg_events(); + assert_eq!(events.len(), expected_replies.len()); + + for i in 0..events.len() { + let expected_reply = &expected_replies[i]; + match &events[i] { + MessageSendEvent::SendReplyChannelRange { node_id, msg } => { + assert_eq!(node_id, test_node_id); + assert_eq!(msg.chain_hash, expected_reply.chain_hash); + assert_eq!(msg.first_blocknum, expected_reply.first_blocknum); + assert_eq!(msg.number_of_blocks, expected_reply.number_of_blocks); + assert_eq!(msg.sync_complete, expected_reply.sync_complete); + assert_eq!(msg.short_channel_ids, expected_reply.short_channel_ids); + }, + _ => panic!("expected MessageSendEvent::SendReplyChannelRange"), + } + } } #[test] -- 2.30.2