#[cfg(test)]
mod tests {
+ use super::*;
+
use crate::sign::{NodeSigner, Recipient};
use crate::events;
use crate::io;
use crate::ln::types::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
- use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
use crate::ln::{msgs, wire};
use crate::ln::msgs::{Init, LightningError, SocketAddress};
use crate::util::test_utils;
use bitcoin::Network;
use bitcoin::constants::ChainHash;
- use bitcoin::secp256k1::{PublicKey, SecretKey};
+ use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1};
use crate::sync::{Arc, Mutex};
use core::convert::Infallible;
let cfgs = create_peermgr_cfgs(2);
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+ cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
+ cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
let peers = create_network(2, &cfgs);
// By calling establish_connect, we trigger do_attempt_write_data between
assert_eq!(peer_b.peers.read().unwrap().len(), 0);
}
+ #[test]
+ fn test_gossip_flood_pause() {
+ use crate::routing::test_utils::channel_announcement;
+ use lightning_types::features::ChannelFeatures;
+
+ // Simple test which connects two nodes to a PeerManager and checks that if we run out of
+ // socket buffer space we'll stop forwarding gossip but still push our own gossip.
+ let cfgs = create_peermgr_cfgs(2);
+ let peers = create_network(2, &cfgs);
+ let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
+
+ macro_rules! drain_queues { () => {
+ loop {
+ peers[0].process_events();
+ peers[1].process_events();
+
+ let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
+ if !msg.is_empty() {
+ assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
+ continue;
+ }
+ let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
+ if !msg.is_empty() {
+ assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
+ continue;
+ }
+ break;
+ }
+ } }
+
+ // First, make sure all pending messages have been processed and queues drained.
+ drain_queues!();
+
+ let secp_ctx = Secp256k1::new();
+ let key = SecretKey::from_slice(&[1; 32]).unwrap();
+ let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
+ let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement {
+ msg,
+ update_msg: None,
+ };
+
+ fd_a.hang_writes.store(true, Ordering::Relaxed);
+
+ // Now push an arbitrarily large number of messages and check that only
+ // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
+ for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
+ cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
+ peers[0].process_events();
+ }
+
+ assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
+ OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
+
+ // Check that if a broadcast message comes in from the channel handler (i.e. it is an
+ // announcement for our own channel), it gets queued anyway.
+ cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
+ peers[0].process_events();
+ assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
+ OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
+
+ // Finally, deliver all the messages and make sure we got the right count. Note that there
+ // was an extra message that had already moved from the broadcast queue to the encrypted
+ // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
+ fd_a.hang_writes.store(false, Ordering::Relaxed);
+ cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
+ peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
+
+ drain_queues!();
+ assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty());
+ assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
+ OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2);
+ }
+
#[test]
fn test_filter_addresses(){
// Tests the filter_addresses function.
use crate::routing::gossip::NodeId;
-// Using the same keys for LN and BTC ids
-pub(crate) fn add_channel(
- gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
- secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
-) {
- let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
- let node_id_1 = NodeId::from_pubkey(&node_1_pubkey);
+pub(crate) fn channel_announcement(
+ node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures,
+ short_channel_id: u64, secp_ctx: &Secp256k1<All>,
+) -> ChannelAnnouncement {
+ let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey));
let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey));
let unsigned_announcement = UnsignedChannelAnnouncement {
};
let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
- let valid_announcement = ChannelAnnouncement {
+ ChannelAnnouncement {
node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
contents: unsigned_announcement.clone(),
- };
+ }
+}
+
+// Using the same keys for LN and BTC ids
+pub(crate) fn add_channel(
+ gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
+ secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
+) {
+ let valid_announcement =
+ channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx);
+ let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) {
Ok(res) => assert!(res),
_ => panic!()
pub(super) fn get_nodes(secp_ctx: &Secp256k1<All>) -> (SecretKey, PublicKey, Vec<SecretKey>, Vec<PublicKey>) {
let privkeys: Vec<SecretKey> = (2..22).map(|i| {
- SecretKey::from_slice(&<Vec<u8>>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap()
+ SecretKey::from_slice(&[i; 32]).unwrap()
}).collect();
let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect();
pub chan_anns_recvd: AtomicUsize,
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
pub request_full_sync: AtomicBool,
+ pub announcement_available_for_sync: AtomicBool,
}
impl TestRoutingMessageHandler {
chan_anns_recvd: AtomicUsize::new(0),
pending_events: Mutex::new(vec![]),
request_full_sync: AtomicBool::new(false),
+ announcement_available_for_sync: AtomicBool::new(false),
}
}
}
impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
fn handle_node_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
- Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
+ Ok(true)
}
fn handle_channel_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
- Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
+ Ok(true)
}
fn handle_channel_update(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
- Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
+ Ok(true)
}
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
- let chan_upd_1 = get_dummy_channel_update(starting_point);
- let chan_upd_2 = get_dummy_channel_update(starting_point);
- let chan_ann = get_dummy_channel_announcement(starting_point);
+ if self.announcement_available_for_sync.load(Ordering::Acquire) {
+ let chan_upd_1 = get_dummy_channel_update(starting_point);
+ let chan_upd_2 = get_dummy_channel_update(starting_point);
+ let chan_ann = get_dummy_channel_announcement(starting_point);
- Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
+ Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
+ } else {
+ None
+ }
}
fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> {