X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=9f5a9bd72f487c636ccd4c4a45974a4c8ba087e3;hb=bfff6fda6e2e6183285be59a999d5b74feb08779;hp=9c27a23467ce7a8e7134225f352fc961a3998ace;hpb=1d9e541c5766eb03dfaee7844b27b3bc1d60a05e;p=rust-lightning

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 9c27a234..9f5a9bd7 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -20,7 +20,7 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
 use crate::sign::{NodeSigner, Recipient};
 use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
-use crate::ln::ChannelId;
+use crate::ln::types::ChannelId;
 use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs;
 use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
@@ -28,7 +28,7 @@ use crate::util::ser::{VecWriter, Writeable, Writer};
 use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
 use crate::ln::wire;
 use crate::ln::wire::{Encode, Type};
-use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage};
+use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage, Responder, ResponseInstruction};
 use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
 use crate::onion_message::packet::OnionMessageContents;
 use crate::routing::gossip::{NodeId, NodeAlias};
@@ -123,6 +123,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
 	}
 	fn processing_queue_high(&self) -> bool { false }
 }
+
 impl OnionMessageHandler for IgnoringMessageHandler {
 	fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
 	fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
@@ -134,12 +135,15 @@ impl OnionMessageHandler for IgnoringMessageHandler {
 		InitFeatures::empty()
 	}
 }
+
 impl OffersMessageHandler for IgnoringMessageHandler {
-	fn handle_message(&self, _msg: OffersMessage) -> Option<OffersMessage> { None }
+	fn handle_message(&self, _message: OffersMessage, _responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
+		ResponseInstruction::NoResponse
+	}
 }
 impl CustomOnionMessageHandler for IgnoringMessageHandler {
 	type CustomMessage = Infallible;
-	fn handle_custom_message(&self, _msg: Infallible) -> Option<Infallible> {
+	fn handle_custom_message(&self, _message: Self::CustomMessage, _responder: Option<Responder>) -> ResponseInstruction<Self::CustomMessage> {
 		// Since we always return `None` in the read the handle method should never be called.
 		unreachable!();
 	}
@@ -153,6 +157,7 @@ impl CustomOnionMessageHandler for IgnoringMessageHandler {
 
 impl OnionMessageContents for Infallible {
 	fn tlv_type(&self) -> u64 { unreachable!(); }
+	fn msg_type(&self) -> &'static str { unreachable!(); }
 }
 
 impl Deref for IgnoringMessageHandler {
@@ -248,12 +253,15 @@ impl ChannelMessageHandler for ErroringMessageHandler {
 	fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) {
 		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
 	}
+	#[cfg(splicing)]
 	fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) {
 		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
 	}
+	#[cfg(splicing)]
 	fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) {
 		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
 	}
+	#[cfg(splicing)]
 	fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) {
 		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
 	}
@@ -1475,7 +1483,6 @@ impl
 						let their_node_id = try_potential_handleerror!(peer,
@@ -1488,7 +1495,6 @@ impl
 						if peer.pending_read_is_header {
@@ -1681,6 +1687,7 @@ impl
 				self.message_handler.chan_handler.handle_splice(&their_node_id, &msg);
 			}
+			#[cfg(splicing)]
 			wire::Message::SpliceAck(msg) => {
 				self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg);
 			}
+			#[cfg(splicing)]
 			wire::Message::SpliceLocked(msg) => {
 				self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg);
 			}
@@ -2671,10 +2681,10 @@ mod tests {
 	use crate::sign::{NodeSigner, Recipient};
 	use crate::events;
 	use crate::io;
-	use crate::ln::ChannelId;
+	use crate::ln::types::ChannelId;
 	use crate::ln::features::{InitFeatures, NodeFeatures};
 	use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
-	use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
+	use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
 	use crate::ln::{msgs, wire};
 	use crate::ln::msgs::{LightningError, SocketAddress};
 	use crate::util::test_utils;
@@ -3216,6 +3226,105 @@ mod tests {
 		assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
 	}
 
+	#[test]
+	fn test_inbound_conn_handshake_complete_awaiting_pong() {
+		// Test that we do not disconnect an outbound peer after the noise handshake completes due
+		// to a pong timeout for a ping that was never sent if a timer tick fires after we send act
+		// two of the noise handshake along with our init message but before we receive their init
+		// message.
+		let logger = test_utils::TestLogger::new();
+		let node_signer_a = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[42; 32]).unwrap());
+		let node_signer_b = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[43; 32]).unwrap());
+		let peer_a = PeerManager::new(MessageHandler {
+			chan_handler: ErroringMessageHandler::new(),
+			route_handler: IgnoringMessageHandler {},
+			onion_message_handler: IgnoringMessageHandler {},
+			custom_message_handler: IgnoringMessageHandler {},
+		}, 0, &[0; 32], &logger, &node_signer_a);
+		let peer_b = PeerManager::new(MessageHandler {
+			chan_handler: ErroringMessageHandler::new(),
+			route_handler: IgnoringMessageHandler {},
+			onion_message_handler: IgnoringMessageHandler {},
+			custom_message_handler: IgnoringMessageHandler {},
+		}, 0, &[1; 32], &logger, &node_signer_b);
+
+		let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap();
+		let mut fd_a = FileDescriptor {
+			fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+			disconnect: Arc::new(AtomicBool::new(false)),
+		};
+		let mut fd_b = FileDescriptor {
+			fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+			disconnect: Arc::new(AtomicBool::new(false)),
+		};
+
+		// Exchange messages with both peers until they both complete the init handshake.
+		let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
+		peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
+
+		assert_eq!(peer_a.read_event(&mut fd_a, &act_one).unwrap(), false);
+		peer_a.process_events();
+
+		let act_two = fd_a.outbound_data.lock().unwrap().split_off(0);
+		assert_eq!(peer_b.read_event(&mut fd_b, &act_two).unwrap(), false);
+		peer_b.process_events();
+
+		// Calling this here triggers the race on inbound connections.
+		peer_b.timer_tick_occurred();
+
+		let act_three_with_init_b = fd_b.outbound_data.lock().unwrap().split_off(0);
+		assert!(!peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete());
+		assert_eq!(peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap(), false);
+		peer_a.process_events();
+		assert!(peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete());
+
+		let init_a = fd_a.outbound_data.lock().unwrap().split_off(0);
+		assert!(!init_a.is_empty());
+
+		assert!(!peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete());
+		assert_eq!(peer_b.read_event(&mut fd_b, &init_a).unwrap(), false);
+		peer_b.process_events();
+		assert!(peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete());
+
+		// Make sure we're still connected.
+		assert_eq!(peer_b.peers.read().unwrap().len(), 1);
+
+		// B should send a ping on the first timer tick after `handshake_complete`.
+		assert!(fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());
+		peer_b.timer_tick_occurred();
+		peer_b.process_events();
+		assert!(!fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());
+
+		let mut send_warning = || {
+			{
+				let peers = peer_a.peers.read().unwrap();
+				let mut peer_b = peers.get(&fd_a).unwrap().lock().unwrap();
+				peer_a.enqueue_message(&mut peer_b, &msgs::WarningMessage {
+					channel_id: ChannelId([0; 32]),
+					data: "no disconnect plz".to_string(),
+				});
+			}
+			peer_a.process_events();
+			let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
+			assert!(!msg.is_empty());
+			assert_eq!(peer_b.read_event(&mut fd_b, &msg).unwrap(), false);
+			peer_b.process_events();
+		};
+
+		// Fire more ticks until we reach the pong timeout. We send any message except pong to
+		// pretend the connection is still alive.
+		send_warning();
+		for _ in 0..MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER {
+			peer_b.timer_tick_occurred();
+			send_warning();
+		}
+		assert_eq!(peer_b.peers.read().unwrap().len(), 1);
+
+		// One more tick should enforce the pong timeout.
+		peer_b.timer_tick_occurred();
+		assert_eq!(peer_b.peers.read().unwrap().len(), 0);
+	}
+
 	#[test]
 	fn test_filter_addresses(){
 		// Tests the filter_addresses function.