X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=448dd213dad553ba8143e15b9d32cbea2fb1464c;hb=642913c586fc71b0e413532e7dedcd19cfd4815c;hp=0322de6428e12416fa28a1f7dd5cd3ea037e072f;hpb=ae0d825d89ca0ac2489737d1b413e778650b093c;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 0322de64..448dd213 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,16 +19,16 @@ use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use crate::sign::{NodeSigner, Recipient}; -use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; -use crate::ln::ChannelId; +use crate::events::{MessageSendEvent, MessageSendEventsProvider}; +use crate::ln::types::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs; -use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler}; +use crate::ln::msgs::{ChannelMessageHandler, Init, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler}; use crate::util::ser::{VecWriter, Writeable, Writer}; use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE}; use crate::ln::wire; use crate::ln::wire::{Encode, Type}; -use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage}; +use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage, Responder, ResponseInstruction}; use crate::onion_message::offers::{OffersMessage, OffersMessageHandler}; use crate::onion_message::packet::OnionMessageContents; use crate::routing::gossip::{NodeId, NodeAlias}; @@ -79,6 +79,16 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// connection to the node exists, then the message is simply not sent. fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>; + /// Indicates a peer disconnected. + fn peer_disconnected(&self, their_node_id: &PublicKey); + + /// Handle a peer connecting. + /// + /// May return an `Err(())` if the features the peer supports are not sufficient to communicate + /// with us. Implementors should be somewhat conservative about doing so, however, as other + /// message handlers may still wish to communicate with this peer. + fn peer_connected(&self, their_node_id: &PublicKey, msg: &Init, inbound: bool) -> Result<(), ()>; + /// Gets the node feature flags which this handler itself supports. All available handlers are /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] /// which are broadcasted in our [`NodeAnnouncement`] message. @@ -97,9 +107,6 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler. pub struct IgnoringMessageHandler{} -impl EventsProvider for IgnoringMessageHandler { - fn process_pending_events(&self, _handler: H) where H::Target: EventHandler {} -} impl MessageSendEventsProvider for IgnoringMessageHandler { fn get_and_clear_pending_msg_events(&self) -> Vec { Vec::new() } } @@ -123,6 +130,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler { } fn processing_queue_high(&self) -> bool { false } } + impl OnionMessageHandler for IgnoringMessageHandler { fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } @@ -134,12 +142,15 @@ impl OnionMessageHandler for IgnoringMessageHandler { InitFeatures::empty() } } + impl OffersMessageHandler for IgnoringMessageHandler { - fn handle_message(&self, _msg: OffersMessage) -> Option { None } + fn handle_message(&self, _message: OffersMessage, _responder: Option) -> ResponseInstruction { + ResponseInstruction::NoResponse + } } impl CustomOnionMessageHandler for IgnoringMessageHandler { type CustomMessage = Infallible; - fn handle_custom_message(&self, _msg: Infallible) -> Option { + fn handle_custom_message(&self, _message: Self::CustomMessage, _responder: Option) -> ResponseInstruction { // Since we always return `None` in the read the handle method should never be called. unreachable!(); } @@ -153,6 +164,7 @@ impl CustomOnionMessageHandler for IgnoringMessageHandler { impl OnionMessageContents for Infallible { fn tlv_type(&self) -> u64 { unreachable!(); } + fn msg_type(&self) -> &'static str { unreachable!(); } } impl Deref for IgnoringMessageHandler { @@ -188,6 +200,10 @@ impl CustomMessageHandler for IgnoringMessageHandler { fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() } + fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + + fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &Init, _inbound: bool) -> Result<(), ()> { Ok(()) } + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { @@ -248,12 +264,15 @@ impl ChannelMessageHandler for ErroringMessageHandler { fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } + #[cfg(splicing)] fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } + #[cfg(splicing)] fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } + #[cfg(splicing)] fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } @@ -715,8 +734,6 @@ pub trait APeerManager { type NS: Deref; /// Gets a reference to the underlying [`PeerManager`]. fn as_ref(&self) -> &PeerManager; - /// Returns the peer manager's [`OnionMessageHandler`]. - fn onion_message_handler(&self) -> &Self::OMT; } impl @@ -742,9 +759,6 @@ APeerManager for PeerManager where type NST = ::Target; type NS = NS; fn as_ref(&self) -> &PeerManager { self } - fn onion_message_handler(&self) -> &Self::OMT { - self.message_handler.onion_message_handler.deref() - } } /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls @@ -1326,7 +1340,7 @@ impl(&self, peer: &mut Peer, message: &M) { - let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None); + let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None); if is_gossip_msg(message.type_id()) { log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); } else { @@ -1361,7 +1375,7 @@ impl {{ let res = $thing; - let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None); + let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None, None); match res { Ok(x) => x, Err(e) => { @@ -1433,7 +1447,7 @@ impl { - let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None); + let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None); match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { hash_map::Entry::Occupied(e) => { log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); @@ -1475,7 +1489,6 @@ impl { let their_node_id = try_potential_handleerror!(peer, @@ -1488,7 +1501,6 @@ impl { if peer.pending_read_is_header { @@ -1513,7 +1525,7 @@ impl x, Err(e) => { @@ -1592,15 +1604,37 @@ impl, - mut peer_lock: MutexGuard, - message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> - ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { + peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; - let logger = WithContext::from(&self.logger, Some(their_node_id), None); + let logger = WithContext::from(&self.logger, Some(their_node_id), None, None); + + let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? { + Some(processed_message) => processed_message, + None => return Ok(None), + }; + + self.do_handle_message_without_peer_lock(peer_mutex, message, &their_node_id, &logger) + } + + // Conducts all message processing that requires us to hold the `peer_lock`. + // + // Returns `None` if the message was fully processed and otherwise returns the message back to + // allow it to be subsequently processed by `do_handle_message_without_peer_lock`. + fn do_handle_message_holding_peer_lock<'a>( + &self, + mut peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { peer_lock.received_message_since_timer_tick = true; // Need an Init as first message @@ -1626,12 +1660,14 @@ impl( + &self, + peer_mutex: &Mutex, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { if is_gossip_msg(message.type_id()) { log_gossip!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id)); } else { @@ -1751,13 +1804,16 @@ impl { self.message_handler.chan_handler.handle_splice(&their_node_id, &msg); } + #[cfg(splicing)] wire::Message::SpliceAck(msg) => { self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg); } + #[cfg(splicing)] wire::Message::SpliceLocked(msg) => { self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg); } @@ -1884,7 +1940,7 @@ impl>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &HashMap>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); @@ -1898,7 +1954,7 @@ impl { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", log_pubkey!(node_id), &msg.temporary_channel_id, ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index)); @@ -2071,107 +2127,107 @@ impl { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendFundingSigned event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReady event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendStfu { ref node_id, ref msg} => { - let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } MessageSendEvent::SendSplice { ref node_id, ref msg} => { - let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSplice event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => { - let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => { - let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); } MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddInput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxComplete event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxSignatures event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAbort event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id)), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", log_pubkey!(node_id), update_add_htlcs.len(), update_fulfill_htlcs.len(), @@ -2196,31 +2252,31 @@ impl { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling Shutdown event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", log_pubkey!(node_id), msg.contents.short_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); @@ -2258,12 +2314,12 @@ impl { - log_trace!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", + log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", log_pubkey!(node_id), msg.contents.short_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::HandleError { node_id, action } => { - let logger = WithContext::from(&self.logger, Some(node_id), None); + let logger = WithContext::from(&self.logger, Some(node_id), None, None); match action { msgs::ErrorAction::DisconnectPeer { msg } => { if let Some(msg) = msg.as_ref() { @@ -2276,7 +2332,7 @@ impl::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); + let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); peers_to_disconnect.insert(node_id, msg); }, msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { @@ -2315,7 +2371,7 @@ impl { - log_gossip!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", + log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", log_pubkey!(node_id), msg.short_channel_ids.len(), msg.first_blocknum, @@ -2389,9 +2445,10 @@ impl { let peer = peer_lock.lock().unwrap(); if let Some((node_id, _)) = peer.their_node_id { - log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Handling disconnection of peer {}", log_pubkey!(node_id)); + log_trace!(WithContext::from(&self.logger, Some(node_id), None, None), "Handling disconnection of peer {}", log_pubkey!(node_id)); let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); debug_assert!(removed.is_some(), "descriptor maps should be consistent"); if !peer.handshake_complete() { return; } self.message_handler.chan_handler.peer_disconnected(&node_id); self.message_handler.onion_message_handler.peer_disconnected(&node_id); + self.message_handler.custom_message_handler.peer_disconnected(&node_id); } } }; @@ -2637,12 +2695,12 @@ mod tests { use crate::sign::{NodeSigner, Recipient}; use crate::events; use crate::io; - use crate::ln::ChannelId; + use crate::ln::types::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::peer_channel_encryptor::PeerChannelEncryptor; - use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; + use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER}; use crate::ln::{msgs, wire}; - use crate::ln::msgs::{LightningError, SocketAddress}; + use crate::ln::msgs::{Init, LightningError, SocketAddress}; use crate::util::test_utils; use bitcoin::Network; @@ -2709,6 +2767,11 @@ mod tests { fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() } + + fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + + fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &Init, _inbound: bool) -> Result<(), ()> { Ok(()) } + fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _: &PublicKey) -> InitFeatures { @@ -3182,6 +3245,105 @@ mod tests { assert!(peers[0].read_event(&mut fd_a, &b_data).is_err()); } + #[test] + fn test_inbound_conn_handshake_complete_awaiting_pong() { + // Test that we do not disconnect an outbound peer after the noise handshake completes due + // to a pong timeout for a ping that was never sent if a timer tick fires after we send act + // two of the noise handshake along with our init message but before we receive their init + // message. + let logger = test_utils::TestLogger::new(); + let node_signer_a = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[42; 32]).unwrap()); + let node_signer_b = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[43; 32]).unwrap()); + let peer_a = PeerManager::new(MessageHandler { + chan_handler: ErroringMessageHandler::new(), + route_handler: IgnoringMessageHandler {}, + onion_message_handler: IgnoringMessageHandler {}, + custom_message_handler: IgnoringMessageHandler {}, + }, 0, &[0; 32], &logger, &node_signer_a); + let peer_b = PeerManager::new(MessageHandler { + chan_handler: ErroringMessageHandler::new(), + route_handler: IgnoringMessageHandler {}, + onion_message_handler: IgnoringMessageHandler {}, + custom_message_handler: IgnoringMessageHandler {}, + }, 0, &[1; 32], &logger, &node_signer_b); + + let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + + // Exchange messages with both peers until they both complete the init handshake. + let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), None).unwrap(); + + assert_eq!(peer_a.read_event(&mut fd_a, &act_one).unwrap(), false); + peer_a.process_events(); + + let act_two = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peer_b.read_event(&mut fd_b, &act_two).unwrap(), false); + peer_b.process_events(); + + // Calling this here triggers the race on inbound connections. + peer_b.timer_tick_occurred(); + + let act_three_with_init_b = fd_b.outbound_data.lock().unwrap().split_off(0); + assert!(!peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete()); + assert_eq!(peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap(), false); + peer_a.process_events(); + assert!(peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete()); + + let init_a = fd_a.outbound_data.lock().unwrap().split_off(0); + assert!(!init_a.is_empty()); + + assert!(!peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete()); + assert_eq!(peer_b.read_event(&mut fd_b, &init_a).unwrap(), false); + peer_b.process_events(); + assert!(peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete()); + + // Make sure we're still connected. + assert_eq!(peer_b.peers.read().unwrap().len(), 1); + + // B should send a ping on the first timer tick after `handshake_complete`. + assert!(fd_b.outbound_data.lock().unwrap().split_off(0).is_empty()); + peer_b.timer_tick_occurred(); + peer_b.process_events(); + assert!(!fd_b.outbound_data.lock().unwrap().split_off(0).is_empty()); + + let mut send_warning = || { + { + let peers = peer_a.peers.read().unwrap(); + let mut peer_b = peers.get(&fd_a).unwrap().lock().unwrap(); + peer_a.enqueue_message(&mut peer_b, &msgs::WarningMessage { + channel_id: ChannelId([0; 32]), + data: "no disconnect plz".to_string(), + }); + } + peer_a.process_events(); + let msg = fd_a.outbound_data.lock().unwrap().split_off(0); + assert!(!msg.is_empty()); + assert_eq!(peer_b.read_event(&mut fd_b, &msg).unwrap(), false); + peer_b.process_events(); + }; + + // Fire more ticks until we reach the pong timeout. We send any message except pong to + // pretend the connection is still alive. + send_warning(); + for _ in 0..MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER { + peer_b.timer_tick_occurred(); + send_warning(); + } + assert_eq!(peer_b.peers.read().unwrap().len(), 1); + + // One more tick should enforce the pong timeout. + peer_b.timer_tick_occurred(); + assert_eq!(peer_b.peers.read().unwrap().len(), 0); + } + #[test] fn test_filter_addresses(){ // Tests the filter_addresses function.