X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=9c27a23467ce7a8e7134225f352fc961a3998ace;hb=113b0f1a0eb503181ed5d5b4d07f0ef83ab30770;hp=3e07f50b203a0e6c1125ed7e257e376a1d68eb7f;hpb=8d9d099bf8e5204e3f6ac1197994239653ce75cd;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 3e07f50b..9c27a234 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -33,12 +33,13 @@ use crate::onion_message::offers::{OffersMessage, OffersMessageHandler}; use crate::onion_message::packet::OnionMessageContents; use crate::routing::gossip::{NodeId, NodeAlias}; use crate::util::atomic_counter::AtomicCounter; -use crate::util::logger::{Logger, WithContext}; +use crate::util::logger::{Level, Logger, WithContext}; use crate::util::string::PrintableString; +#[allow(unused_imports)] use crate::prelude::*; + use crate::io; -use alloc::collections::VecDeque; use crate::sync::{Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; @@ -116,7 +117,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { - InitFeatures::empty() + let mut features = InitFeatures::empty(); + features.set_gossip_queries_optional(); + features } fn processing_queue_high(&self) -> bool { false } } @@ -222,10 +225,10 @@ impl ChannelMessageHandler for ErroringMessageHandler { // Any messages which are related to a specific channel generate an error message to let the // peer know we don't care about channels. fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) { - ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id); } fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id); } fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) { ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); @@ -315,11 +318,11 @@ impl ChannelMessageHandler for ErroringMessageHandler { } fn handle_open_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { - ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id); } fn handle_accept_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { - ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id); } fn handle_tx_add_input(&self, their_node_id: &PublicKey, msg: &msgs::TxAddInput) { @@ -431,6 +434,26 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { fn disconnect_socket(&mut self); } +/// Details of a connected peer as returned by [`PeerManager::list_peers`]. +pub struct PeerDetails { + /// The node id of the peer. + /// + /// For outbound connections, this [`PublicKey`] will be the same as the `their_node_id` parameter + /// passed in to [`PeerManager::new_outbound_connection`]. + pub counterparty_node_id: PublicKey, + /// The socket address the peer provided in the initial handshake. + /// + /// Will only be `Some` if an address had been previously provided to + /// [`PeerManager::new_outbound_connection`] or [`PeerManager::new_inbound_connection`]. + pub socket_address: Option, + /// The features the peer provided in the initial handshake. + pub init_features: InitFeatures, + /// Indicates the direction of the peer connection. + /// + /// Will be `true` for inbound connections, and `false` for outbound connections. + pub is_inbound_connection: bool, +} + /// Error for PeerManager errors. If you get one of these, you must disconnect the socket and /// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the /// descriptor. @@ -944,8 +967,8 @@ impl Vec<(PublicKey, Option)> { + /// Returns a list of [`PeerDetails`] for connected peers that have completed the initial + /// handshake. + pub fn list_peers(&self) -> Vec { let peers = self.peers.read().unwrap(); peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); if !p.handshake_complete() { return None; } - Some((p.their_node_id.unwrap().0, p.their_socket_address.clone())) + let details = PeerDetails { + // unwrap safety: their_node_id is guaranteed to be `Some` after the handshake + // completed. + counterparty_node_id: p.their_node_id.unwrap().0, + socket_address: p.their_socket_address.clone(), + // unwrap safety: their_features is guaranteed to be `Some` after the handshake + // completed. + init_features: p.their_features.clone().unwrap(), + is_inbound_connection: p.inbound_connection, + }; + Some(details) }).collect() } + /// Returns the [`PeerDetails`] of a connected peer that has completed the initial handshake. + /// + /// Will return `None` if the peer is unknown or it hasn't completed the initial handshake. + pub fn peer_by_node_id(&self, their_node_id: &PublicKey) -> Option { + let peers = self.peers.read().unwrap(); + peers.values().find_map(|peer_mutex| { + let p = peer_mutex.lock().unwrap(); + if !p.handshake_complete() { + return None; + } + + // unwrap safety: their_node_id is guaranteed to be `Some` after the handshake + // completed. + let counterparty_node_id = p.their_node_id.unwrap().0; + + if counterparty_node_id != *their_node_id { + return None; + } + + let details = PeerDetails { + counterparty_node_id, + socket_address: p.their_socket_address.clone(), + // unwrap safety: their_features is guaranteed to be `Some` after the handshake + // completed. + init_features: p.their_features.clone().unwrap(), + is_inbound_connection: p.inbound_connection, + }; + Some(details) + }) + } + fn get_ephemeral_key(&self) -> SecretKey { let mut ephemeral_hash = self.ephemeral_key_midstate.clone(); let counter = self.peer_counter.get_increment(); @@ -1329,7 +1385,9 @@ impl { - log_given_level!(logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_given_level!(logger, level, "Error handling {}message{}; ignoring: {}", + if level == Level::Gossip { "gossip " } else { "" }, + OptionalFromDebugger(&peer_node_id), e.err); continue }, msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these @@ -1496,6 +1554,7 @@ impl return Err(PeerHandleError { }), (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { }), + (msgs::DecodeError::DangerousValue, _) => return Err(PeerHandleError { }), } } }; @@ -1533,15 +1592,37 @@ impl, - mut peer_lock: MutexGuard, - message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> - ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { + peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; let logger = WithContext::from(&self.logger, Some(their_node_id), None); + + let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? { + Some(processed_message) => processed_message, + None => return Ok(None), + }; + + self.do_handle_message_without_peer_lock(peer_mutex, message, &their_node_id, &logger) + } + + // Conducts all message processing that requires us to hold the `peer_lock`. + // + // Returns `None` if the message was fully processed and otherwise returns the message back to + // allow it to be subsequently processed by `do_handle_message_without_peer_lock`. + fn do_handle_message_holding_peer_lock<'a>( + &self, + mut peer_lock: MutexGuard, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { peer_lock.received_message_since_timer_tick = true; // Need an Init as first message @@ -1622,8 +1703,20 @@ impl( + &self, + peer_mutex: &Mutex, + message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + their_node_id: &PublicKey, + logger: &WithContext<'a, L> + ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> + { if is_gossip_msg(message.type_id()) { log_gossip!(logger, "Received message {:?} from {}", message, log_pubkey!(their_node_id)); } else { @@ -1825,7 +1918,7 @@ impl>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &HashMap>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); @@ -1938,7 +2031,7 @@ impl { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), - &msg.temporary_channel_id); + &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), - &msg.temporary_channel_id); + &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), - &msg.temporary_channel_id); + &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { - log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), - &msg.temporary_channel_id); + &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { @@ -2217,7 +2310,7 @@ impl::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); + let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg)); peers_to_disconnect.insert(node_id, msg); }, msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { @@ -2590,11 +2683,13 @@ mod tests { use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{PublicKey, SecretKey}; - use crate::prelude::*; use crate::sync::{Arc, Mutex}; use core::convert::Infallible; use core::sync::atomic::{AtomicBool, Ordering}; + #[allow(unused_imports)] + use crate::prelude::*; + #[derive(Clone)] struct FileDescriptor { fd: u16, @@ -2744,6 +2839,8 @@ mod tests { }; let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap(); + let features_a = peer_a.init_features(&id_b); + let features_b = peer_b.init_features(&id_a); let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), disconnect: Arc::new(AtomicBool::new(false)), @@ -2765,9 +2862,12 @@ mod tests { let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); - assert!(peer_a.get_peer_node_ids().contains(&(id_b, Some(addr_b)))); - assert!(peer_b.get_peer_node_ids().contains(&(id_a, Some(addr_a)))); - + assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().counterparty_node_id, id_b); + assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().socket_address, Some(addr_b)); + assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().init_features, features_b); + assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().counterparty_node_id, id_a); + assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().socket_address, Some(addr_a)); + assert_eq!(peer_b.peer_by_node_id(&id_a).unwrap().init_features, features_a); (fd_a.clone(), fd_b.clone()) }