X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=9ce230861456927f53bf4a2b65dd33e800e17e5d;hb=d00e55077aa613457b6cc1957238a91a700873f8;hp=11cdd906abad191ec163c7b0a55766b3101fb223;hpb=0c74cdc573d43cdabfb7c960f41d6cc3dd55b9e5;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 11cdd906..9ce23086 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -36,9 +36,10 @@ use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::{Level, Logger, WithContext}; use crate::util::string::PrintableString; +#[allow(unused_imports)] use crate::prelude::*; + use crate::io; -use alloc::collections::VecDeque; use crate::sync::{Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; @@ -116,7 +117,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { - InitFeatures::empty() + let mut features = InitFeatures::empty(); + features.set_gossip_queries_optional(); + features } fn processing_queue_high(&self) -> bool { false } } @@ -245,12 +248,15 @@ impl ChannelMessageHandler for ErroringMessageHandler { fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } + #[cfg(splicing)] fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } + #[cfg(splicing)] fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } + #[cfg(splicing)] fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) { ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); } @@ -1472,7 +1478,6 @@ impl { let their_node_id = try_potential_handleerror!(peer, @@ -1485,7 +1490,6 @@ impl { if peer.pending_read_is_header { @@ -1551,6 +1555,7 @@ impl return Err(PeerHandleError { }), (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { }), + (msgs::DecodeError::DangerousValue, _) => return Err(PeerHandleError { }), } } }; @@ -1588,15 +1593,37 @@ impl, - mut peer_lock: MutexGuard, - message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> - ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { + peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; let logger = WithContext::from(&self.logger, Some(their_node_id), None); + + let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? { + Some(processed_message) => processed_message, + None => return Ok(None), + }; + + self.do_handle_message_without_peer_lock(peer_mutex, message, &their_node_id, &logger) + } + + // Conducts all message processing that requires us to hold the `peer_lock`. + // + // Returns `None` if the message was fully processed and otherwise returns the message back to + // allow it to be subsequently processed by `do_handle_message_without_peer_lock`. + fn do_handle_message_holding_peer_lock<'a>( + &self, + mut peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { peer_lock.received_message_since_timer_tick = true; // Need an Init as first message @@ -1655,6 +1682,7 @@ impl( + &self, + peer_mutex: &Mutex, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { if is_gossip_msg(message.type_id()) { log_gossip!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id)); } else { @@ -1747,13 +1787,16 @@ impl { self.message_handler.chan_handler.handle_splice(&their_node_id, &msg); } + #[cfg(splicing)] wire::Message::SpliceAck(msg) => { self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg); } + #[cfg(splicing)] wire::Message::SpliceLocked(msg) => { self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg); } @@ -1880,7 +1923,7 @@ impl>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &HashMap>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); @@ -2272,7 +2315,7 @@ impl::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); + let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); peers_to_disconnect.insert(node_id, msg); }, msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { @@ -2636,7 +2679,7 @@ mod tests { use crate::ln::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::peer_channel_encryptor::PeerChannelEncryptor; - use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; + use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER}; use crate::ln::{msgs, wire}; use crate::ln::msgs::{LightningError, SocketAddress}; use crate::util::test_utils; @@ -2645,11 +2688,13 @@ mod tests { use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{PublicKey, SecretKey}; - use crate::prelude::*; use crate::sync::{Arc, Mutex}; use core::convert::Infallible; use core::sync::atomic::{AtomicBool, Ordering}; + #[allow(unused_imports)] + use crate::prelude::*; + #[derive(Clone)] struct FileDescriptor { fd: u16, @@ -3176,6 +3221,105 @@ mod tests { assert!(peers[0].read_event(&mut fd_a, &b_data).is_err()); } + #[test] + fn test_inbound_conn_handshake_complete_awaiting_pong() { + // Test that we do not disconnect an outbound peer after the noise handshake completes due + // to a pong timeout for a ping that was never sent if a timer tick fires after we send act + // two of the noise handshake along with our init message but before we receive their init + // message. + let logger = test_utils::TestLogger::new(); + let node_signer_a = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[42; 32]).unwrap()); + let node_signer_b = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[43; 32]).unwrap()); + let peer_a = PeerManager::new(MessageHandler { + chan_handler: ErroringMessageHandler::new(), + route_handler: IgnoringMessageHandler {}, + onion_message_handler: IgnoringMessageHandler {}, + custom_message_handler: IgnoringMessageHandler {}, + }, 0, &[0; 32], &logger, &node_signer_a); + let peer_b = PeerManager::new(MessageHandler { + chan_handler: ErroringMessageHandler::new(), + route_handler: IgnoringMessageHandler {}, + onion_message_handler: IgnoringMessageHandler {}, + custom_message_handler: IgnoringMessageHandler {}, + }, 0, &[1; 32], &logger, &node_signer_b); + + let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap(); + let mut fd_a = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + let mut fd_b = FileDescriptor { + fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + }; + + // Exchange messages with both peers until they both complete the init handshake. + let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), None).unwrap(); + + assert_eq!(peer_a.read_event(&mut fd_a, &act_one).unwrap(), false); + peer_a.process_events(); + + let act_two = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peer_b.read_event(&mut fd_b, &act_two).unwrap(), false); + peer_b.process_events(); + + // Calling this here triggers the race on inbound connections. + peer_b.timer_tick_occurred(); + + let act_three_with_init_b = fd_b.outbound_data.lock().unwrap().split_off(0); + assert!(!peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete()); + assert_eq!(peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap(), false); + peer_a.process_events(); + assert!(peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete()); + + let init_a = fd_a.outbound_data.lock().unwrap().split_off(0); + assert!(!init_a.is_empty()); + + assert!(!peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete()); + assert_eq!(peer_b.read_event(&mut fd_b, &init_a).unwrap(), false); + peer_b.process_events(); + assert!(peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete()); + + // Make sure we're still connected. + assert_eq!(peer_b.peers.read().unwrap().len(), 1); + + // B should send a ping on the first timer tick after `handshake_complete`. + assert!(fd_b.outbound_data.lock().unwrap().split_off(0).is_empty()); + peer_b.timer_tick_occurred(); + peer_b.process_events(); + assert!(!fd_b.outbound_data.lock().unwrap().split_off(0).is_empty()); + + let mut send_warning = || { + { + let peers = peer_a.peers.read().unwrap(); + let mut peer_b = peers.get(&fd_a).unwrap().lock().unwrap(); + peer_a.enqueue_message(&mut peer_b, &msgs::WarningMessage { + channel_id: ChannelId([0; 32]), + data: "no disconnect plz".to_string(), + }); + } + peer_a.process_events(); + let msg = fd_a.outbound_data.lock().unwrap().split_off(0); + assert!(!msg.is_empty()); + assert_eq!(peer_b.read_event(&mut fd_b, &msg).unwrap(), false); + peer_b.process_events(); + }; + + // Fire more ticks until we reach the pong timeout. We send any message except pong to + // pretend the connection is still alive. + send_warning(); + for _ in 0..MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER { + peer_b.timer_tick_occurred(); + send_warning(); + } + assert_eq!(peer_b.peers.read().unwrap().len(), 1); + + // One more tick should enforce the pong timeout. + peer_b.timer_tick_occurred(); + assert_eq!(peer_b.peers.read().unwrap().len(), 0); + } + #[test] fn test_filter_addresses(){ // Tests the filter_addresses function.