]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Add a test of gossip message buffer limiting in `PeerManager` 2024-06-robust-updates
authorMatt Corallo <git@bluematt.me>
Tue, 20 Aug 2024 02:22:22 +0000 (02:22 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 1 Oct 2024 16:49:38 +0000 (16:49 +0000)
This adds a simple test that the gossip message buffer in
`PeerManager` is limited, including the new behavior of bypassing
the limit when the broadcast comes from the
`ChannelMessageHandler`.

lightning/src/ln/peer_handler.rs
lightning/src/routing/test_utils.rs
lightning/src/util/test_utils.rs

index 4e201e5c2bc5fac4578d5005e7e3900dca444dbb..77570ba5c0feb3b7ddcef5cc27754302d14d1c8e 100644 (file)
@@ -2725,20 +2725,21 @@ fn is_gossip_msg(type_id: u16) -> bool {
 
 #[cfg(test)]
 mod tests {
+       use super::*;
+
        use crate::sign::{NodeSigner, Recipient};
        use crate::events;
        use crate::io;
        use crate::ln::types::ChannelId;
        use crate::ln::features::{InitFeatures, NodeFeatures};
        use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
-       use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
        use crate::ln::{msgs, wire};
        use crate::ln::msgs::{Init, LightningError, SocketAddress};
        use crate::util::test_utils;
 
        use bitcoin::Network;
        use bitcoin::constants::ChainHash;
-       use bitcoin::secp256k1::{PublicKey, SecretKey};
+       use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1};
 
        use crate::sync::{Arc, Mutex};
        use core::convert::Infallible;
@@ -3196,6 +3197,8 @@ mod tests {
                let cfgs = create_peermgr_cfgs(2);
                cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
                cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+               cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
+               cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
                let peers = create_network(2, &cfgs);
 
                // By calling establish_connect, we trigger do_attempt_write_data between
@@ -3359,6 +3362,79 @@ mod tests {
                assert_eq!(peer_b.peers.read().unwrap().len(), 0);
        }
 
+       #[test]
+       fn test_gossip_flood_pause() {
+               use crate::routing::test_utils::channel_announcement;
+               use lightning_types::features::ChannelFeatures;
+
+               // Simple test which connects two nodes to a PeerManager and checks that if we run out of
+               // socket buffer space we'll stop forwarding gossip but still push our own gossip.
+               let cfgs = create_peermgr_cfgs(2);
+               let peers = create_network(2, &cfgs);
+               let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
+
+               macro_rules! drain_queues { () => {
+                       loop {
+                               peers[0].process_events();
+                               peers[1].process_events();
+
+                               let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
+                               if !msg.is_empty() {
+                                       assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
+                                       continue;
+                               }
+                               let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
+                               if !msg.is_empty() {
+                                       assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
+                                       continue;
+                               }
+                               break;
+                       }
+               } }
+
+               // First, make sure all pending messages have been processed and queues drained.
+               drain_queues!();
+
+               let secp_ctx = Secp256k1::new();
+               let key = SecretKey::from_slice(&[1; 32]).unwrap();
+               let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
+               let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement {
+                       msg,
+                       update_msg: None,
+               };
+
+               fd_a.hang_writes.store(true, Ordering::Relaxed);
+
+               // Now push an arbitrarily large number of messages and check that only
+               // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
+               for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
+                       cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
+                       peers[0].process_events();
+               }
+
+               assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
+                       OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
+
+               // Check that if a broadcast message comes in from the channel handler (i.e. it is an
+               // announcement for our own channel), it gets queued anyway.
+               cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
+               peers[0].process_events();
+               assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
+                       OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
+
+               // Finally, deliver all the messages and make sure we got the right count. Note that there
+               // was an extra message that had already moved from the broadcast queue to the encrypted
+               // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
+               fd_a.hang_writes.store(false, Ordering::Relaxed);
+               cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
+               peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
+
+               drain_queues!();
+               assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty());
+               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
+                       OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2);
+       }
+
        #[test]
        fn test_filter_addresses(){
                // Tests the filter_addresses function.
index a64955cd0156b5bd2d0a6ff64738438a1663bb7f..d85371479fe5c6f47d6e91500736cb2e1d30b00b 100644 (file)
@@ -27,13 +27,11 @@ use crate::sync::{self, Arc};
 
 use crate::routing::gossip::NodeId;
 
-// Using the same keys for LN and BTC ids
-pub(crate) fn add_channel(
-       gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
-       secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
-) {
-       let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
-       let node_id_1 = NodeId::from_pubkey(&node_1_pubkey);
+pub(crate) fn channel_announcement(
+       node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures,
+       short_channel_id: u64, secp_ctx: &Secp256k1<All>,
+) -> ChannelAnnouncement {
+       let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey));
        let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey));
 
        let unsigned_announcement = UnsignedChannelAnnouncement {
@@ -48,13 +46,23 @@ pub(crate) fn add_channel(
        };
 
        let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
-       let valid_announcement = ChannelAnnouncement {
+       ChannelAnnouncement {
                node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
                node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
                bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
                bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
                contents: unsigned_announcement.clone(),
-       };
+       }
+}
+
+// Using the same keys for LN and BTC ids
+pub(crate) fn add_channel(
+       gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
+       secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
+) {
+       let valid_announcement =
+               channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx);
+       let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
        match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) {
                Ok(res) => assert!(res),
                _ => panic!()
@@ -108,7 +116,7 @@ pub(crate) fn update_channel(
 
 pub(super) fn get_nodes(secp_ctx: &Secp256k1<All>) -> (SecretKey, PublicKey, Vec<SecretKey>, Vec<PublicKey>) {
        let privkeys: Vec<SecretKey> = (2..22).map(|i| {
-               SecretKey::from_slice(&<Vec<u8>>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap()
+               SecretKey::from_slice(&[i; 32]).unwrap()
        }).collect();
 
        let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect();
index 12e027d32fca0b4e6b457e650b5ce4549b6445a9..8bcb7bbc7fc62ee958c92d3f6a52c36feb8088bd 100644 (file)
@@ -986,6 +986,7 @@ pub struct TestRoutingMessageHandler {
        pub chan_anns_recvd: AtomicUsize,
        pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
        pub request_full_sync: AtomicBool,
+       pub announcement_available_for_sync: AtomicBool,
 }
 
 impl TestRoutingMessageHandler {
@@ -995,27 +996,32 @@ impl TestRoutingMessageHandler {
                        chan_anns_recvd: AtomicUsize::new(0),
                        pending_events: Mutex::new(vec![]),
                        request_full_sync: AtomicBool::new(false),
+                       announcement_available_for_sync: AtomicBool::new(false),
                }
        }
 }
 impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
        fn handle_node_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
-               Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
+               Ok(true)
        }
        fn handle_channel_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
                self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
-               Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
+               Ok(true)
        }
        fn handle_channel_update(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
                self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
-               Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
+               Ok(true)
        }
        fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
-               let chan_upd_1 = get_dummy_channel_update(starting_point);
-               let chan_upd_2 = get_dummy_channel_update(starting_point);
-               let chan_ann = get_dummy_channel_announcement(starting_point);
+               if self.announcement_available_for_sync.load(Ordering::Acquire) {
+                       let chan_upd_1 = get_dummy_channel_update(starting_point);
+                       let chan_upd_2 = get_dummy_channel_update(starting_point);
+                       let chan_ann = get_dummy_channel_announcement(starting_point);
 
-               Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
+                       Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
+               } else {
+                       None
+               }
        }
 
        fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> {