From e483e53c113a37232784a2733c8080a8d30b8575 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 20 Aug 2024 02:22:22 +0000 Subject: [PATCH] Add a test of gossip message buffer limiting in `PeerManager` This adds a simple test that the gossip message buffer in `PeerManager` is limited, including the new behavior of bypassing the limit when the broadcast comes from the `ChannelMessageHandler`. --- lightning/src/ln/peer_handler.rs | 80 ++++++++++++++++++++++++++++- lightning/src/routing/test_utils.rs | 28 ++++++---- lightning/src/util/test_utils.rs | 20 +++++--- 3 files changed, 109 insertions(+), 19 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 4e201e5c2..77570ba5c 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -2725,20 +2725,21 @@ fn is_gossip_msg(type_id: u16) -> bool { #[cfg(test)] mod tests { + use super::*; + use crate::sign::{NodeSigner, Recipient}; use crate::events; use crate::io; use crate::ln::types::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::peer_channel_encryptor::PeerChannelEncryptor; - use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER}; use crate::ln::{msgs, wire}; use crate::ln::msgs::{Init, LightningError, SocketAddress}; use crate::util::test_utils; use bitcoin::Network; use bitcoin::constants::ChainHash; - use bitcoin::secp256k1::{PublicKey, SecretKey}; + use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1}; use crate::sync::{Arc, Mutex}; use core::convert::Infallible; @@ -3196,6 +3197,8 @@ mod tests { let cfgs = create_peermgr_cfgs(2); cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release); cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release); + cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release); + cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release); let peers = create_network(2, &cfgs); // By calling establish_connect, we trigger do_attempt_write_data between @@ -3359,6 +3362,79 @@ mod tests { assert_eq!(peer_b.peers.read().unwrap().len(), 0); } + #[test] + fn test_gossip_flood_pause() { + use crate::routing::test_utils::channel_announcement; + use lightning_types::features::ChannelFeatures; + + // Simple test which connects two nodes to a PeerManager and checks that if we run out of + // socket buffer space we'll stop forwarding gossip but still push our own gossip. + let cfgs = create_peermgr_cfgs(2); + let peers = create_network(2, &cfgs); + let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); + + macro_rules! drain_queues { () => { + loop { + peers[0].process_events(); + peers[1].process_events(); + + let msg = fd_a.outbound_data.lock().unwrap().split_off(0); + if !msg.is_empty() { + assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false); + continue; + } + let msg = fd_b.outbound_data.lock().unwrap().split_off(0); + if !msg.is_empty() { + assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false); + continue; + } + break; + } + } } + + // First, make sure all pending messages have been processed and queues drained. + drain_queues!(); + + let secp_ctx = Secp256k1::new(); + let key = SecretKey::from_slice(&[1; 32]).unwrap(); + let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx); + let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement { + msg, + update_msg: None, + }; + + fd_a.hang_writes.store(true, Ordering::Relaxed); + + // Now push an arbitrarily large number of messages and check that only + // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue. + for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 { + cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone()); + peers[0].process_events(); + } + + assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(), + OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP); + + // Check that if a broadcast message comes in from the channel handler (i.e. it is an + // announcement for our own channel), it gets queued anyway. + cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev); + peers[0].process_events(); + assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(), + OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1); + + // Finally, deliver all the messages and make sure we got the right count. Note that there + // was an extra message that had already moved from the broadcast queue to the encrypted + // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages. + fd_a.hang_writes.store(false, Ordering::Relaxed); + cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed); + peers[0].write_buffer_space_avail(&mut fd_a).unwrap(); + + drain_queues!(); + assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty()); + assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed), + OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2); + } + #[test] fn test_filter_addresses(){ // Tests the filter_addresses function. diff --git a/lightning/src/routing/test_utils.rs b/lightning/src/routing/test_utils.rs index a64955cd0..d85371479 100644 --- a/lightning/src/routing/test_utils.rs +++ b/lightning/src/routing/test_utils.rs @@ -27,13 +27,11 @@ use crate::sync::{self, Arc}; use crate::routing::gossip::NodeId; -// Using the same keys for LN and BTC ids -pub(crate) fn add_channel( - gossip_sync: &P2PGossipSync>>, Arc, Arc>, - secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64 -) { - let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); - let node_id_1 = NodeId::from_pubkey(&node_1_pubkey); +pub(crate) fn channel_announcement( + node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, + short_channel_id: u64, secp_ctx: &Secp256k1, +) -> ChannelAnnouncement { + let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey)); let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey)); let unsigned_announcement = UnsignedChannelAnnouncement { @@ -48,13 +46,23 @@ pub(crate) fn add_channel( }; let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]); - let valid_announcement = ChannelAnnouncement { + ChannelAnnouncement { node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey), node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey), bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey), bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey), contents: unsigned_announcement.clone(), - }; + } +} + +// Using the same keys for LN and BTC ids +pub(crate) fn add_channel( + gossip_sync: &P2PGossipSync>>, Arc, Arc>, + secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64 +) { + let valid_announcement = + channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx); + let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) { Ok(res) => assert!(res), _ => panic!() @@ -108,7 +116,7 @@ pub(crate) fn update_channel( pub(super) fn get_nodes(secp_ctx: &Secp256k1) -> (SecretKey, PublicKey, Vec, Vec) { let privkeys: Vec = (2..22).map(|i| { - SecretKey::from_slice(&>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap() + SecretKey::from_slice(&[i; 32]).unwrap() }).collect(); let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 12e027d32..8bcb7bbc7 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -986,6 +986,7 @@ pub struct TestRoutingMessageHandler { pub chan_anns_recvd: AtomicUsize, pub pending_events: Mutex>, pub request_full_sync: AtomicBool, + pub announcement_available_for_sync: AtomicBool, } impl TestRoutingMessageHandler { @@ -995,27 +996,32 @@ impl TestRoutingMessageHandler { chan_anns_recvd: AtomicUsize::new(0), pending_events: Mutex::new(vec![]), request_full_sync: AtomicBool::new(false), + announcement_available_for_sync: AtomicBool::new(false), } } } impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { fn handle_node_announcement(&self, _their_node_id: Option, _msg: &msgs::NodeAnnouncement) -> Result { - Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError }) + Ok(true) } fn handle_channel_announcement(&self, _their_node_id: Option, _msg: &msgs::ChannelAnnouncement) -> Result { self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel); - Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError }) + Ok(true) } fn handle_channel_update(&self, _their_node_id: Option, _msg: &msgs::ChannelUpdate) -> Result { self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel); - Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError }) + Ok(true) } fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option, Option)> { - let chan_upd_1 = get_dummy_channel_update(starting_point); - let chan_upd_2 = get_dummy_channel_update(starting_point); - let chan_ann = get_dummy_channel_announcement(starting_point); + if self.announcement_available_for_sync.load(Ordering::Acquire) { + let chan_upd_1 = get_dummy_channel_update(starting_point); + let chan_upd_2 = get_dummy_channel_update(starting_point); + let chan_ann = get_dummy_channel_announcement(starting_point); - Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2))) + Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2))) + } else { + None + } } fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option { -- 2.39.5