X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=92826244e09ee2b03f674e49c9e229e0a65a20ff;hb=d3ddf15357589ed10f4c844dc22822755b804306;hp=0659412f774190ba7adee5ab599d3c4e0174cbb0;hpb=40626958e451a3d63edbbcf0bc4a5be3f32ff629;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 0659412f..0b752161 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -15,32 +15,45 @@ //! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with //! messages they should handle, and encoding/sending response messages. +use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; -use crate::sign::{KeysManager, NodeSigner, Recipient}; -use crate::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider}; +use crate::sign::{NodeSigner, Recipient}; +use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use crate::ln::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs; -use crate::ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler}; -use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; +use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler}; use crate::util::ser::{VecWriter, Writeable, Writer}; -use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; +use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE}; use crate::ln::wire; -use crate::ln::wire::Encode; -use crate::onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger}; -use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias}; +use crate::ln::wire::{Encode, Type}; +use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage}; +use crate::onion_message::offers::{OffersMessage, OffersMessageHandler}; +use crate::onion_message::packet::OnionMessageContents; +use crate::routing::gossip::{NodeId, NodeAlias}; use crate::util::atomic_counter::AtomicCounter; -use crate::util::logger::Logger; +use crate::util::logger::{Logger, WithContext}; +use crate::util::string::PrintableString; use crate::prelude::*; use crate::io; -use alloc::collections::LinkedList; -use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock}; -use core::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use alloc::collections::VecDeque; +use crate::sync::{Mutex, MutexGuard, FairRwLock}; +use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; use core::convert::Infallible; -#[cfg(feature = "std")] use std::error; +#[cfg(feature = "std")] +use std::error; +#[cfg(not(c_bindings))] +use { + crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}, + crate::onion_message::messenger::{SimpleArcOnionMessenger, SimpleRefOnionMessenger}, + crate::routing::gossip::{NetworkGraph, P2PGossipSync}, + crate::sign::KeysManager, + crate::sync::Arc, +}; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::sha256::HashEngine as Sha256Engine; @@ -64,11 +77,28 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// in the process. Each message is paired with the node id of the intended recipient. If no /// connection to the node exists, then the message is simply not sent. fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>; + + /// Gets the node feature flags which this handler itself supports. All available handlers are + /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] + /// which are broadcasted in our [`NodeAnnouncement`] message. + /// + /// [`NodeAnnouncement`]: crate::ln::msgs::NodeAnnouncement + fn provided_node_features(&self) -> NodeFeatures; + + /// Gets the init feature flags which should be sent to the given peer. All available handlers + /// are queried similarly and their feature flags are OR'd together to form the [`InitFeatures`] + /// which are sent in our [`Init`] message. + /// + /// [`Init`]: crate::ln::msgs::Init + fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures; } /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler. pub struct IgnoringMessageHandler{} +impl EventsProvider for IgnoringMessageHandler { + fn process_pending_events(&self, _handler: H) where H::Target: EventHandler {} +} impl MessageSendEventsProvider for IgnoringMessageHandler { fn get_and_clear_pending_msg_events(&self) -> Vec { Vec::new() } } @@ -90,30 +120,35 @@ impl RoutingMessageHandler for IgnoringMessageHandler { } fn processing_queue_high(&self) -> bool { false } } -impl OnionMessageProvider for IgnoringMessageHandler { - fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } -} impl OnionMessageHandler for IgnoringMessageHandler { fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} + fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + fn timer_tick_occurred(&self) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } } +impl OffersMessageHandler for IgnoringMessageHandler { + fn handle_message(&self, _msg: OffersMessage) -> Option { None } +} impl CustomOnionMessageHandler for IgnoringMessageHandler { type CustomMessage = Infallible; - fn handle_custom_message(&self, _msg: Infallible) { + fn handle_custom_message(&self, _msg: Infallible) -> Option { // Since we always return `None` in the read the handle method should never be called. unreachable!(); } fn read_custom_message(&self, _msg_type: u64, _buffer: &mut R) -> Result, msgs::DecodeError> where Self: Sized { Ok(None) } + fn release_pending_custom_messages(&self) -> Vec> { + vec![] + } } -impl CustomOnionMessageContents for Infallible { +impl OnionMessageContents for Infallible { fn tlv_type(&self) -> u64 { unreachable!(); } } @@ -149,6 +184,12 @@ impl CustomMessageHandler for IgnoringMessageHandler { } fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() } + + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } + + fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { + InitFeatures::empty() + } } /// A dummy struct which implements `ChannelMessageHandler` without having any channels. @@ -161,7 +202,7 @@ impl ErroringMessageHandler { pub fn new() -> Self { Self { message_queue: Mutex::new(Vec::new()) } } - fn push_error(&self, node_id: &PublicKey, channel_id: [u8; 32]) { + fn push_error(&self, node_id: &PublicKey, channel_id: ChannelId) { self.message_queue.lock().unwrap().push(MessageSendEvent::HandleError { action: msgs::ErrorAction::SendErrorMessage { msg: msgs::ErrorMessage { channel_id, data: "We do not support channel messages, sorry.".to_owned() }, @@ -181,7 +222,7 @@ impl ChannelMessageHandler for ErroringMessageHandler { // Any messages which are related to a specific channel generate an error message to let the // peer know we don't care about channels. fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) { - ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id); } fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) { ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); @@ -201,6 +242,18 @@ impl ChannelMessageHandler for ErroringMessageHandler { fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } + fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } @@ -250,11 +303,19 @@ impl ChannelMessageHandler for ErroringMessageHandler { features.set_channel_type_optional(); features.set_scid_privacy_optional(); features.set_zero_conf_optional(); + features.set_route_blinding_optional(); features } + fn get_chain_hashes(&self) -> Option> { + // We don't enforce any chains upon peer connection for `ErroringMessageHandler` and leave it up + // to users of `ErroringMessageHandler` to make decisions on network compatiblility. + // There's not really any way to pull in specific networks here, and hardcoding can cause breakages. + None + } + fn handle_open_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { - ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id); } fn handle_accept_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { @@ -324,7 +385,7 @@ pub struct MessageHandler where /// A message handler which handles onion messages. This should generally be an /// [`OnionMessenger`], but can also be an [`IgnoringMessageHandler`]. /// - /// [`OnionMessenger`]: crate::onion_message::OnionMessenger + /// [`OnionMessenger`]: crate::onion_message::messenger::OnionMessenger pub onion_message_handler: OM, /// A message handler which handles custom messages. The only LDK-provided implementation is @@ -450,15 +511,15 @@ struct Peer { /// handshake and can talk to this peer normally (though use [`Peer::handshake_complete`] to /// check this. their_features: Option, - their_net_address: Option, + their_socket_address: Option, - pending_outbound_buffer: LinkedList>, + pending_outbound_buffer: VecDeque>, pending_outbound_buffer_first_msg_offset: usize, /// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily /// prioritize channel messages over them. /// /// Note that these messages are *not* encrypted/MAC'd, and are only serialized. - gossip_broadcast_buffer: LinkedList>, + gossip_broadcast_buffer: VecDeque, awaiting_write_event: bool, pending_read_buffer: Vec, @@ -574,8 +635,17 @@ impl Peer { /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. /// -/// This is not exported to bindings users as `Arc`s don't make sense in bindings. -pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc>, Arc, IgnoringMessageHandler, Arc>; +/// This is not exported to bindings users as type aliases aren't supported in most languages. +#[cfg(not(c_bindings))] +pub type SimpleArcPeerManager = PeerManager< + SD, + Arc>, + Arc>>, C, Arc>>, + Arc>, + Arc, + IgnoringMessageHandler, + Arc +>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -584,13 +654,27 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'f P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'i SimpleRefOnionMessenger<'j, 'k, L>, &'f L, IgnoringMessageHandler, &'c KeysManager>; +/// This is not exported to bindings users as type aliases aren't supported in most languages. +#[cfg(not(c_bindings))] +pub type SimpleRefPeerManager< + 'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, 'k, SD, M, T, F, C, L +> = PeerManager< + SD, + &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, M, T, F, L>, + &'f P2PGossipSync<&'graph NetworkGraph<&'logger L>, C, &'logger L>, + &'h SimpleRefOnionMessenger<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, 'j, 'k, M, T, F, L>, + &'logger L, + IgnoringMessageHandler, + &'c KeysManager +>; /// A generic trait which is implemented for all [`PeerManager`]s. This makes bounding functions or /// structs on any [`PeerManager`] much simpler as only this trait is needed as a bound, rather /// than the full set of bounds on [`PeerManager`] itself. +/// +/// This is not exported to bindings users as general cover traits aren't useful in other +/// languages. #[allow(missing_docs)] pub trait APeerManager { type Descriptor: SocketDescriptor; @@ -608,6 +692,8 @@ pub trait APeerManager { type NS: Deref; /// Gets a reference to the underlying [`PeerManager`]. fn as_ref(&self) -> &PeerManager; + /// Returns the peer manager's [`OnionMessageHandler`]. + fn onion_message_handler(&self) -> &Self::OMT; } impl @@ -633,6 +719,9 @@ APeerManager for PeerManager where type NST = ::Target; type NS = NS; fn as_ref(&self) -> &PeerManager { self } + fn onion_message_handler(&self) -> &Self::OMT { + self.message_handler.onion_message_handler.deref() + } } /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls @@ -676,15 +765,18 @@ pub struct PeerManager>, - /// We can only have one thread processing events at once, but we don't usually need the full - /// `peers` write lock to do so, so instead we block on this empty mutex when entering - /// `process_events`. - event_processing_lock: Mutex<()>, - /// Because event processing is global and always does all available work before returning, - /// there is no reason for us to have many event processors waiting on the lock at once. - /// Instead, we limit the total blocked event processors to always exactly one by setting this - /// when an event process call is waiting. - blocked_event_processors: AtomicBool, + /// We can only have one thread processing events at once, but if a second call to + /// `process_events` happens while a first call is in progress, one of the two calls needs to + /// start from the top to ensure any new messages are also handled. + /// + /// Because the event handler calls into user code which may block, we don't want to block a + /// second thread waiting for another thread to handle events which is then blocked on user + /// code, so we store an atomic counter here: + /// * 0 indicates no event processor is running + /// * 1 indicates an event processor is running + /// * > 1 indicates an event processor is running but needs to start again from the top once + /// it finishes as another thread tried to start processing events but returned early. + event_processing_state: AtomicI32, /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this /// value increases strictly since we don't assume access to a time source. @@ -722,7 +814,7 @@ impl From for MessageHandlingError { macro_rules! encode_msg { ($msg: expr) => {{ - let mut buffer = VecWriter(Vec::new()); + let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE)); wire::write($msg, &mut buffer).unwrap(); buffer.0 }} @@ -798,28 +890,28 @@ impl core::fmt::Display for OptionalFromDebugger<'_> { /// A function used to filter out local or private addresses /// /// -fn filter_addresses(ip_address: Option) -> Option { +fn filter_addresses(ip_address: Option) -> Option { match ip_address{ // For IPv4 range 10.0.0.0 - 10.255.255.255 (10/8) - Some(NetAddress::IPv4{addr: [10, _, _, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [10, _, _, _], port: _}) => None, // For IPv4 range 0.0.0.0 - 0.255.255.255 (0/8) - Some(NetAddress::IPv4{addr: [0, _, _, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [0, _, _, _], port: _}) => None, // For IPv4 range 100.64.0.0 - 100.127.255.255 (100.64/10) - Some(NetAddress::IPv4{addr: [100, 64..=127, _, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [100, 64..=127, _, _], port: _}) => None, // For IPv4 range 127.0.0.0 - 127.255.255.255 (127/8) - Some(NetAddress::IPv4{addr: [127, _, _, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [127, _, _, _], port: _}) => None, // For IPv4 range 169.254.0.0 - 169.254.255.255 (169.254/16) - Some(NetAddress::IPv4{addr: [169, 254, _, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [169, 254, _, _], port: _}) => None, // For IPv4 range 172.16.0.0 - 172.31.255.255 (172.16/12) - Some(NetAddress::IPv4{addr: [172, 16..=31, _, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [172, 16..=31, _, _], port: _}) => None, // For IPv4 range 192.168.0.0 - 192.168.255.255 (192.168/16) - Some(NetAddress::IPv4{addr: [192, 168, _, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [192, 168, _, _], port: _}) => None, // For IPv4 range 192.88.99.0 - 192.88.99.255 (192.88.99/24) - Some(NetAddress::IPv4{addr: [192, 88, 99, _], port: _}) => None, + Some(SocketAddress::TcpIpV4{addr: [192, 88, 99, _], port: _}) => None, // For IPv6 range 2000:0000:0000:0000:0000:0000:0000:0000 - 3fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff (2000::/3) - Some(NetAddress::IPv6{addr: [0x20..=0x3F, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], port: _}) => ip_address, + Some(SocketAddress::TcpIpV6{addr: [0x20..=0x3F, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], port: _}) => ip_address, // For remaining addresses - Some(NetAddress::IPv6{addr: _, port: _}) => None, + Some(SocketAddress::TcpIpV6{addr: _, port: _}) => None, Some(..) => ip_address, None => None, } @@ -847,15 +939,14 @@ impl Vec<(PublicKey, Option)> { + pub fn get_peer_node_ids(&self) -> Vec<(PublicKey, Option)> { let peers = self.peers.read().unwrap(); peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); if !p.handshake_complete() { return None; } - Some((p.their_node_id.unwrap().0, p.their_net_address.clone())) + Some((p.their_node_id.unwrap().0, p.their_socket_address.clone())) }).collect() } @@ -892,7 +983,14 @@ impl InitFeatures { + self.message_handler.chan_handler.provided_init_features(their_node_id) + | self.message_handler.route_handler.provided_init_features(their_node_id) + | self.message_handler.onion_message_handler.provided_init_features(their_node_id) + | self.message_handler.custom_message_handler.provided_init_features(their_node_id) } /// Indicates a new outbound connection has been established to a node with the given `node_id` @@ -910,7 +1008,7 @@ impl) -> Result, PeerHandleError> { + pub fn new_outbound_connection(&self, their_node_id: PublicKey, descriptor: Descriptor, remote_network_address: Option) -> Result, PeerHandleError> { let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key()); let res = peer_encryptor.get_act_one(&self.secp_ctx).to_vec(); let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes @@ -926,11 +1024,11 @@ impl) -> Result<(), PeerHandleError> { + pub fn new_inbound_connection(&self, descriptor: Descriptor, remote_network_address: Option) -> Result<(), PeerHandleError> { let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.node_signer); let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes @@ -982,11 +1080,11 @@ impl>(); + let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE; + let lots_of_slack = peer.pending_outbound_buffer.len() + < peer.pending_outbound_buffer.capacity() / 2; + if large_capacity && lots_of_slack { + peer.pending_outbound_buffer.shrink_to_fit(); + } } else { peer.awaiting_write_event = true; } @@ -1165,18 +1270,20 @@ impl(&self, peer: &mut Peer, message: &M) { + let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None); if is_gossip_msg(message.type_id()) { - log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); + log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); } else { - log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)) + log_trace!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)) } peer.msgs_sent_since_pong += 1; peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message)); } /// Append a message to a peer's pending outbound/write gossip broadcast buffer - fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec) { + fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) { peer.msgs_sent_since_pong += 1; + debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP); peer.gossip_broadcast_buffer.push_back(encoded_message); } @@ -1196,39 +1303,54 @@ impl { - match $thing { + ($peer: expr, $thing: expr) => {{ + let res = $thing; + let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None); + match res { Ok(x) => x, Err(e) => { match e.action { - msgs::ErrorAction::DisconnectPeer { msg: _ } => { - //TODO: Try to push msg - log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); + msgs::ErrorAction::DisconnectPeer { .. } => { + // We may have an `ErrorMessage` to send to the peer, + // but writing to the socket while reading can lead to + // re-entrant code and possibly unexpected behavior. The + // message send is optimistic anyway, and in this case + // we immediately disconnect the peer. + log_debug!(logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); + return Err(PeerHandleError { }); + }, + msgs::ErrorAction::DisconnectPeerWithWarning { .. } => { + // We have a `WarningMessage` to send to the peer, but + // writing to the socket while reading can lead to + // re-entrant code and possibly unexpected behavior. The + // message send is optimistic anyway, and in this case + // we immediately disconnect the peer. + log_debug!(logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); return Err(PeerHandleError { }); }, msgs::ErrorAction::IgnoreAndLog(level) => { - log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_given_level!(logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); continue }, msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these msgs::ErrorAction::IgnoreError => { - log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_debug!(logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); continue; }, msgs::ErrorAction::SendErrorMessage { msg } => { - log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_debug!(logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err); self.enqueue_message($peer, &msg); continue; }, msgs::ErrorAction::SendWarningMessage { msg, log_level } => { - log_given_level!(self.logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_given_level!(logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err); self.enqueue_message($peer, &msg); continue; }, } } } - } + }} } let mut peer_lock = peer_mutex.lock().unwrap(); @@ -1253,9 +1375,10 @@ impl { + let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None); match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { hash_map::Entry::Occupied(e) => { - log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); + log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event // Check that the peers map is consistent with the // node_id_to_descriptor map, as this has been broken @@ -1264,7 +1387,7 @@ impl { - log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); + log_debug!(logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); entry.insert(peer_descriptor.clone()) }, }; @@ -1290,10 +1413,9 @@ impl= 2); + debug_assert!(peer.pending_read_buffer.len() >= 2 + 16); + try_potential_handleerror!(peer, + peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..])); + + let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]); + let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler); // Reset read buffer if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); } peer.pending_read_buffer.resize(18, 0); peer.pending_read_is_header = true; - let mut reader = io::Cursor::new(&msg_data[..]); - let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler); + let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None); let message = match message_result { Ok(x) => x, Err(e) => { match e { - // Note that to avoid recursion we never call + // Note that to avoid re-entrancy we never call // `do_attempt_write_data` from here, causing // the messages enqueued here to not actually // be sent before the peer is disconnected. (msgs::DecodeError::UnknownRequiredFeature, Some(ty)) if is_gossip_msg(ty) => { - log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!"); + log_gossip!(logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!"); continue; } (msgs::DecodeError::UnsupportedCompression, _) => { - log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message"); - self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: "Unsupported message compression: zlib".to_owned() }); + log_gossip!(logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message"); + self.enqueue_message(peer, &msgs::WarningMessage { channel_id: ChannelId::new_zero(), data: "Unsupported message compression: zlib".to_owned() }); continue; } (_, Some(ty)) if is_gossip_msg(ty) => { - log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message"); + log_gossip!(logger, "Got an invalid value while deserializing a gossip message"); self.enqueue_message(peer, &msgs::WarningMessage { - channel_id: [0; 32], + channel_id: ChannelId::new_zero(), data: format!("Unreadable/bogus gossip message of type {}", ty), }); continue; } - (msgs::DecodeError::UnknownRequiredFeature, ty) => { - log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); - self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) }); + (msgs::DecodeError::UnknownRequiredFeature, _) => { + log_debug!(logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); return Err(PeerHandleError { }); } (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }), (msgs::DecodeError::InvalidValue, _) => { - log_debug!(self.logger, "Got an invalid value while deserializing message"); + log_debug!(logger, "Got an invalid value while deserializing message"); return Err(PeerHandleError { }); } (msgs::DecodeError::ShortRead, _) => { - log_debug!(self.logger, "Deserialization failed due to shortness of message"); + log_debug!(logger, "Deserialization failed due to shortness of message"); return Err(PeerHandleError { }); } (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }), @@ -1419,19 +1541,46 @@ impl::Target as wire::CustomMessageReader>::CustomMessage> ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; + let logger = WithContext::from(&self.logger, Some(their_node_id), None); peer_lock.received_message_since_timer_tick = true; // Need an Init as first message if let wire::Message::Init(msg) = message { - if msg.features.requires_unknown_bits() { - log_debug!(self.logger, "Peer features required unknown version bits"); + // Check if we have any compatible chains if the `networks` field is specified. + if let Some(networks) = &msg.networks { + if let Some(our_chains) = self.message_handler.chan_handler.get_chain_hashes() { + let mut have_compatible_chains = false; + 'our_chains: for our_chain in our_chains.iter() { + for their_chain in networks { + if our_chain == their_chain { + have_compatible_chains = true; + break 'our_chains; + } + } + } + if !have_compatible_chains { + log_debug!(logger, "Peer does not support any of our supported chains"); + return Err(PeerHandleError { }.into()); + } + } + } + + let our_features = self.init_features(&their_node_id); + if msg.features.requires_unknown_bits_from(&our_features) { + log_debug!(logger, "Peer requires features unknown to us"); return Err(PeerHandleError { }.into()); } + + if our_features.requires_unknown_bits_from(&msg.features) { + log_debug!(logger, "We require features unknown to our peer"); + return Err(PeerHandleError { }.into()); + } + if peer_lock.their_features.is_some() { return Err(PeerHandleError { }.into()); } - log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features); + log_info!(logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features); // For peers not supporting gossip queries start sync now, otherwise wait until we receive a filter. if msg.features.initial_routing_sync() && !msg.features.supports_gossip_queries() { @@ -1439,27 +1588,27 @@ impl { - let mut data_is_printable = true; - for b in msg.data.bytes() { - if b < 32 || b > 126 { - data_is_printable = false; - break; - } - } - - if data_is_printable { - log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), msg.data); - } else { - log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(their_node_id)); - } + log_debug!(logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); self.message_handler.chan_handler.handle_error(&their_node_id, &msg); - if msg.channel_id == [0; 32] { + if msg.channel_id.is_zero() { return Err(PeerHandleError { }.into()); } }, wire::Message::Warning(msg) => { - let mut data_is_printable = true; - for b in msg.data.bytes() { - if b < 32 || b > 126 { - data_is_printable = false; - break; - } - } - - if data_is_printable { - log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), msg.data); - } else { - log_debug!(self.logger, "Got warning message from {} with non-ASCII error message", log_pubkey!(their_node_id)); - } + log_debug!(logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); }, wire::Message::Ping(msg) => { @@ -1562,6 +1687,22 @@ impl { + self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg); + } + + // Splicing messages: + wire::Message::Splice(msg) => { + self.message_handler.chan_handler.handle_splice(&their_node_id, &msg); + } + wire::Message::SpliceAck(msg) => { + self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg); + } + wire::Message::SpliceLocked(msg) => { + self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg); + } + // Interactive transaction construction messages: wire::Message::TxAddInput(msg) => { self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg); @@ -1671,11 +1812,11 @@ impl { - log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id); + log_debug!(logger, "Received unknown even message of type {}, disconnecting peer!", type_id); return Err(PeerHandleError { }.into()); }, wire::Message::Unknown(type_id) => { - log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id); + log_trace!(logger, "Received unknown odd message of type {}, ignoring", type_id); }, wire::Message::Custom(custom) => { self.message_handler.custom_message_handler.handle_custom_message(custom, &their_node_id)?; @@ -1698,8 +1839,9 @@ impl { @@ -1725,8 +1867,9 @@ impl { @@ -1752,14 +1895,15 @@ impl debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"), @@ -1784,356 +1928,391 @@ impl { - debug_assert!(val, "compare_exchange failed spuriously?"); - return; - }, - Ok(val) => { - debug_assert!(!val, "compare_exchange succeeded spuriously?"); - // We're the only waiter, as the running process_events may have emptied the - // pending events "long" ago and there are new events for us to process, wait until - // its done and process any leftover events before returning. - _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap()); - self.blocked_event_processors.store(false, Ordering::Release); - } - } + if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 { + // If we're not the first event processor to get here, just return early, the increment + // we just did will be treated as "go around again" at the end. + return; } - self.update_gossip_backlogged(); - let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed); + loop { + self.update_gossip_backlogged(); + let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed); - let mut peers_to_disconnect = HashMap::new(); - let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); - events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); + let mut peers_to_disconnect = HashMap::new(); - { - // TODO: There are some DoS attacks here where you can flood someone's outbound send - // buffer by doing things like announcing channels on another node. We should be willing to - // drop optional-ish messages when send buffers get full! + { + let peers_lock = self.peers.read().unwrap(); - let peers_lock = self.peers.read().unwrap(); - let peers = &*peers_lock; - macro_rules! get_peer_for_forwarding { - ($node_id: expr) => { - { - if peers_to_disconnect.get($node_id).is_some() { - // If we've "disconnected" this peer, do not send to it. - continue; - } - let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); - match descriptor_opt { - Some(descriptor) => match peers.get(&descriptor) { - Some(peer_mutex) => { - let peer_lock = peer_mutex.lock().unwrap(); - if !peer_lock.handshake_complete() { + let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); + events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); + + let peers = &*peers_lock; + macro_rules! get_peer_for_forwarding { + ($node_id: expr) => { + { + if peers_to_disconnect.get($node_id).is_some() { + // If we've "disconnected" this peer, do not send to it. + continue; + } + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); + match descriptor_opt { + Some(descriptor) => match peers.get(&descriptor) { + Some(peer_mutex) => { + let peer_lock = peer_mutex.lock().unwrap(); + if !peer_lock.handshake_complete() { + continue; + } + peer_lock + }, + None => { + debug_assert!(false, "Inconsistent peers set state!"); continue; } - peer_lock }, None => { - debug_assert!(false, "Inconsistent peers set state!"); continue; - } - }, - None => { - continue; - }, + }, + } } } } - } - for event in events_generated.drain(..) { - match event { - MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id), - log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); - // TODO: If the peer is gone we should generate a DiscardFunding event - // indicating to the wallet that they should just throw away this funding transaction - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { - log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", - log_pubkey!(node_id), - update_add_htlcs.len(), - update_fulfill_htlcs.len(), - update_fail_htlcs.len(), - log_bytes!(commitment_signed.channel_id)); - let mut peer = get_peer_for_forwarding!(node_id); - for msg in update_add_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fulfill_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fail_htlcs { - self.enqueue_message(&mut *peer, msg); + for event in events_generated.drain(..) { + match event { + MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.temporary_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.temporary_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.common_fields.temporary_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.common_fields.temporary_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", + log_pubkey!(node_id), + &msg.temporary_channel_id, + ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index)); + // TODO: If the peer is gone we should generate a DiscardFunding event + // indicating to the wallet that they should just throw away this funding transaction + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendFundingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReady event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendStfu { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - for msg in update_fail_malformed_htlcs { - self.enqueue_message(&mut *peer, msg); + MessageSendEvent::SendSplice { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendSplice event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - if let &Some(ref msg) = update_fee { - self.enqueue_message(&mut *peer, msg); + MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - self.enqueue_message(&mut *peer, commitment_signed); - }, - MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendShutdown { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { - log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", - log_pubkey!(node_id), - msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); - }, - MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { - log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); - match self.message_handler.route_handler.handle_channel_announcement(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None), - _ => {}, + MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - if let Some(msg) = update_msg { + MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddInput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxComplete event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxSignatures event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAbort event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id)), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", + log_pubkey!(node_id), + update_add_htlcs.len(), + update_fulfill_htlcs.len(), + update_fail_htlcs.len(), + &commitment_signed.channel_id); + let mut peer = get_peer_for_forwarding!(node_id); + for msg in update_add_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fulfill_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fail_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fail_malformed_htlcs { + self.enqueue_message(&mut *peer, msg); + } + if let &Some(ref msg) = update_fee { + self.enqueue_message(&mut *peer, msg); + } + self.enqueue_message(&mut *peer, commitment_signed); + }, + MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendShutdown { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling Shutdown event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { + log_debug!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", + log_pubkey!(node_id), + msg.contents.short_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); + }, + MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { + log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); + match self.message_handler.route_handler.handle_channel_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None), + _ => {}, + } + if let Some(msg) = update_msg { + match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + _ => {}, + } + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}", msg.contents); match self.message_handler.route_handler.handle_channel_update(&msg) { Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), _ => {}, } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); + match self.message_handler.route_handler.handle_node_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), + _ => {}, + } + }, + MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { + log_trace!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), msg.contents.short_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::HandleError { node_id, action } => { + let logger = WithContext::from(&self.logger, Some(node_id), None); + match action { + msgs::ErrorAction::DisconnectPeer { msg } => { + if let Some(msg) = msg.as_ref() { + log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), msg.data); + } else { + log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}", + log_pubkey!(node_id)); + } + // We do not have the peers write lock, so we just store that we're + // about to disconnect the peer and do it after we finish + // processing most messages. + let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); + peers_to_disconnect.insert(node_id, msg); + }, + msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { + log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), msg.data); + // We do not have the peers write lock, so we just store that we're + // about to disconnect the peer and do it after we finish + // processing most messages. + peers_to_disconnect.insert(node_id, Some(wire::Message::Warning(msg))); + }, + msgs::ErrorAction::IgnoreAndLog(level) => { + log_given_level!(logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + }, + msgs::ErrorAction::IgnoreDuplicateGossip => {}, + msgs::ErrorAction::IgnoreError => { + log_debug!(logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + }, + msgs::ErrorAction::SendErrorMessage { ref msg } => { + log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + }, + msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { + log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + }, + } + }, + MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::BroadcastChannelUpdate { msg } => { - log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); - match self.message_handler.route_handler.handle_channel_update(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), - _ => {}, - } - }, - MessageSendEvent::BroadcastNodeAnnouncement { msg } => { - log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); - match self.message_handler.route_handler.handle_node_announcement(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), - _ => {}, + MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { + log_gossip!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", + log_pubkey!(node_id), + msg.short_channel_ids.len(), + msg.first_blocknum, + msg.number_of_blocks, + msg.sync_complete); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { - log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::HandleError { ref node_id, ref action } => { - match *action { - msgs::ErrorAction::DisconnectPeer { ref msg } => { - // We do not have the peers write lock, so we just store that we're - // about to disconenct the peer and do it after we finish - // processing most messages. - peers_to_disconnect.insert(*node_id, msg.clone()); - }, - msgs::ErrorAction::IgnoreAndLog(level) => { - log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); - }, - msgs::ErrorAction::IgnoreDuplicateGossip => {}, - msgs::ErrorAction::IgnoreError => { - log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); - }, - msgs::ErrorAction::SendErrorMessage { ref msg } => { - log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { - log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, + MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - } - MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { - log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", - log_pubkey!(node_id), - msg.short_channel_ids.len(), - msg.first_blocknum, - msg.number_of_blocks, - msg.sync_complete); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - } - MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } } - } - for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() { - if peers_to_disconnect.get(&node_id).is_some() { continue; } - self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); - } + for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() { + if peers_to_disconnect.get(&node_id).is_some() { continue; } + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); + } - for (descriptor, peer_mutex) in peers.iter() { - let mut peer = peer_mutex.lock().unwrap(); - if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; } - self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled); + for (descriptor, peer_mutex) in peers.iter() { + let mut peer = peer_mutex.lock().unwrap(); + if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; } + self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled); + } } - } - if !peers_to_disconnect.is_empty() { - let mut peers_lock = self.peers.write().unwrap(); - let peers = &mut *peers_lock; - for (node_id, msg) in peers_to_disconnect.drain() { - // Note that since we are holding the peers *write* lock we can - // remove from node_id_to_descriptor immediately (as no other - // thread can be holding the peer lock if we have the global write - // lock). - - let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - if let Some(mut descriptor) = descriptor_opt { - if let Some(peer_mutex) = peers.remove(&descriptor) { - let mut peer = peer_mutex.lock().unwrap(); - if let Some(msg) = msg { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *peer, &msg); - // This isn't guaranteed to work, but if there is enough free - // room in the send buffer, put the error message there... - self.do_attempt_write_data(&mut descriptor, &mut *peer, false); - } - self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError"); - } else { debug_assert!(false, "Missing connection for peer"); } + if !peers_to_disconnect.is_empty() { + let mut peers_lock = self.peers.write().unwrap(); + let peers = &mut *peers_lock; + for (node_id, msg) in peers_to_disconnect.drain() { + // Note that since we are holding the peers *write* lock we can + // remove from node_id_to_descriptor immediately (as no other + // thread can be holding the peer lock if we have the global write + // lock). + + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + if let Some(mut descriptor) = descriptor_opt { + if let Some(peer_mutex) = peers.remove(&descriptor) { + let mut peer = peer_mutex.lock().unwrap(); + if let Some(msg) = msg { + self.enqueue_message(&mut *peer, &msg); + // This isn't guaranteed to work, but if there is enough free + // room in the send buffer, put the error message there... + self.do_attempt_write_data(&mut descriptor, &mut *peer, false); + } + self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError"); + } else { debug_assert!(false, "Missing connection for peer"); } + } } } + + if self.event_processing_state.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.event_processing_state.store(1, Ordering::Release); + continue; + } + break; } } @@ -2151,7 +2330,7 @@ impl { let peer = peer_lock.lock().unwrap(); if let Some((node_id, _)) = peer.their_node_id { - log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id)); + log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Handling disconnection of peer {}", log_pubkey!(node_id)); let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); debug_assert!(removed.is_some(), "descriptor maps should be consistent"); if !peer.handshake_complete() { return; } @@ -2316,8 +2495,7 @@ impl) { + pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], mut addresses: Vec) { if addresses.len() > 100 { panic!("More than half the message size was taken up by public addresses!"); } @@ -2348,8 +2526,9 @@ impl bool { mod tests { use crate::sign::{NodeSigner, Recipient}; use crate::events; + use crate::io; + use crate::ln::ChannelId; + use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::peer_channel_encryptor::PeerChannelEncryptor; - use crate::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; + use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; use crate::ln::{msgs, wire}; - use crate::ln::msgs::NetAddress; + use crate::ln::msgs::{LightningError, SocketAddress}; use crate::util::test_utils; - use bitcoin::secp256k1::SecretKey; + use bitcoin::Network; + use bitcoin::blockdata::constants::ChainHash; + use bitcoin::secp256k1::{PublicKey, SecretKey}; use crate::prelude::*; use crate::sync::{Arc, Mutex}; + use core::convert::Infallible; use core::sync::atomic::{AtomicBool, Ordering}; #[derive(Clone)] @@ -2440,19 +2625,51 @@ mod tests { struct PeerManagerCfg { chan_handler: test_utils::TestChannelMessageHandler, routing_handler: test_utils::TestRoutingMessageHandler, + custom_handler: TestCustomMessageHandler, logger: test_utils::TestLogger, node_signer: test_utils::TestNodeSigner, } + struct TestCustomMessageHandler { + features: InitFeatures, + } + + impl wire::CustomMessageReader for TestCustomMessageHandler { + type CustomMessage = Infallible; + fn read(&self, _: u16, _: &mut R) -> Result, msgs::DecodeError> { + Ok(None) + } + } + + impl CustomMessageHandler for TestCustomMessageHandler { + fn handle_custom_message(&self, _: Infallible, _: &PublicKey) -> Result<(), LightningError> { + unreachable!(); + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() } + + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } + + fn provided_init_features(&self, _: &PublicKey) -> InitFeatures { + self.features.clone() + } + } + fn create_peermgr_cfgs(peer_count: usize) -> Vec { let mut cfgs = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); + let features = { + let mut feature_bits = vec![0u8; 33]; + feature_bits[32] = 0b00000001; + InitFeatures::from_le_bytes(feature_bits) + }; cfgs.push( PeerManagerCfg{ - chan_handler: test_utils::TestChannelMessageHandler::new(), + chan_handler: test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)), logger: test_utils::TestLogger::new(), routing_handler: test_utils::TestRoutingMessageHandler::new(), + custom_handler: TestCustomMessageHandler { features }, node_signer: test_utils::TestNodeSigner::new(node_secret), } ); @@ -2461,13 +2678,56 @@ mod tests { cfgs } - fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { + fn create_feature_incompatible_peermgr_cfgs(peer_count: usize) -> Vec { + let mut cfgs = Vec::new(); + for i in 0..peer_count { + let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); + let features = { + let mut feature_bits = vec![0u8; 33 + i + 1]; + feature_bits[33 + i] = 0b00000001; + InitFeatures::from_le_bytes(feature_bits) + }; + cfgs.push( + PeerManagerCfg{ + chan_handler: test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)), + logger: test_utils::TestLogger::new(), + routing_handler: test_utils::TestRoutingMessageHandler::new(), + custom_handler: TestCustomMessageHandler { features }, + node_signer: test_utils::TestNodeSigner::new(node_secret), + } + ); + } + + cfgs + } + + fn create_chain_incompatible_peermgr_cfgs(peer_count: usize) -> Vec { + let mut cfgs = Vec::new(); + for i in 0..peer_count { + let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); + let features = InitFeatures::from_le_bytes(vec![0u8; 33]); + let network = ChainHash::from(&[i as u8; 32]); + cfgs.push( + PeerManagerCfg{ + chan_handler: test_utils::TestChannelMessageHandler::new(network), + logger: test_utils::TestLogger::new(), + routing_handler: test_utils::TestRoutingMessageHandler::new(), + custom_handler: TestCustomMessageHandler { features }, + node_signer: test_utils::TestNodeSigner::new(node_secret), + } + ); + } + + cfgs + } + + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let ephemeral_bytes = [i as u8; 32]; let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, - onion_message_handler: IgnoringMessageHandler {}, custom_message_handler: IgnoringMessageHandler {} + onion_message_handler: IgnoringMessageHandler {}, custom_message_handler: &cfgs[i].custom_handler }; let peer = PeerManager::new(msg_handler, 0, &ephemeral_bytes, &cfgs[i].logger, &cfgs[i].node_signer); peers.push(peer); @@ -2476,19 +2736,19 @@ mod tests { peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), disconnect: Arc::new(AtomicBool::new(false)), }; - let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap(); let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), disconnect: Arc::new(AtomicBool::new(false)), }; - let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); @@ -2533,12 +2793,12 @@ mod tests { fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), disconnect: Arc::new(AtomicBool::new(false)), }; - let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; let mut fd_b = FileDescriptor { fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), disconnect: Arc::new(AtomicBool::new(false)), }; - let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); if peers[0].read_event(&mut fd_a, &initial_data).is_err() { break; } @@ -2558,16 +2818,16 @@ mod tests { .push(crate::events::MessageSendEvent::SendShutdown { node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(), msg: msgs::Shutdown { - channel_id: [0; 32], - scriptpubkey: bitcoin::Script::new(), + channel_id: ChannelId::new_zero(), + scriptpubkey: bitcoin::ScriptBuf::new(), }, }); cfgs[1].chan_handler.pending_events.lock().unwrap() .push(crate::events::MessageSendEvent::SendShutdown { node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(), msg: msgs::Shutdown { - channel_id: [0; 32], - scriptpubkey: bitcoin::Script::new(), + channel_id: ChannelId::new_zero(), + scriptpubkey: bitcoin::ScriptBuf::new(), }, }); @@ -2591,6 +2851,78 @@ mod tests { thrd_b.join().unwrap(); } + #[test] + fn test_feature_incompatible_peers() { + let cfgs = create_peermgr_cfgs(2); + let incompatible_cfgs = create_feature_incompatible_peermgr_cfgs(2); + + let peers = create_network(2, &cfgs); + let incompatible_peers = create_network(2, &incompatible_cfgs); + let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])]; + for (peer_a, peer_b) in peer_pairs.iter() { + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); + assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); + peer_a.process_events(); + + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); + + peer_b.process_events(); + let b_data = fd_b.outbound_data.lock().unwrap().split_off(0); + + // Should fail because of unknown required features + assert!(peer_a.read_event(&mut fd_a, &b_data).is_err()); + } + } + + #[test] + fn test_chain_incompatible_peers() { + let cfgs = create_peermgr_cfgs(2); + let incompatible_cfgs = create_chain_incompatible_peermgr_cfgs(2); + + let peers = create_network(2, &cfgs); + let incompatible_peers = create_network(2, &incompatible_cfgs); + let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])]; + for (peer_a, peer_b) in peer_pairs.iter() { + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); + assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); + peer_a.process_events(); + + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); + + peer_b.process_events(); + let b_data = fd_b.outbound_data.lock().unwrap().split_off(0); + + // Should fail because of incompatible chains + assert!(peer_a.read_event(&mut fd_a, &b_data).is_err()); + } + } + #[test] fn test_disconnect_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and @@ -2615,15 +2947,15 @@ mod tests { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and // push a message from one peer to another. let cfgs = create_peermgr_cfgs(2); - let a_chan_handler = test_utils::TestChannelMessageHandler::new(); - let b_chan_handler = test_utils::TestChannelMessageHandler::new(); + let a_chan_handler = test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)); + let b_chan_handler = test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)); let mut peers = create_network(2, &cfgs); let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); assert_eq!(peers[0].peers.read().unwrap().len(), 1); let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap(); - let msg = msgs::Shutdown { channel_id: [42; 32], scriptpubkey: bitcoin::Script::new() }; + let msg = msgs::Shutdown { channel_id: ChannelId::from_bytes([42; 32]), scriptpubkey: bitcoin::ScriptBuf::new() }; a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown { node_id: their_id, msg: msg.clone() }); @@ -2651,7 +2983,7 @@ mod tests { fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())), disconnect: Arc::new(AtomicBool::new(false)), }; - let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003}; + let addr_dup = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1003}; let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap(); peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap(); @@ -2789,94 +3121,143 @@ mod tests { // Tests the filter_addresses function. // For (10/8) - let ip_address = NetAddress::IPv4{addr: [10, 0, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [10, 0, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [10, 0, 255, 201], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [10, 0, 255, 201], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [10, 255, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [10, 255, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (0/8) - let ip_address = NetAddress::IPv4{addr: [0, 0, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [0, 0, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [0, 0, 255, 187], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [0, 0, 255, 187], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [0, 255, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [0, 255, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (100.64/10) - let ip_address = NetAddress::IPv4{addr: [100, 64, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [100, 64, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [100, 78, 255, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [100, 78, 255, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [100, 127, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [100, 127, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (127/8) - let ip_address = NetAddress::IPv4{addr: [127, 0, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [127, 0, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [127, 65, 73, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [127, 65, 73, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [127, 255, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [127, 255, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (169.254/16) - let ip_address = NetAddress::IPv4{addr: [169, 254, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [169, 254, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [169, 254, 221, 101], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [169, 254, 221, 101], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [169, 254, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [169, 254, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (172.16/12) - let ip_address = NetAddress::IPv4{addr: [172, 16, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [172, 16, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [172, 27, 101, 23], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [172, 27, 101, 23], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [172, 31, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [172, 31, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (192.168/16) - let ip_address = NetAddress::IPv4{addr: [192, 168, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [192, 168, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [192, 168, 205, 159], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [192, 168, 205, 159], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [192, 168, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [192, 168, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (192.88.99/24) - let ip_address = NetAddress::IPv4{addr: [192, 88, 99, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [192, 88, 99, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [192, 88, 99, 140], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [192, 88, 99, 140], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv4{addr: [192, 88, 99, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [192, 88, 99, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For other IPv4 addresses - let ip_address = NetAddress::IPv4{addr: [188, 255, 99, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [188, 255, 99, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone())); - let ip_address = NetAddress::IPv4{addr: [123, 8, 129, 14], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [123, 8, 129, 14], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone())); - let ip_address = NetAddress::IPv4{addr: [2, 88, 9, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV4{addr: [2, 88, 9, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone())); // For (2000::/3) - let ip_address = NetAddress::IPv6{addr: [32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV6{addr: [32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone())); - let ip_address = NetAddress::IPv6{addr: [45, 34, 209, 190, 0, 123, 55, 34, 0, 0, 3, 27, 201, 0, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV6{addr: [45, 34, 209, 190, 0, 123, 55, 34, 0, 0, 3, 27, 201, 0, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone())); - let ip_address = NetAddress::IPv6{addr: [63, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255], port: 1000}; + let ip_address = SocketAddress::TcpIpV6{addr: [63, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), Some(ip_address.clone())); // For other IPv6 addresses - let ip_address = NetAddress::IPv6{addr: [24, 240, 12, 32, 0, 0, 0, 0, 20, 97, 0, 32, 121, 254, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV6{addr: [24, 240, 12, 32, 0, 0, 0, 0, 20, 97, 0, 32, 121, 254, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv6{addr: [68, 23, 56, 63, 0, 0, 2, 7, 75, 109, 0, 39, 0, 0, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV6{addr: [68, 23, 56, 63, 0, 0, 2, 7, 75, 109, 0, 39, 0, 0, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); - let ip_address = NetAddress::IPv6{addr: [101, 38, 140, 230, 100, 0, 30, 98, 0, 26, 0, 0, 57, 96, 0, 0], port: 1000}; + let ip_address = SocketAddress::TcpIpV6{addr: [101, 38, 140, 230, 100, 0, 30, 98, 0, 26, 0, 0, 57, 96, 0, 0], port: 1000}; assert_eq!(filter_addresses(Some(ip_address.clone())), None); // For (None) assert_eq!(filter_addresses(None), None); } + + #[test] + #[cfg(feature = "std")] + fn test_process_events_multithreaded() { + use std::time::{Duration, Instant}; + // Test that `process_events` getting called on multiple threads doesn't generate too many + // loop iterations. + // Each time `process_events` goes around the loop we call + // `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. + // Because the loop should go around once more after a call which fails to take the + // single-threaded lock, if we write zero to the counter before calling `process_events` we + // should never observe there having been more than 2 loop iterations. + // Further, because the last thread to exit will call `process_events` before returning, we + // should always have at least one count at the end. + let cfg = Arc::new(create_peermgr_cfgs(1)); + // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. + let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap()); + + let exit_flag = Arc::new(AtomicBool::new(false)); + macro_rules! spawn_thread { () => { { + let thread_cfg = Arc::clone(&cfg); + let thread_peer = Arc::clone(&peer); + let thread_exit = Arc::clone(&exit_flag); + std::thread::spawn(move || { + while !thread_exit.load(Ordering::Acquire) { + thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release); + thread_peer.process_events(); + std::thread::sleep(Duration::from_micros(1)); + } + }) + } } } + + let thread_a = spawn_thread!(); + let thread_b = spawn_thread!(); + let thread_c = spawn_thread!(); + + let start_time = Instant::now(); + while start_time.elapsed() < Duration::from_millis(100) { + let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire); + assert!(val <= 2); + std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?! + } + + exit_flag.store(true, Ordering::Release); + thread_a.join().unwrap(); + thread_b.join().unwrap(); + thread_c.join().unwrap(); + assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1); + } }