X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=1a39bbb3ae408e7047159e6d9f3f63267188b61f;hb=8866ed35330bae1af2237c1951d9c4025938aa65;hp=d38afcbacb304620851d92de9eb54f4de813e5c0;hpb=6b1f867eaaaf9b5fd4317aefa52ab183b7e5f980;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index d38afcba..1a39bbb3 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -15,49 +15,71 @@ //! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with //! messages they should handle, and encoding/sending response messages. +use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; -use ln::features::{InitFeatures, NodeFeatures}; -use ln::msgs; -use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler}; -use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; -use util::ser::{VecWriter, Writeable, Writer}; -use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; -use ln::wire; -use ln::wire::Encode; -use onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger}; -use routing::gossip::{NetworkGraph, P2PGossipSync}; -use util::atomic_counter::AtomicCounter; -use util::crypto::sign; -use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider}; -use util::logger::Logger; - -use prelude::*; -use io; +use crate::sign::{KeysManager, NodeSigner, Recipient}; +use crate::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider}; +use crate::ln::features::{InitFeatures, NodeFeatures}; +use crate::ln::msgs; +use crate::ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler}; +use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; +use crate::util::ser::{VecWriter, Writeable, Writer}; +use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; +use crate::ln::wire; +use crate::ln::wire::{Encode, Type}; +use crate::onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger}; +use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias}; +use crate::util::atomic_counter::AtomicCounter; +use crate::util::logger::Logger; +use crate::util::string::PrintableString; + +use crate::prelude::*; +use crate::io; use alloc::collections::LinkedList; -use sync::{Arc, Mutex, MutexGuard, FairRwLock}; -use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock}; +use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; use core::convert::Infallible; #[cfg(feature = "std")] use std::error; use bitcoin::hashes::sha256::Hash as Sha256; -use bitcoin::hashes::sha256d::Hash as Sha256dHash; use bitcoin::hashes::sha256::HashEngine as Sha256Engine; use bitcoin::hashes::{HashEngine, Hash}; -/// Handler for BOLT1-compliant messages. +/// A handler provided to [`PeerManager`] for reading and handling custom messages. +/// +/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific +/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the +/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler. +/// +/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md +/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message pub trait CustomMessageHandler: wire::CustomMessageReader { - /// Called with the message type that was received and the buffer to be read. - /// Can return a `MessageHandlingError` if the message could not be handled. + /// Handles the given message sent from `sender_node_id`, possibly producing messages for + /// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`] + /// to send. fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>; - /// Gets the list of pending messages which were generated by the custom message - /// handler, clearing the list in the process. The first tuple element must - /// correspond to the intended recipients node ids. If no connection to one of the - /// specified node does not exist, the message is simply not sent to it. + /// Returns the list of pending messages that were generated by the handler, clearing the list + /// in the process. Each message is paired with the node id of the intended recipient. If no + /// connection to the node exists, then the message is simply not sent. fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>; + + /// Gets the node feature flags which this handler itself supports. All available handlers are + /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] + /// which are broadcasted in our [`NodeAnnouncement`] message. + /// + /// [`NodeAnnouncement`]: crate::ln::msgs::NodeAnnouncement + fn provided_node_features(&self) -> NodeFeatures; + + /// Gets the init feature flags which should be sent to the given peer. All available handlers + /// are queried similarly and their feature flags are OR'd together to form the [`InitFeatures`] + /// which are sent in our [`Init`] message. + /// + /// [`Init`]: crate::ln::msgs::Init + fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures; } /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information @@ -72,8 +94,8 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result { Ok(false) } fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option, Option)> { None } - fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option { None } - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } + fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option { None } + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) } @@ -82,19 +104,38 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn processing_queue_high(&self) -> bool { false } } impl OnionMessageProvider for IgnoringMessageHandler { fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } } impl OnionMessageHandler for IgnoringMessageHandler { fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } - fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } + fn peer_disconnected(&self, _their_node_id: &PublicKey) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } } +impl OffersMessageHandler for IgnoringMessageHandler { + fn handle_message(&self, _msg: OffersMessage) -> Option { None } +} +impl CustomOnionMessageHandler for IgnoringMessageHandler { + type CustomMessage = Infallible; + fn handle_custom_message(&self, _msg: Infallible) -> Option { + // Since we always return `None` in the read the handle method should never be called. + unreachable!(); + } + fn read_custom_message(&self, _msg_type: u64, _buffer: &mut R) -> Result, msgs::DecodeError> where Self: Sized { + Ok(None) + } +} + +impl CustomOnionMessageContents for Infallible { + fn tlv_type(&self) -> u64 { unreachable!(); } +} + impl Deref for IgnoringMessageHandler { type Target = IgnoringMessageHandler; fn deref(&self) -> &Self { self } @@ -127,6 +168,12 @@ impl CustomMessageHandler for IgnoringMessageHandler { } fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() } + + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } + + fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { + InitFeatures::empty() + } } /// A dummy struct which implements `ChannelMessageHandler` without having any channels. @@ -158,10 +205,10 @@ impl MessageSendEventsProvider for ErroringMessageHandler { impl ChannelMessageHandler for ErroringMessageHandler { // Any messages which are related to a specific channel generate an error message to let the // peer know we don't care about channels. - fn handle_open_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) { + fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) { ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); } - fn handle_accept_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) { + fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) { ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); } fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) { @@ -173,7 +220,7 @@ impl ChannelMessageHandler for ErroringMessageHandler { fn handle_channel_ready(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReady) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } - fn handle_shutdown(&self, their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) { + fn handle_shutdown(&self, their_node_id: &PublicKey, msg: &msgs::Shutdown) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { @@ -208,8 +255,8 @@ impl ChannelMessageHandler for ErroringMessageHandler { } // msgs::ChannelUpdate does not contain the channel_id field, so we just drop them. fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {} - fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } + fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { @@ -230,17 +277,70 @@ impl ChannelMessageHandler for ErroringMessageHandler { features.set_zero_conf_optional(); features } + + fn get_genesis_hashes(&self) -> Option> { + // We don't enforce any chains upon peer connection for `ErroringMessageHandler` and leave it up + // to users of `ErroringMessageHandler` to make decisions on network compatiblility. + // There's not really any way to pull in specific networks here, and hardcoding can cause breakages. + None + } + + fn handle_open_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { + ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + } + + fn handle_accept_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { + ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + } + + fn handle_tx_add_input(&self, their_node_id: &PublicKey, msg: &msgs::TxAddInput) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_add_output(&self, their_node_id: &PublicKey, msg: &msgs::TxAddOutput) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_remove_input(&self, their_node_id: &PublicKey, msg: &msgs::TxRemoveInput) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_remove_output(&self, their_node_id: &PublicKey, msg: &msgs::TxRemoveOutput) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_complete(&self, their_node_id: &PublicKey, msg: &msgs::TxComplete) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_signatures(&self, their_node_id: &PublicKey, msg: &msgs::TxSignatures) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_init_rbf(&self, their_node_id: &PublicKey, msg: &msgs::TxInitRbf) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_ack_rbf(&self, their_node_id: &PublicKey, msg: &msgs::TxAckRbf) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + + fn handle_tx_abort(&self, their_node_id: &PublicKey, msg: &msgs::TxAbort) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } } + impl Deref for ErroringMessageHandler { type Target = ErroringMessageHandler; fn deref(&self) -> &Self { self } } /// Provides references to trait impls which handle different types of messages. -pub struct MessageHandler where - CM::Target: ChannelMessageHandler, - RM::Target: RoutingMessageHandler, - OM::Target: OnionMessageHandler, +pub struct MessageHandler where + CM::Target: ChannelMessageHandler, + RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, + CustomM::Target: CustomMessageHandler, { /// A message handler which handles messages specific to channels. Usually this is just a /// [`ChannelManager`] object or an [`ErroringMessageHandler`]. @@ -253,16 +353,22 @@ pub struct MessageHandler where /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync pub route_handler: RM, - /// A message handler which handles onion messages. For now, this can only be an - /// [`IgnoringMessageHandler`]. + /// A message handler which handles onion messages. This should generally be an + /// [`OnionMessenger`], but can also be an [`IgnoringMessageHandler`]. + /// + /// [`OnionMessenger`]: crate::onion_message::OnionMessenger pub onion_message_handler: OM, + + /// A message handler which handles custom messages. The only LDK-provided implementation is + /// [`IgnoringMessageHandler`]. + pub custom_message_handler: CustomM, } /// Provides an object which can be used to send data to and which uniquely identifies a connection /// to a remote host. You will need to be able to generate multiple of these which meet Eq and /// implement Hash to meet the PeerManager API. /// -/// For efficiency, Clone should be relatively cheap for this type. +/// For efficiency, [`Clone`] should be relatively cheap for this type. /// /// Two descriptors may compare equal (by [`cmp::Eq`] and [`hash::Hash`]) as long as the original /// has been disconnected, the [`PeerManager`] has been informed of the disconnection (either by it @@ -300,16 +406,7 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { /// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the /// descriptor. #[derive(Clone)] -pub struct PeerHandleError { - /// Used to indicate that we probably can't make any future connections to this peer (e.g. - /// because we required features that our peer was missing, or vice versa). - /// - /// While LDK's [`ChannelManager`] will not do it automatically, you likely wish to force-close - /// any channels with this peer or check for new versions of LDK. - /// - /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager - pub no_connection_possible: bool, -} +pub struct PeerHandleError { } impl fmt::Debug for PeerHandleError { fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> { formatter.write_str("Peer Sent Invalid Data") @@ -331,7 +428,7 @@ impl error::Error for PeerHandleError { enum InitSyncTracker{ NoSyncRequested, ChannelsSyncing(u64), - NodesSyncing(PublicKey), + NodesSyncing(NodeId), } /// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop @@ -375,7 +472,15 @@ const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; struct Peer { channel_encryptor: PeerChannelEncryptor, - their_node_id: Option, + /// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip + /// messages in `PeerManager`. Use `Peer::set_their_node_id` to modify this field. + their_node_id: Option<(PublicKey, NodeId)>, + /// The features provided in the peer's [`msgs::Init`] message. + /// + /// This is set only after we've processed the [`msgs::Init`] message and called relevant + /// `peer_connected` handler methods. Thus, this field is set *iff* we've finished our + /// handshake and can talk to this peer normally (though use [`Peer::handshake_complete`] to + /// check this. their_features: Option, their_net_address: Option, @@ -395,12 +500,27 @@ struct Peer { sync_status: InitSyncTracker, msgs_sent_since_pong: usize, - awaiting_pong_timer_tick_intervals: i8, + awaiting_pong_timer_tick_intervals: i64, received_message_since_timer_tick: bool, sent_gossip_timestamp_filter: bool, + + /// Indicates we've received a `channel_announcement` since the last time we had + /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a + /// `channel_announcement` at all - we set this unconditionally but unset it every time we + /// check if we're gossip-processing-backlogged). + received_channel_announce_since_backlogged: bool, + + inbound_connection: bool, } impl Peer { + /// True after we've processed the [`msgs::Init`] message and called relevant `peer_connected` + /// handler methods. Thus, this implies we've finished our handshake and can talk to this peer + /// normally. + fn handshake_complete(&self) -> bool { + self.their_features.is_some() + } + /// Returns true if the channel announcements/updates for the given channel should be /// forwarded to this peer. /// If we are sending our routing table to this peer and we have not yet sent channel @@ -408,6 +528,7 @@ impl Peer { /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already /// sent the old versions, we should send the update, and so return true here. fn should_forward_channel_announcement(&self, channel_id: u64) -> bool { + if !self.handshake_complete() { return false; } if self.their_features.as_ref().unwrap().supports_gossip_queries() && !self.sent_gossip_timestamp_filter { return false; @@ -420,7 +541,8 @@ impl Peer { } /// Similar to the above, but for node announcements indexed by node_id. - fn should_forward_node_announcement(&self, node_id: PublicKey) -> bool { + fn should_forward_node_announcement(&self, node_id: NodeId) -> bool { + if !self.handshake_complete() { return false; } if self.their_features.as_ref().unwrap().supports_gossip_queries() && !self.sent_gossip_timestamp_filter { return false; @@ -428,14 +550,18 @@ impl Peer { match self.sync_status { InitSyncTracker::NoSyncRequested => true, InitSyncTracker::ChannelsSyncing(_) => false, - InitSyncTracker::NodesSyncing(pk) => pk < node_id, + InitSyncTracker::NodesSyncing(sync_node_id) => sync_node_id.as_slice() < node_id.as_slice(), } } /// Returns whether we should be reading bytes from this peer, based on whether its outbound /// buffer still has space and we don't need to pause reads to get some writes out. - fn should_read(&self) -> bool { - self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool { + if !gossip_processing_backlogged { + self.received_channel_announce_since_backlogged = false; + } + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && + (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged) } /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's @@ -443,19 +569,20 @@ impl Peer { fn should_buffer_gossip_backfill(&self) -> bool { self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + && self.handshake_complete() } /// Determines if we should push an onion message onto a peer's outbound buffer. This is checked /// every time the peer's buffer may have been drained. fn should_buffer_onion_message(&self) -> bool { - self.pending_outbound_buffer.is_empty() + self.pending_outbound_buffer.is_empty() && self.handshake_complete() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } /// Determines if we should push additional gossip broadcast messages onto a peer's outbound /// buffer. This is checked every time the peer's buffer may have been drained. fn should_buffer_gossip_broadcast(&self) -> bool { - self.pending_outbound_buffer.is_empty() + self.pending_outbound_buffer.is_empty() && self.handshake_complete() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } @@ -467,6 +594,10 @@ impl Peer { total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO } + + fn set_their_node_id(&mut self, node_id: PublicKey) { + self.their_node_id = Some((node_id, NodeId::from_pubkey(&node_id))); + } } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -475,8 +606,16 @@ impl Peer { /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. /// -/// (C-not exported) as `Arc`s don't make sense in bindings. -pub type SimpleArcPeerManager = PeerManager>, Arc>>, Arc, Arc>>, Arc>, Arc, IgnoringMessageHandler>; +/// This is not exported to bindings users as `Arc`s don't make sense in bindings. +pub type SimpleArcPeerManager = PeerManager< + SD, + Arc>, + Arc>>, Arc, Arc>>, + Arc>, + Arc, + IgnoringMessageHandler, + Arc +>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -485,8 +624,69 @@ pub type SimpleArcPeerManager = PeerManager = PeerManager, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'i SimpleRefOnionMessenger<'j, 'k, L>, &'f L, IgnoringMessageHandler>; +/// This is not exported to bindings users as general type aliases don't make sense in bindings. +pub type SimpleRefPeerManager< + 'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, 'k, 'l, 'm, 'n, SD, M, T, F, C, L +> = PeerManager< + SD, + &'n SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'm, M, T, F, L>, + &'f P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, + &'i SimpleRefOnionMessenger<'g, 'm, 'n, L>, + &'f L, + IgnoringMessageHandler, + &'c KeysManager +>; + + +/// A generic trait which is implemented for all [`PeerManager`]s. This makes bounding functions or +/// structs on any [`PeerManager`] much simpler as only this trait is needed as a bound, rather +/// than the full set of bounds on [`PeerManager`] itself. +/// +/// This is not exported to bindings users as general cover traits aren't useful in other +/// languages. +#[allow(missing_docs)] +pub trait APeerManager { + type Descriptor: SocketDescriptor; + type CMT: ChannelMessageHandler + ?Sized; + type CM: Deref; + type RMT: RoutingMessageHandler + ?Sized; + type RM: Deref; + type OMT: OnionMessageHandler + ?Sized; + type OM: Deref; + type LT: Logger + ?Sized; + type L: Deref; + type CMHT: CustomMessageHandler + ?Sized; + type CMH: Deref; + type NST: NodeSigner + ?Sized; + type NS: Deref; + /// Gets a reference to the underlying [`PeerManager`]. + fn as_ref(&self) -> &PeerManager; +} + +impl +APeerManager for PeerManager where + CM::Target: ChannelMessageHandler, + RM::Target: RoutingMessageHandler, + OM::Target: OnionMessageHandler, + L::Target: Logger, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner, +{ + type Descriptor = Descriptor; + type CMT = ::Target; + type CM = CM; + type RMT = ::Target; + type RM = RM; + type OMT = ::Target; + type OM = OM; + type LT = ::Target; + type L = L; + type CMHT = ::Target; + type CMH = CMH; + type NST = ::Target; + type NS = NS; + fn as_ref(&self) -> &PeerManager { self } +} /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls /// socket events into messages which it passes on to its [`MessageHandler`]. @@ -500,20 +700,21 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, 'k, SD, M, /// [`PeerManager`] functions related to the same connection must occur only in serial, making new /// calls only after previous ones have returned. /// -/// Rather than using a plain PeerManager, it is preferable to use either a SimpleArcPeerManager -/// a SimpleRefPeerManager, for conciseness. See their documentation for more details, but -/// essentially you should default to using a SimpleRefPeerManager, and use a -/// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when +/// Rather than using a plain [`PeerManager`], it is preferable to use either a [`SimpleArcPeerManager`] +/// a [`SimpleRefPeerManager`], for conciseness. See their documentation for more details, but +/// essentially you should default to using a [`SimpleRefPeerManager`], and use a +/// [`SimpleArcPeerManager`] when you require a `PeerManager` with a static lifetime, such as when /// you're using lightning-net-tokio. /// /// [`read_event`]: PeerManager::read_event -pub struct PeerManager where +pub struct PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, OM::Target: OnionMessageHandler, L::Target: Logger, - CMH::Target: CustomMessageHandler { - message_handler: MessageHandler, + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner { + message_handler: MessageHandler, /// Connection state for each connected peer - we have an outer read-write lock which is taken /// as read while we're doing processing for a peer and taken write when a peer is being added /// or removed. @@ -528,26 +729,32 @@ pub struct PeerManager>, - /// We can only have one thread processing events at once, but we don't usually need the full - /// `peers` write lock to do so, so instead we block on this empty mutex when entering - /// `process_events`. - event_processing_lock: Mutex<()>, - /// Because event processing is global and always does all available work before returning, - /// there is no reason for us to have many event processors waiting on the lock at once. - /// Instead, we limit the total blocked event processors to always exactly one by setting this - /// when an event process call is waiting. - blocked_event_processors: AtomicBool, + /// We can only have one thread processing events at once, but if a second call to + /// `process_events` happens while a first call is in progress, one of the two calls needs to + /// start from the top to ensure any new messages are also handled. + /// + /// Because the event handler calls into user code which may block, we don't want to block a + /// second thread waiting for another thread to handle events which is then blocked on user + /// code, so we store an atomic counter here: + /// * 0 indicates no event processor is running + /// * 1 indicates an event processor is running + /// * > 1 indicates an event processor is running but needs to start again from the top once + /// it finishes as another thread tried to start processing events but returned early. + event_processing_state: AtomicI32, /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this /// value increases strictly since we don't assume access to a time source. - last_node_announcement_serial: AtomicU64, + last_node_announcement_serial: AtomicU32, - our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, - custom_message_handler: CMH, peer_counter: AtomicCounter, + gossip_processing_backlogged: AtomicBool, + gossip_processing_backlog_lifted: AtomicBool, + + node_signer: NS, + logger: L, secp_ctx: Secp256k1 } @@ -577,15 +784,16 @@ macro_rules! encode_msg { }} } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, OM::Target: OnionMessageHandler, - L::Target: Logger { + L::Target: Logger, + NS::Target: NodeSigner { /// Constructs a new `PeerManager` with the given `ChannelMessageHandler` and /// `OnionMessageHandler`. No routing message handler is used and network graph messages are /// ignored. /// - /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be + /// `ephemeral_random_data` is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. /// /// `current_time` is used as an always-increasing counter that survives across restarts and is @@ -593,19 +801,21 @@ impl PeerManager Self { + /// This is not exported to bindings users as we can't export a PeerManager with a dummy route handler + pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS) -> Self { Self::new(MessageHandler { chan_handler: channel_message_handler, route_handler: IgnoringMessageHandler{}, onion_message_handler, - }, our_node_secret, current_time, ephemeral_random_data, logger, IgnoringMessageHandler{}) + custom_message_handler: IgnoringMessageHandler{}, + }, current_time, ephemeral_random_data, logger, node_signer) } } -impl PeerManager where +impl PeerManager where RM::Target: RoutingMessageHandler, - L::Target: Logger { + L::Target: Logger, + NS::Target: NodeSigner { /// Constructs a new `PeerManager` with the given `RoutingMessageHandler`. No channel message /// handler or onion message handler is used and onion and channel messages will be ignored (or /// generate error messages). Note that some other lightning implementations time-out connections @@ -616,33 +826,34 @@ impl PeerManager Self { + /// This is not exported to bindings users as we can't export a PeerManager with a dummy channel handler + pub fn new_routing_only(routing_message_handler: RM, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS) -> Self { Self::new(MessageHandler { chan_handler: ErroringMessageHandler::new(), route_handler: routing_message_handler, onion_message_handler: IgnoringMessageHandler{}, - }, our_node_secret, current_time, ephemeral_random_data, logger, IgnoringMessageHandler{}) + custom_message_handler: IgnoringMessageHandler{}, + }, current_time, ephemeral_random_data, logger, node_signer) } } -/// A simple wrapper that optionally prints " from " for an optional pubkey. +/// A simple wrapper that optionally prints ` from ` for an optional pubkey. /// This works around `format!()` taking a reference to each argument, preventing /// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling /// due to lifetime errors. -struct OptionalFromDebugger<'a>(&'a Option); +struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>); impl core::fmt::Display for OptionalFromDebugger<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { - if let Some(node_id) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) } + if let Some((node_id, _)) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) } } } /// A function used to filter out local or private addresses -/// https://www.iana.org./assignments/ipv4-address-space/ipv4-address-space.xhtml -/// https://www.iana.org/assignments/ipv6-address-space/ipv6-address-space.xhtml +/// +/// fn filter_addresses(ip_address: Option) -> Option { match ip_address{ // For IPv4 range 10.0.0.0 - 10.255.255.255 (10/8) @@ -670,21 +881,24 @@ fn filter_addresses(ip_address: Option) -> Option { } } -impl PeerManager where +impl PeerManager where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler, OM::Target: OnionMessageHandler, L::Target: Logger, - CMH::Target: CustomMessageHandler { - /// Constructs a new PeerManager with the given message handlers and node_id secret key - /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be + CMH::Target: CustomMessageHandler, + NS::Target: NodeSigner +{ + /// Constructs a new `PeerManager` with the given message handlers. + /// + /// `ephemeral_random_data` is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. /// /// `current_time` is used as an always-increasing counter that survives across restarts and is /// incremented irregularly internally. In general it is best to simply use the current UNIX /// timestamp, however if it is not available a persistent counter that increases once per /// minute should suffice. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, current_time: u64, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self { + pub fn new(message_handler: MessageHandler, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L, node_signer: NS) -> Self { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); @@ -696,31 +910,36 @@ impl Vec { + /// The returned `Option`s will only be `Some` if an address had been previously given via + /// [`Self::new_outbound_connection`] or [`Self::new_inbound_connection`]. + pub fn get_peer_node_ids(&self) -> Vec<(PublicKey, Option)> { let peers = self.peers.read().unwrap(); peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); - if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { + if !p.handshake_complete() { return None; } - p.their_node_id + Some((p.their_node_id.unwrap().0, p.their_net_address.clone())) }).collect() } @@ -731,7 +950,14 @@ impl InitFeatures { + self.message_handler.chan_handler.provided_init_features(their_node_id) + | self.message_handler.route_handler.provided_init_features(their_node_id) + | self.message_handler.onion_message_handler.provided_init_features(their_node_id) + | self.message_handler.custom_message_handler.provided_init_features(their_node_id) + } + + /// Indicates a new outbound connection has been established to a node with the given `node_id` /// and an optional remote network address. /// /// The remote network address adds the option to report a remote IP address back to a connecting @@ -743,40 +969,49 @@ impl) -> Result, PeerHandleError> { let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key()); let res = peer_encryptor.get_act_one(&self.secp_ctx).to_vec(); let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.insert(descriptor, Mutex::new(Peer { - channel_encryptor: peer_encryptor, - their_node_id: None, - their_features: None, - their_net_address: remote_network_address, - - pending_outbound_buffer: LinkedList::new(), - pending_outbound_buffer_first_msg_offset: 0, - gossip_broadcast_buffer: LinkedList::new(), - awaiting_write_event: false, - - pending_read_buffer, - pending_read_buffer_pos: 0, - pending_read_is_header: false, - - sync_status: InitSyncTracker::NoSyncRequested, - - msgs_sent_since_pong: 0, - awaiting_pong_timer_tick_intervals: 0, - received_message_since_timer_tick: false, - sent_gossip_timestamp_filter: false, - })).is_some() { - panic!("PeerManager driver duplicated descriptors!"); - }; - Ok(res) + match peers.entry(descriptor) { + hash_map::Entry::Occupied(_) => { + debug_assert!(false, "PeerManager driver duplicated descriptors!"); + Err(PeerHandleError {}) + }, + hash_map::Entry::Vacant(e) => { + e.insert(Mutex::new(Peer { + channel_encryptor: peer_encryptor, + their_node_id: None, + their_features: None, + their_net_address: remote_network_address, + + pending_outbound_buffer: LinkedList::new(), + pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), + awaiting_write_event: false, + + pending_read_buffer, + pending_read_buffer_pos: 0, + pending_read_is_header: false, + + sync_status: InitSyncTracker::NoSyncRequested, + + msgs_sent_since_pong: 0, + awaiting_pong_timer_tick_intervals: 0, + received_message_since_timer_tick: false, + sent_gossip_timestamp_filter: false, + + received_channel_announce_since_backlogged: false, + inbound_connection: false, + })); + Ok(res) + } + } } /// Indicates a new inbound connection has been established to a node with an optional remote @@ -791,45 +1026,67 @@ impl) -> Result<(), PeerHandleError> { - let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret, &self.secp_ctx); + let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.node_signer); let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.insert(descriptor, Mutex::new(Peer { - channel_encryptor: peer_encryptor, - their_node_id: None, - their_features: None, - their_net_address: remote_network_address, - - pending_outbound_buffer: LinkedList::new(), - pending_outbound_buffer_first_msg_offset: 0, - gossip_broadcast_buffer: LinkedList::new(), - awaiting_write_event: false, - - pending_read_buffer, - pending_read_buffer_pos: 0, - pending_read_is_header: false, - - sync_status: InitSyncTracker::NoSyncRequested, - - msgs_sent_since_pong: 0, - awaiting_pong_timer_tick_intervals: 0, - received_message_since_timer_tick: false, - sent_gossip_timestamp_filter: false, - })).is_some() { - panic!("PeerManager driver duplicated descriptors!"); - }; - Ok(()) + match peers.entry(descriptor) { + hash_map::Entry::Occupied(_) => { + debug_assert!(false, "PeerManager driver duplicated descriptors!"); + Err(PeerHandleError {}) + }, + hash_map::Entry::Vacant(e) => { + e.insert(Mutex::new(Peer { + channel_encryptor: peer_encryptor, + their_node_id: None, + their_features: None, + their_net_address: remote_network_address, + + pending_outbound_buffer: LinkedList::new(), + pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), + awaiting_write_event: false, + + pending_read_buffer, + pending_read_buffer_pos: 0, + pending_read_is_header: false, + + sync_status: InitSyncTracker::NoSyncRequested, + + msgs_sent_since_pong: 0, + awaiting_pong_timer_tick_intervals: 0, + received_message_since_timer_tick: false, + sent_gossip_timestamp_filter: false, + + received_channel_announce_since_backlogged: false, + inbound_connection: true, + })); + Ok(()) + } + } + } + + fn peer_should_read(&self, peer: &mut Peer) -> bool { + peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed)) + } + + fn update_gossip_backlogged(&self) { + let new_state = self.message_handler.route_handler.processing_queue_high(); + let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed); + if prev_state && !new_state { + self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed); + } } - fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { + fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) { + let mut have_written = false; while !peer.awaiting_write_event { if peer.should_buffer_onion_message() { - if let Some(peer_node_id) = peer.their_node_id { + if let Some((peer_node_id, _)) = peer.their_node_id { if let Some(next_onion_message) = self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) { self.enqueue_message(peer, &next_onion_message); @@ -869,8 +1126,8 @@ impl unreachable!(), - InitSyncTracker::NodesSyncing(key) => { - if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) { + InitSyncTracker::NodesSyncing(sync_node_id) => { + if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&sync_node_id)) { self.enqueue_message(peer, &msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); } else { @@ -883,13 +1140,23 @@ impl return, + None => { + if force_one_write && !have_written { + if should_read { + let data_sent = descriptor.send_data(&[], should_read); + debug_assert_eq!(data_sent, 0, "Can't write more than no data"); + } + } + return + }, Some(buff) => buff, }; let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; - let data_sent = descriptor.send_data(pending, peer.should_read()); + let data_sent = descriptor.send_data(pending, should_read); + have_written = true; peer.pending_outbound_buffer_first_msg_offset += data_sent; if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { peer.pending_outbound_buffer_first_msg_offset = 0; @@ -907,7 +1174,7 @@ impl { let mut peer = peer_mutex.lock().unwrap(); peer.awaiting_write_event = false; - self.do_attempt_write_data(descriptor, &mut peer); + self.do_attempt_write_data(descriptor, &mut peer, false); } }; Ok(()) @@ -942,14 +1209,17 @@ impl Result { match self.do_read_event(peer_descriptor, data) { Ok(res) => Ok(res), Err(e) => { - log_trace!(self.logger, "Peer sent invalid data or we decided to disconnect due to a protocol error"); - self.disconnect_event_internal(peer_descriptor, e.no_connection_possible); + log_trace!(self.logger, "Disconnecting peer due to a protocol error (usually a duplicate connection)."); + self.disconnect_event_internal(peer_descriptor); Err(e) } } @@ -958,9 +1228,9 @@ impl(&self, peer: &mut Peer, message: &M) { if is_gossip_msg(message.type_id()) { - log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())); + log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); } else { - log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())) + log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)) } peer.msgs_sent_since_pong += 1; peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message)); @@ -982,7 +1252,7 @@ impl { let mut read_pos = 0; @@ -993,10 +1263,23 @@ impl x, Err(e) => { match e.action { - msgs::ErrorAction::DisconnectPeer { msg: _ } => { - //TODO: Try to push msg + msgs::ErrorAction::DisconnectPeer { .. } => { + // We may have an `ErrorMessage` to send to the peer, + // but writing to the socket while reading can lead to + // re-entrant code and possibly unexpected behavior. The + // message send is optimistic anyway, and in this case + // we immediately disconnect the peer. + log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); + return Err(PeerHandleError { }); + }, + msgs::ErrorAction::DisconnectPeerWithWarning { .. } => { + // We have a `WarningMessage` to send to the peer, but + // writing to the socket while reading can lead to + // re-entrant code and possibly unexpected behavior. The + // message send is optimistic anyway, and in this case + // we immediately disconnect the peer. log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); - return Err(PeerHandleError{ no_connection_possible: false }); + return Err(PeerHandleError { }); }, msgs::ErrorAction::IgnoreAndLog(level) => { log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); @@ -1045,14 +1328,18 @@ impl { - match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) { - hash_map::Entry::Occupied(_) => { - log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap())); + match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { + hash_map::Entry::Occupied(e) => { + log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event - return Err(PeerHandleError{ no_connection_possible: false }) + // Check that the peers map is consistent with the + // node_id_to_descriptor map, as this has been broken + // before. + debug_assert!(peers.get(e.get()).is_some()); + return Err(PeerHandleError { }) }, hash_map::Entry::Vacant(entry) => { - log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap())); + log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); entry.insert(peer_descriptor.clone()) }, }; @@ -1064,24 +1351,23 @@ impl { let act_two = try_potential_handleerror!(peer, peer.channel_encryptor .process_act_one_with_keys(&peer.pending_read_buffer[..], - &self.our_node_secret, self.get_ephemeral_key(), &self.secp_ctx)).to_vec(); + &self.node_signer, self.get_ephemeral_key(), &self.secp_ctx)).to_vec(); peer.pending_outbound_buffer.push_back(act_two); peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long }, NextNoiseStep::ActTwo => { let (act_three, their_node_id) = try_potential_handleerror!(peer, peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], - &self.our_node_secret, &self.secp_ctx)); + &self.node_signer)); peer.pending_outbound_buffer.push_back(act_three.to_vec()); peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes peer.pending_read_is_header = true; - peer.their_node_id = Some(their_node_id); + peer.set_their_node_id(their_node_id); insert_node_id!(); - let features = self.message_handler.chan_handler.provided_init_features(&their_node_id) - .or(self.message_handler.route_handler.provided_init_features(&their_node_id)) - .or(self.message_handler.onion_message_handler.provided_init_features(&their_node_id)); - let resp = msgs::Init { features, remote_network_address: filter_addresses(peer.their_net_address.clone()) }; + let features = self.init_features(&their_node_id); + let networks = self.message_handler.chan_handler.get_genesis_hashes(); + let resp = msgs::Init { features, networks, remote_network_address: filter_addresses(peer.their_net_address.clone()) }; self.enqueue_message(peer, &resp); peer.awaiting_pong_timer_tick_intervals = 0; }, @@ -1090,12 +1376,11 @@ impl 8192 { peer.pending_read_buffer = Vec::new(); } peer.pending_read_buffer.resize(msg_len as usize + 16, 0); if msg_len < 2 { // Need at least the message type tag - return Err(PeerHandleError{ no_connection_possible: false }); + return Err(PeerHandleError { }); } peer.pending_read_is_header = false; } else { @@ -1120,12 +1405,12 @@ impl x, Err(e) => { match e { - // Note that to avoid recursion we never call + // Note that to avoid re-entrancy we never call // `do_attempt_write_data` from here, causing // the messages enqueued here to not actually // be sent before the peer is disconnected. @@ -1146,22 +1431,21 @@ impl { - log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); - self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) }); - return Err(PeerHandleError { no_connection_possible: false }); + (msgs::DecodeError::UnknownRequiredFeature, _) => { + log_debug!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); + return Err(PeerHandleError { }); } - (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { no_connection_possible: false }), + (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }), (msgs::DecodeError::InvalidValue, _) => { log_debug!(self.logger, "Got an invalid value while deserializing message"); - return Err(PeerHandleError { no_connection_possible: false }); + return Err(PeerHandleError { }); } (msgs::DecodeError::ShortRead, _) => { log_debug!(self.logger, "Deserialization failed due to shortness of message"); - return Err(PeerHandleError { no_connection_possible: false }); + return Err(PeerHandleError { }); } - (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { no_connection_possible: false }), - (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { no_connection_possible: false }), + (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }), + (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { }), } } }; @@ -1171,7 +1455,7 @@ impl, message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { - let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages"); + let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; peer_lock.received_message_since_timer_tick = true; // Need an Init as first message if let wire::Message::Init(msg) = message { - if msg.features.requires_unknown_bits() { - log_debug!(self.logger, "Peer features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }.into()); + // Check if we have any compatible chains if the `networks` field is specified. + if let Some(networks) = &msg.networks { + if let Some(our_chains) = self.message_handler.chan_handler.get_genesis_hashes() { + let mut have_compatible_chains = false; + 'our_chains: for our_chain in our_chains.iter() { + for their_chain in networks { + if our_chain == their_chain { + have_compatible_chains = true; + break 'our_chains; + } + } + } + if !have_compatible_chains { + log_debug!(self.logger, "Peer does not support any of our supported chains"); + return Err(PeerHandleError { }.into()); + } + } + } + + let our_features = self.init_features(&their_node_id); + if msg.features.requires_unknown_bits_from(&our_features) { + log_debug!(self.logger, "Peer requires features unknown to us"); + return Err(PeerHandleError { }.into()); } + + if our_features.requires_unknown_bits_from(&msg.features) { + log_debug!(self.logger, "We require features unknown to our peer"); + return Err(PeerHandleError { }.into()); + } + if peer_lock.their_features.is_some() { - return Err(PeerHandleError{ no_connection_possible: false }.into()); + return Err(PeerHandleError { }.into()); } log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features); @@ -1226,24 +1536,24 @@ impl { - let mut data_is_printable = true; - for b in msg.data.bytes() { - if b < 32 || b > 126 { - data_is_printable = false; - break; - } - } - - if data_is_printable { - log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), msg.data); - } else { - log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(their_node_id)); - } + log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); self.message_handler.chan_handler.handle_error(&their_node_id, &msg); if msg.channel_id == [0; 32] { - return Err(PeerHandleError{ no_connection_possible: true }.into()); + return Err(PeerHandleError { }.into()); } }, wire::Message::Warning(msg) => { - let mut data_is_printable = true; - for b in msg.data.bytes() { - if b < 32 || b > 126 { - data_is_printable = false; - break; - } - } - - if data_is_printable { - log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), msg.data); - } else { - log_debug!(self.logger, "Got warning message from {} with non-ASCII error message", log_pubkey!(their_node_id)); - } + log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); }, wire::Message::Ping(msg) => { @@ -1325,10 +1614,16 @@ impl { - self.message_handler.chan_handler.handle_open_channel(&their_node_id, their_features.clone().unwrap(), &msg); + self.message_handler.chan_handler.handle_open_channel(&their_node_id, &msg); + }, + wire::Message::OpenChannelV2(msg) => { + self.message_handler.chan_handler.handle_open_channel_v2(&their_node_id, &msg); }, wire::Message::AcceptChannel(msg) => { - self.message_handler.chan_handler.handle_accept_channel(&their_node_id, their_features.clone().unwrap(), &msg); + self.message_handler.chan_handler.handle_accept_channel(&their_node_id, &msg); + }, + wire::Message::AcceptChannelV2(msg) => { + self.message_handler.chan_handler.handle_accept_channel_v2(&their_node_id, &msg); }, wire::Message::FundingCreated(msg) => { @@ -1341,8 +1636,37 @@ impl { + self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg); + }, + wire::Message::TxAddOutput(msg) => { + self.message_handler.chan_handler.handle_tx_add_output(&their_node_id, &msg); + }, + wire::Message::TxRemoveInput(msg) => { + self.message_handler.chan_handler.handle_tx_remove_input(&their_node_id, &msg); + }, + wire::Message::TxRemoveOutput(msg) => { + self.message_handler.chan_handler.handle_tx_remove_output(&their_node_id, &msg); + }, + wire::Message::TxComplete(msg) => { + self.message_handler.chan_handler.handle_tx_complete(&their_node_id, &msg); + }, + wire::Message::TxSignatures(msg) => { + self.message_handler.chan_handler.handle_tx_signatures(&their_node_id, &msg); + }, + wire::Message::TxInitRbf(msg) => { + self.message_handler.chan_handler.handle_tx_init_rbf(&their_node_id, &msg); + }, + wire::Message::TxAckRbf(msg) => { + self.message_handler.chan_handler.handle_tx_ack_rbf(&their_node_id, &msg); + }, + wire::Message::TxAbort(msg) => { + self.message_handler.chan_handler.handle_tx_abort(&their_node_id, &msg); + } + wire::Message::Shutdown(msg) => { - self.message_handler.chan_handler.handle_shutdown(&their_node_id, their_features.as_ref().unwrap(), &msg); + self.message_handler.chan_handler.handle_shutdown(&their_node_id, &msg); }, wire::Message::ClosingSigned(msg) => { self.message_handler.chan_handler.handle_closing_signed(&their_node_id, &msg); @@ -1384,12 +1708,14 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::NodeAnnouncement(msg) => { if self.message_handler.route_handler.handle_node_announcement(&msg) .map_err(|e| -> MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::NodeAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::ChannelUpdate(msg) => { self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg); @@ -1397,6 +1723,7 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelUpdate(msg)); } + self.update_gossip_backlogged(); }, wire::Message::QueryShortChannelIds(msg) => { self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?; @@ -1419,14 +1746,13 @@ impl { log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id); - // Fail the channel if message is an even, unknown type as per BOLT #1. - return Err(PeerHandleError{ no_connection_possible: true }.into()); + return Err(PeerHandleError { }.into()); }, wire::Message::Unknown(type_id) => { log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id); }, wire::Message::Custom(custom) => { - self.custom_message_handler.handle_custom_message(custom, &their_node_id)?; + self.message_handler.custom_message_handler.handle_custom_message(custom, &their_node_id)?; }, }; Ok(should_forward) @@ -1440,19 +1766,22 @@ impl { - debug_assert!(val, "compare_exchange failed spuriously?"); - return; - }, - Ok(val) => { - debug_assert!(!val, "compare_exchange succeeded spuriously?"); - // We're the only waiter, as the running process_events may have emptied the - // pending events "long" ago and there are new events for us to process, wait until - // its done and process any leftover events before returning. - _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap()); - self.blocked_event_processors.store(false, Ordering::Release); - } - } + if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 { + // If we're not the first event processor to get here, just return early, the increment + // we just did will be treated as "go around again" at the end. + return; } - let mut peers_to_disconnect = HashMap::new(); - let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); - events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); + loop { + self.update_gossip_backlogged(); + let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed); - { - // TODO: There are some DoS attacks here where you can flood someone's outbound send - // buffer by doing things like announcing channels on another node. We should be willing to - // drop optional-ish messages when send buffers get full! + let mut peers_to_disconnect = HashMap::new(); + let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); + events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); - let peers_lock = self.peers.read().unwrap(); - let peers = &*peers_lock; - macro_rules! get_peer_for_forwarding { - ($node_id: expr) => { - { - if peers_to_disconnect.get($node_id).is_some() { - // If we've "disconnected" this peer, do not send to it. - continue; - } - let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); - match descriptor_opt { - Some(descriptor) => match peers.get(&descriptor) { - Some(peer_mutex) => { - let peer_lock = peer_mutex.lock().unwrap(); - if peer_lock.their_features.is_none() { + { + // TODO: There are some DoS attacks here where you can flood someone's outbound send + // buffer by doing things like announcing channels on another node. We should be willing to + // drop optional-ish messages when send buffers get full! + + let peers_lock = self.peers.read().unwrap(); + let peers = &*peers_lock; + macro_rules! get_peer_for_forwarding { + ($node_id: expr) => { + { + if peers_to_disconnect.get($node_id).is_some() { + // If we've "disconnected" this peer, do not send to it. + continue; + } + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); + match descriptor_opt { + Some(descriptor) => match peers.get(&descriptor) { + Some(peer_mutex) => { + let peer_lock = peer_mutex.lock().unwrap(); + if !peer_lock.handshake_complete() { + continue; + } + peer_lock + }, + None => { + debug_assert!(false, "Inconsistent peers set state!"); continue; } - peer_lock }, None => { - debug_assert!(false, "Inconsistent peers set state!"); continue; - } - }, - None => { - continue; - }, + }, + } } } } - } - for event in events_generated.drain(..) { - match event { - MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", - log_pubkey!(node_id), - log_bytes!(msg.temporary_channel_id), - log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); - // TODO: If the peer is gone we should generate a DiscardFunding event - // indicating to the wallet that they should just throw away this funding transaction - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { - log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", - log_pubkey!(node_id), - update_add_htlcs.len(), - update_fulfill_htlcs.len(), - update_fail_htlcs.len(), - log_bytes!(commitment_signed.channel_id)); - let mut peer = get_peer_for_forwarding!(node_id); - for msg in update_add_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fulfill_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fail_htlcs { - self.enqueue_message(&mut *peer, msg); - } - for msg in update_fail_malformed_htlcs { - self.enqueue_message(&mut *peer, msg); - } - if let &Some(ref msg) = update_fee { - self.enqueue_message(&mut *peer, msg); + for event in events_generated.drain(..) { + match event { + MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id), + log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); + // TODO: If the peer is gone we should generate a DiscardFunding event + // indicating to the wallet that they should just throw away this funding transaction + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", + log_pubkey!(node_id), + update_add_htlcs.len(), + update_fulfill_htlcs.len(), + update_fail_htlcs.len(), + log_bytes!(commitment_signed.channel_id)); + let mut peer = get_peer_for_forwarding!(node_id); + for msg in update_add_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fulfill_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fail_htlcs { + self.enqueue_message(&mut *peer, msg); + } + for msg in update_fail_malformed_htlcs { + self.enqueue_message(&mut *peer, msg); + } + if let &Some(ref msg) = update_fee { + self.enqueue_message(&mut *peer, msg); + } + self.enqueue_message(&mut *peer, commitment_signed); + }, + MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendShutdown { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { + log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { + log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", + log_pubkey!(node_id), + msg.contents.short_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); + }, + MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { + log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); + match self.message_handler.route_handler.handle_channel_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None), + _ => {}, + } + if let Some(msg) = update_msg { + match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + _ => {}, + } + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); + match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + _ => {}, + } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); + match self.message_handler.route_handler.handle_node_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), + _ => {}, + } + }, + MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { + log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), msg.contents.short_channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::HandleError { node_id, action } => { + match action { + msgs::ErrorAction::DisconnectPeer { msg } => { + if let Some(msg) = msg.as_ref() { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), msg.data); + } else { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}", + log_pubkey!(node_id)); + } + // We do not have the peers write lock, so we just store that we're + // about to disconenct the peer and do it after we finish + // processing most messages. + let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); + peers_to_disconnect.insert(node_id, msg); + }, + msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), msg.data); + // We do not have the peers write lock, so we just store that we're + // about to disconenct the peer and do it after we finish + // processing most messages. + peers_to_disconnect.insert(node_id, Some(wire::Message::Warning(msg))); + }, + msgs::ErrorAction::IgnoreAndLog(level) => { + log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + }, + msgs::ErrorAction::IgnoreDuplicateGossip => {}, + msgs::ErrorAction::IgnoreError => { + log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + }, + msgs::ErrorAction::SendErrorMessage { ref msg } => { + log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + }, + msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { + log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + }, + } + }, + MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + }, + MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - self.enqueue_message(&mut *peer, commitment_signed); - }, - MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { + log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendShutdown { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), - log_bytes!(msg.channel_id)); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { - log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", - log_pubkey!(node_id), - msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); - }, - MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { - log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); - match self.message_handler.route_handler.handle_channel_announcement(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None), - _ => {}, - } - match self.message_handler.route_handler.handle_channel_update(&update_msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None), - _ => {}, - } - }, - MessageSendEvent::BroadcastChannelUpdate { msg } => { - log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); - match self.message_handler.route_handler.handle_channel_update(&msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), - _ => {}, + msg.short_channel_ids.len(), + msg.first_blocknum, + msg.number_of_blocks, + msg.sync_complete); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { - log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::HandleError { ref node_id, ref action } => { - match *action { - msgs::ErrorAction::DisconnectPeer { ref msg } => { - // We do not have the peers write lock, so we just store that we're - // about to disconenct the peer and do it after we finish - // processing most messages. - peers_to_disconnect.insert(*node_id, msg.clone()); - }, - msgs::ErrorAction::IgnoreAndLog(level) => { - log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); - }, - msgs::ErrorAction::IgnoreDuplicateGossip => {}, - msgs::ErrorAction::IgnoreError => { - log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); - }, - msgs::ErrorAction::SendErrorMessage { ref msg } => { - log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { - log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, + MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } - }, - MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - }, - MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - } - MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { - log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", - log_pubkey!(node_id), - msg.short_channel_ids.len(), - msg.first_blocknum, - msg.number_of_blocks, - msg.sync_complete); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - } - MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } } - } - for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() { - if peers_to_disconnect.get(&node_id).is_some() { continue; } - self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); - } + for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() { + if peers_to_disconnect.get(&node_id).is_some() { continue; } + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); + } - for (descriptor, peer_mutex) in peers.iter() { - self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap()); + for (descriptor, peer_mutex) in peers.iter() { + let mut peer = peer_mutex.lock().unwrap(); + if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; } + self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled); + } } - } - if !peers_to_disconnect.is_empty() { - let mut peers_lock = self.peers.write().unwrap(); - let peers = &mut *peers_lock; - for (node_id, msg) in peers_to_disconnect.drain() { - // Note that since we are holding the peers *write* lock we can - // remove from node_id_to_descriptor immediately (as no other - // thread can be holding the peer lock if we have the global write - // lock). - - if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { - if let Some(peer_mutex) = peers.remove(&descriptor) { - if let Some(msg) = msg { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); + if !peers_to_disconnect.is_empty() { + let mut peers_lock = self.peers.write().unwrap(); + let peers = &mut *peers_lock; + for (node_id, msg) in peers_to_disconnect.drain() { + // Note that since we are holding the peers *write* lock we can + // remove from node_id_to_descriptor immediately (as no other + // thread can be holding the peer lock if we have the global write + // lock). + + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + if let Some(mut descriptor) = descriptor_opt { + if let Some(peer_mutex) = peers.remove(&descriptor) { let mut peer = peer_mutex.lock().unwrap(); - self.enqueue_message(&mut *peer, &msg); - // This isn't guaranteed to work, but if there is enough free - // room in the send buffer, put the error message there... - self.do_attempt_write_data(&mut descriptor, &mut *peer); - } else { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id)); - } + if let Some(msg) = msg { + self.enqueue_message(&mut *peer, &msg); + // This isn't guaranteed to work, but if there is enough free + // room in the send buffer, put the error message there... + self.do_attempt_write_data(&mut descriptor, &mut *peer, false); + } + self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError"); + } else { debug_assert!(false, "Missing connection for peer"); } } - descriptor.disconnect_socket(); - self.message_handler.chan_handler.peer_disconnected(&node_id, false); - self.message_handler.onion_message_handler.peer_disconnected(&node_id, false); } } + + if self.event_processing_state.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.event_processing_state.store(1, Ordering::Release); + continue; + } + break; } } /// Indicates that the given socket descriptor's connection is now closed. pub fn socket_disconnected(&self, descriptor: &Descriptor) { - self.disconnect_event_internal(descriptor, false); + self.disconnect_event_internal(descriptor); + } + + fn do_disconnect(&self, mut descriptor: Descriptor, peer: &Peer, reason: &'static str) { + if !peer.handshake_complete() { + log_trace!(self.logger, "Disconnecting peer which hasn't completed handshake due to {}", reason); + descriptor.disconnect_socket(); + return; + } + + debug_assert!(peer.their_node_id.is_some()); + if let Some((node_id, _)) = peer.their_node_id { + log_trace!(self.logger, "Disconnecting peer with id {} due to {}", node_id, reason); + self.message_handler.chan_handler.peer_disconnected(&node_id); + self.message_handler.onion_message_handler.peer_disconnected(&node_id); + } + descriptor.disconnect_socket(); } - fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { + fn disconnect_event_internal(&self, descriptor: &Descriptor) { let mut peers = self.peers.write().unwrap(); let peer_option = peers.remove(descriptor); match peer_option { @@ -1814,13 +2251,13 @@ impl { let peer = peer_lock.lock().unwrap(); - if let Some(node_id) = peer.their_node_id { - log_trace!(self.logger, - "Handling disconnection of peer {}, with {}future connection to the peer possible.", - log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); - self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); - self.message_handler.onion_message_handler.peer_disconnected(&node_id, no_connection_possible); + if let Some((node_id, _)) = peer.their_node_id { + log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id)); + let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + debug_assert!(removed.is_some(), "descriptor maps should be consistent"); + if !peer.handshake_complete() { return; } + self.message_handler.chan_handler.peer_disconnected(&node_id); + self.message_handler.onion_message_handler.peer_disconnected(&node_id); } } }; @@ -1828,21 +2265,17 @@ impl 0 && !peer.received_message_since_timer_tick) + || peer.awaiting_pong_timer_tick_intervals as u64 > + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 + { + descriptors_needing_disconnect.push(descriptor.clone()); + break; + } peer.received_message_since_timer_tick = false; - continue; - } - if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) - || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 - { - descriptors_needing_disconnect.push(descriptor.clone()); - continue; - } - peer.received_message_since_timer_tick = false; + if peer.awaiting_pong_timer_tick_intervals > 0 { + peer.awaiting_pong_timer_tick_intervals += 1; + break; + } - if peer.awaiting_pong_timer_tick_intervals > 0 { - peer.awaiting_pong_timer_tick_intervals += 1; - continue; + peer.awaiting_pong_timer_tick_intervals = 1; + let ping = msgs::Ping { + ponglen: 0, + byteslen: 64, + }; + self.enqueue_message(&mut *peer, &ping); + break; } - - peer.awaiting_pong_timer_tick_intervals = 1; - let ping = msgs::Ping { - ponglen: 0, - byteslen: 64, - }; - self.enqueue_message(&mut *peer, &ping); - self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer); + self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled); } } if !descriptors_needing_disconnect.is_empty() { { let mut peers_lock = self.peers.write().unwrap(); - for descriptor in descriptors_needing_disconnect.iter() { - if let Some(peer) = peers_lock.remove(descriptor) { - if let Some(node_id) = peer.lock().unwrap().their_node_id { - log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id); + for descriptor in descriptors_needing_disconnect { + if let Some(peer_mutex) = peers_lock.remove(&descriptor) { + let peer = peer_mutex.lock().unwrap(); + if let Some((node_id, _)) = peer.their_node_id { self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - self.message_handler.chan_handler.peer_disconnected(&node_id, false); - self.message_handler.onion_message_handler.peer_disconnected(&node_id, false); } + self.do_disconnect(descriptor, &*peer, "ping/handshake timeout"); } } } - - for mut descriptor in descriptors_needing_disconnect.drain(..) { - descriptor.disconnect_socket(); - } } } @@ -1997,18 +2430,28 @@ impl sig, + Err(_) => { + log_error!(self.logger, "Failed to generate signature for node_announcement"); + return; + }, + }; let msg = msgs::NodeAnnouncement { signature: node_announce_sig, @@ -2036,23 +2479,30 @@ fn is_gossip_msg(type_id: u16) -> bool { #[cfg(test)] mod tests { - use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; - use ln::{msgs, wire}; - use ln::msgs::NetAddress; - use util::events; - use util::test_utils; - - use bitcoin::secp256k1::Secp256k1; - use bitcoin::secp256k1::{SecretKey, PublicKey}; - - use prelude::*; - use sync::{Arc, Mutex}; - use core::sync::atomic::Ordering; + use crate::sign::{NodeSigner, Recipient}; + use crate::events; + use crate::io; + use crate::ln::features::{InitFeatures, NodeFeatures}; + use crate::ln::peer_channel_encryptor::PeerChannelEncryptor; + use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; + use crate::ln::{msgs, wire}; + use crate::ln::msgs::{LightningError, NetAddress}; + use crate::util::test_utils; + + use bitcoin::Network; + use bitcoin::blockdata::constants::ChainHash; + use bitcoin::secp256k1::{PublicKey, SecretKey}; + + use crate::prelude::*; + use crate::sync::{Arc, Mutex}; + use core::convert::Infallible; + use core::sync::atomic::{AtomicBool, Ordering}; #[derive(Clone)] struct FileDescriptor { fd: u16, outbound_data: Arc>>, + disconnect: Arc, } impl PartialEq for FileDescriptor { fn eq(&self, other: &Self) -> bool { @@ -2072,23 +2522,58 @@ mod tests { data.len() } - fn disconnect_socket(&mut self) {} + fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); } } struct PeerManagerCfg { chan_handler: test_utils::TestChannelMessageHandler, routing_handler: test_utils::TestRoutingMessageHandler, + custom_handler: TestCustomMessageHandler, logger: test_utils::TestLogger, + node_signer: test_utils::TestNodeSigner, + } + + struct TestCustomMessageHandler { + features: InitFeatures, + } + + impl wire::CustomMessageReader for TestCustomMessageHandler { + type CustomMessage = Infallible; + fn read(&self, _: u16, _: &mut R) -> Result, msgs::DecodeError> { + Ok(None) + } + } + + impl CustomMessageHandler for TestCustomMessageHandler { + fn handle_custom_message(&self, _: Infallible, _: &PublicKey) -> Result<(), LightningError> { + unreachable!(); + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() } + + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } + + fn provided_init_features(&self, _: &PublicKey) -> InitFeatures { + self.features.clone() + } } fn create_peermgr_cfgs(peer_count: usize) -> Vec { let mut cfgs = Vec::new(); - for _ in 0..peer_count { + for i in 0..peer_count { + let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); + let features = { + let mut feature_bits = vec![0u8; 33]; + feature_bits[32] = 0b00000001; + InitFeatures::from_le_bytes(feature_bits) + }; cfgs.push( PeerManagerCfg{ - chan_handler: test_utils::TestChannelMessageHandler::new(), + chan_handler: test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)), logger: test_utils::TestLogger::new(), routing_handler: test_utils::TestRoutingMessageHandler::new(), + custom_handler: TestCustomMessageHandler { features }, + node_signer: test_utils::TestNodeSigner::new(node_secret), } ); } @@ -2096,26 +2581,79 @@ mod tests { cfgs } - fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { - let mut peers = Vec::new(); + fn create_feature_incompatible_peermgr_cfgs(peer_count: usize) -> Vec { + let mut cfgs = Vec::new(); + for i in 0..peer_count { + let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); + let features = { + let mut feature_bits = vec![0u8; 33 + i + 1]; + feature_bits[33 + i] = 0b00000001; + InitFeatures::from_le_bytes(feature_bits) + }; + cfgs.push( + PeerManagerCfg{ + chan_handler: test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)), + logger: test_utils::TestLogger::new(), + routing_handler: test_utils::TestRoutingMessageHandler::new(), + custom_handler: TestCustomMessageHandler { features }, + node_signer: test_utils::TestNodeSigner::new(node_secret), + } + ); + } + + cfgs + } + + fn create_chain_incompatible_peermgr_cfgs(peer_count: usize) -> Vec { + let mut cfgs = Vec::new(); for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); + let features = InitFeatures::from_le_bytes(vec![0u8; 33]); + let network = ChainHash::from(&[i as u8; 32][..]); + cfgs.push( + PeerManagerCfg{ + chan_handler: test_utils::TestChannelMessageHandler::new(network), + logger: test_utils::TestLogger::new(), + routing_handler: test_utils::TestRoutingMessageHandler::new(), + custom_handler: TestCustomMessageHandler { features }, + node_signer: test_utils::TestNodeSigner::new(node_secret), + } + ); + } + + cfgs + } + + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { + let mut peers = Vec::new(); + for i in 0..peer_count { let ephemeral_bytes = [i as u8; 32]; - let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} }; - let peer = PeerManager::new(msg_handler, node_secret, 0, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {}); + let msg_handler = MessageHandler { + chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, + onion_message_handler: IgnoringMessageHandler {}, custom_message_handler: &cfgs[i].custom_handler + }; + let peer = PeerManager::new(msg_handler, 0, &ephemeral_bytes, &cfgs[i].logger, &cfgs[i].node_signer); peers.push(peer); } peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { - let secp_ctx = Secp256k1::new(); - let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); - let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; - let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; - let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); - peer_a.new_inbound_connection(fd_a.clone(), None).unwrap(); + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); peer_a.process_events(); @@ -2130,28 +2668,178 @@ mod tests { let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); + assert!(peer_a.get_peer_node_ids().contains(&(id_b, Some(addr_b)))); + assert!(peer_b.get_peer_node_ids().contains(&(id_a, Some(addr_a)))); + (fd_a.clone(), fd_b.clone()) } + #[test] + #[cfg(feature = "std")] + fn fuzz_threaded_connections() { + // Spawn two threads which repeatedly connect two peers together, leading to "got second + // connection with peer" disconnections and rapid reconnect. This previously found an issue + // with our internal map consistency, and is a generally good smoke test of disconnection. + let cfgs = Arc::new(create_peermgr_cfgs(2)); + // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. + let peers = Arc::new(create_network(2, unsafe { &*(&*cfgs as *const _) as &'static _ })); + + let start_time = std::time::Instant::now(); + macro_rules! spawn_thread { ($id: expr) => { { + let peers = Arc::clone(&peers); + let cfgs = Arc::clone(&cfgs); + std::thread::spawn(move || { + let mut ctr = 0; + while start_time.elapsed() < std::time::Duration::from_secs(1) { + let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let mut fd_b = FileDescriptor { + fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); + if peers[0].read_event(&mut fd_a, &initial_data).is_err() { break; } + + while start_time.elapsed() < std::time::Duration::from_secs(1) { + peers[0].process_events(); + if fd_a.disconnect.load(Ordering::Acquire) { break; } + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + if peers[1].read_event(&mut fd_b, &a_data).is_err() { break; } + + peers[1].process_events(); + if fd_b.disconnect.load(Ordering::Acquire) { break; } + let b_data = fd_b.outbound_data.lock().unwrap().split_off(0); + if peers[0].read_event(&mut fd_a, &b_data).is_err() { break; } + + cfgs[0].chan_handler.pending_events.lock().unwrap() + .push(crate::events::MessageSendEvent::SendShutdown { + node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(), + msg: msgs::Shutdown { + channel_id: [0; 32], + scriptpubkey: bitcoin::Script::new(), + }, + }); + cfgs[1].chan_handler.pending_events.lock().unwrap() + .push(crate::events::MessageSendEvent::SendShutdown { + node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(), + msg: msgs::Shutdown { + channel_id: [0; 32], + scriptpubkey: bitcoin::Script::new(), + }, + }); + + if ctr % 2 == 0 { + peers[0].timer_tick_occurred(); + peers[1].timer_tick_occurred(); + } + } + + peers[0].socket_disconnected(&fd_a); + peers[1].socket_disconnected(&fd_b); + ctr += 1; + std::thread::sleep(std::time::Duration::from_micros(1)); + } + }) + } } } + let thrd_a = spawn_thread!(1); + let thrd_b = spawn_thread!(2); + + thrd_a.join().unwrap(); + thrd_b.join().unwrap(); + } + + #[test] + fn test_feature_incompatible_peers() { + let cfgs = create_peermgr_cfgs(2); + let incompatible_cfgs = create_feature_incompatible_peermgr_cfgs(2); + + let peers = create_network(2, &cfgs); + let incompatible_peers = create_network(2, &incompatible_cfgs); + let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])]; + for (peer_a, peer_b) in peer_pairs.iter() { + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); + assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); + peer_a.process_events(); + + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); + + peer_b.process_events(); + let b_data = fd_b.outbound_data.lock().unwrap().split_off(0); + + // Should fail because of unknown required features + assert!(peer_a.read_event(&mut fd_a, &b_data).is_err()); + } + } + + #[test] + fn test_chain_incompatible_peers() { + let cfgs = create_peermgr_cfgs(2); + let incompatible_cfgs = create_chain_incompatible_peermgr_cfgs(2); + + let peers = create_network(2, &cfgs); + let incompatible_peers = create_network(2, &incompatible_cfgs); + let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])]; + for (peer_a, peer_b) in peer_pairs.iter() { + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); + assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); + peer_a.process_events(); + + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); + + peer_b.process_events(); + let b_data = fd_b.outbound_data.lock().unwrap().split_off(0); + + // Should fail because of incompatible chains + assert!(peer_a.read_event(&mut fd_a, &b_data).is_err()); + } + } + #[test] fn test_disconnect_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and // push a DisconnectPeer event to remove the node flagged by id let cfgs = create_peermgr_cfgs(2); - let chan_handler = test_utils::TestChannelMessageHandler::new(); - let mut peers = create_network(2, &cfgs); + let peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); assert_eq!(peers[0].peers.read().unwrap().len(), 1); - let secp_ctx = Secp256k1::new(); - let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); - - chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { + let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap(); + cfgs[0].chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { node_id: their_id, action: msgs::ErrorAction::DisconnectPeer { msg: None }, }); - assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1); - peers[0].message_handler.chan_handler = &chan_handler; peers[0].process_events(); assert_eq!(peers[0].peers.read().unwrap().len(), 0); @@ -2162,14 +2850,13 @@ mod tests { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and // push a message from one peer to another. let cfgs = create_peermgr_cfgs(2); - let a_chan_handler = test_utils::TestChannelMessageHandler::new(); - let b_chan_handler = test_utils::TestChannelMessageHandler::new(); + let a_chan_handler = test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)); + let b_chan_handler = test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)); let mut peers = create_network(2, &cfgs); let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); assert_eq!(peers[0].peers.read().unwrap().len(), 1); - let secp_ctx = Secp256k1::new(); - let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); + let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap(); let msg = msgs::Shutdown { channel_id: [42; 32], scriptpubkey: bitcoin::Script::new() }; a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown { @@ -2186,6 +2873,38 @@ mod tests { assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false); } + #[test] + fn test_non_init_first_msg() { + // Simple test of the first message received over a connection being something other than + // Init. This results in an immediate disconnection, which previously included a spurious + // peer_disconnected event handed to event handlers (which would panic in + // `TestChannelMessageHandler` here). + let cfgs = create_peermgr_cfgs(2); + let peers = create_network(2, &cfgs); + + let mut fd_dup = FileDescriptor { + fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003}; + let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap(); + peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap(); + + let mut dup_encryptor = PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap()); + let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx); + assert_eq!(peers[0].read_event(&mut fd_dup, &initial_data).unwrap(), false); + peers[0].process_events(); + + let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0); + let (act_three, _) = + dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap(); + assert_eq!(peers[0].read_event(&mut fd_dup, &act_three).unwrap(), false); + + let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 }; + let msg_bytes = dup_encryptor.encrypt_message(¬_init_msg); + assert!(peers[0].read_event(&mut fd_dup, &msg_bytes).is_err()); + } + #[test] fn test_disconnect_all_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and @@ -2269,10 +2988,15 @@ mod tests { cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release); let peers = create_network(2, &cfgs); - let secp_ctx = Secp256k1::new(); - let a_id = PublicKey::from_secret_key(&secp_ctx, &peers[0].our_node_secret); - let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; - let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); peers[0].new_inbound_connection(fd_a.clone(), None).unwrap(); @@ -2390,4 +3114,53 @@ mod tests { // For (None) assert_eq!(filter_addresses(None), None); } + + #[test] + #[cfg(feature = "std")] + fn test_process_events_multithreaded() { + use std::time::{Duration, Instant}; + // Test that `process_events` getting called on multiple threads doesn't generate too many + // loop iterations. + // Each time `process_events` goes around the loop we call + // `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`. + // Because the loop should go around once more after a call which fails to take the + // single-threaded lock, if we write zero to the counter before calling `process_events` we + // should never observe there having been more than 2 loop iterations. + // Further, because the last thread to exit will call `process_events` before returning, we + // should always have at least one count at the end. + let cfg = Arc::new(create_peermgr_cfgs(1)); + // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. + let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap()); + + let exit_flag = Arc::new(AtomicBool::new(false)); + macro_rules! spawn_thread { () => { { + let thread_cfg = Arc::clone(&cfg); + let thread_peer = Arc::clone(&peer); + let thread_exit = Arc::clone(&exit_flag); + std::thread::spawn(move || { + while !thread_exit.load(Ordering::Acquire) { + thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release); + thread_peer.process_events(); + std::thread::sleep(Duration::from_micros(1)); + } + }) + } } } + + let thread_a = spawn_thread!(); + let thread_b = spawn_thread!(); + let thread_c = spawn_thread!(); + + let start_time = Instant::now(); + while start_time.elapsed() < Duration::from_millis(100) { + let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire); + assert!(val <= 2); + std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?! + } + + exit_flag.store(true, Ordering::Release); + thread_a.join().unwrap(); + thread_b.join().unwrap(); + thread_c.join().unwrap(); + assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1); + } }