X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=f128f6c5259802466cf99f27496b554d7b88f9ea;hb=a534a5e7af4923122a359005a99f01cfb33b451c;hp=bb7b697e420ea5603e18dca088808b5150938ecd;hpb=97711aef96556637b3f8b4336b688fbfcf4c2beb;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index bb7b697e..f128f6c5 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -15,7 +15,7 @@ //! call into the provided message handlers (probably a ChannelManager and NetGraphmsgHandler) with messages //! they should handle, and encoding/sending response messages. -use bitcoin::secp256k1::key::{SecretKey,PublicKey}; +use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use ln::features::InitFeatures; use ln::msgs; @@ -33,7 +33,7 @@ use routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use prelude::*; use io; use alloc::collections::LinkedList; -use sync::{Arc, Mutex, MutexGuard, RwLock}; +use sync::{Arc, Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, Ordering}; use core::{cmp, hash, fmt, mem}; use core::ops::Deref; @@ -258,8 +258,13 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { /// descriptor. #[derive(Clone)] pub struct PeerHandleError { - /// Used to indicate that we probably can't make any future connections to this peer, implying - /// we should go ahead and force-close any channels we have with it. + /// Used to indicate that we probably can't make any future connections to this peer (e.g. + /// because we required features that our peer was missing, or vice versa). + /// + /// While LDK's [`ChannelManager`] will not do it automatically, you likely wish to force-close + /// any channels with this peer or check for new versions of LDK. + /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub no_connection_possible: bool, } impl fmt::Debug for PeerHandleError { @@ -376,14 +381,6 @@ impl Peer { } } -struct PeerHolder { - /// Peer is under its own mutex for sending and receiving bytes, but note that we do *not* hold - /// this mutex while we're processing a message. This is fine as [`PeerManager::read_event`] - /// requires that there be no parallel calls for a given peer, so mutual exclusion of messages - /// handed to the `MessageHandler`s for a given peer is already guaranteed. - peers: HashMap>, -} - /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static /// lifetimes). Other times you can afford a reference, which is more efficient, in which case @@ -428,7 +425,15 @@ pub struct PeerManager, - peers: RwLock>, + /// Connection state for each connected peer - we have an outer read-write lock which is taken + /// as read while we're doing processing for a peer and taken write when a peer is being added + /// or removed. + /// + /// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold + /// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires + /// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to + /// the `MessageHandler`s for a given peer is already guaranteed. + peers: FairRwLock>>, /// Only add to this set when noise completes. /// Locked *after* peers. When an item is removed, it must be removed with the `peers` write /// lock held. Entries may be added with only the `peers` read lock held (though the @@ -450,6 +455,7 @@ pub struct PeerManager } enum MessageHandlingError { @@ -568,11 +574,13 @@ impl P let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); + let mut secp_ctx = Secp256k1::signing_only(); + let ephemeral_hash = Sha256::from_engine(ephemeral_key_midstate.clone()).into_inner(); + secp_ctx.seeded_randomize(&ephemeral_hash); + PeerManager { message_handler, - peers: RwLock::new(PeerHolder { - peers: HashMap::new(), - }), + peers: FairRwLock::new(HashMap::new()), node_id_to_descriptor: Mutex::new(HashMap::new()), event_processing_lock: Mutex::new(()), blocked_event_processors: AtomicBool::new(false), @@ -581,6 +589,7 @@ impl P peer_counter: AtomicCounter::new(), logger, custom_message_handler, + secp_ctx, } } @@ -591,7 +600,7 @@ impl P /// completed and we are sure the remote peer has the private key for the given node_id. pub fn get_peer_node_ids(&self) -> Vec { let peers = self.peers.read().unwrap(); - peers.peers.values().filter_map(|peer_mutex| { + peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { return None; @@ -625,11 +634,11 @@ impl P /// [`socket_disconnected()`]: PeerManager::socket_disconnected pub fn new_outbound_connection(&self, their_node_id: PublicKey, descriptor: Descriptor, remote_network_address: Option) -> Result, PeerHandleError> { let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key()); - let res = peer_encryptor.get_act_one().to_vec(); + let res = peer_encryptor.get_act_one(&self.secp_ctx).to_vec(); let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.peers.insert(descriptor, Mutex::new(Peer { + if peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -672,11 +681,11 @@ impl P /// /// [`socket_disconnected()`]: PeerManager::socket_disconnected pub fn new_inbound_connection(&self, descriptor: Descriptor, remote_network_address: Option) -> Result<(), PeerHandleError> { - let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret); + let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret, &self.secp_ctx); let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.peers.insert(descriptor, Mutex::new(Peer { + if peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -787,7 +796,7 @@ impl P /// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> { let peers = self.peers.read().unwrap(); - match peers.peers.get(descriptor) { + match peers.get(descriptor) { None => { // This is most likely a simple race condition where the user found that the socket // was writeable, then we told the user to `disconnect_socket()`, then they called @@ -852,7 +861,7 @@ impl P let peers = self.peers.read().unwrap(); let mut msgs_to_forward = Vec::new(); let mut peer_node_id = None; - match peers.peers.get(peer_descriptor) { + match peers.get(peer_descriptor) { None => { // This is most likely a simple race condition where the user read some bytes // from the socket, then we told the user to `disconnect_socket()`, then they @@ -937,14 +946,16 @@ impl P let next_step = peer.channel_encryptor.get_noise_step(); match next_step { NextNoiseStep::ActOne => { - let act_two = try_potential_handleerror!(peer, - peer.channel_encryptor.process_act_one_with_keys(&peer.pending_read_buffer[..], &self.our_node_secret, self.get_ephemeral_key())).to_vec(); + let act_two = try_potential_handleerror!(peer, peer.channel_encryptor + .process_act_one_with_keys(&peer.pending_read_buffer[..], + &self.our_node_secret, self.get_ephemeral_key(), &self.secp_ctx)).to_vec(); peer.pending_outbound_buffer.push_back(act_two); peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long }, NextNoiseStep::ActTwo => { let (act_three, their_node_id) = try_potential_handleerror!(peer, - peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], &self.our_node_secret)); + peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], + &self.our_node_secret, &self.secp_ctx)); peer.pending_outbound_buffer.push_back(act_three.to_vec()); peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes peer.pending_read_is_header = true; @@ -972,7 +983,7 @@ impl P if peer.pending_read_is_header { let msg_len = try_potential_handleerror!(peer, peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..])); - peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16); + if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); } peer.pending_read_buffer.resize(msg_len as usize + 16, 0); if msg_len < 2 { // Need at least the message type tag return Err(PeerHandleError{ no_connection_possible: false }); @@ -984,7 +995,8 @@ impl P assert!(msg_data.len() >= 2); // Reset read buffer - peer.pending_read_buffer = [0; 18].to_vec(); + if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); } + peer.pending_read_buffer.resize(18, 0); peer.pending_read_is_header = true; let mut reader = io::Cursor::new(&msg_data[..]); @@ -1287,13 +1299,13 @@ impl P Ok(should_forward) } - fn forward_broadcast_msg(&self, peers: &PeerHolder, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &HashMap>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { @@ -1319,7 +1331,7 @@ impl P log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_node_announcement(msg.contents.node_id) { @@ -1344,7 +1356,7 @@ impl P log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { @@ -1425,7 +1437,7 @@ impl P } let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); match descriptor_opt { - Some(descriptor) => match peers.peers.get(&descriptor) { + Some(descriptor) => match peers.get(&descriptor) { Some(peer_mutex) => { let peer_lock = peer_mutex.lock().unwrap(); if peer_lock.their_features.is_none() { @@ -1624,7 +1636,7 @@ impl P self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); } - for (descriptor, peer_mutex) in peers.peers.iter() { + for (descriptor, peer_mutex) in peers.iter() { self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap()); } } @@ -1638,7 +1650,7 @@ impl P // lock). if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { - if let Some(peer_mutex) = peers.peers.remove(&descriptor) { + if let Some(peer_mutex) = peers.remove(&descriptor) { if let Some(msg) = msg { log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), @@ -1666,7 +1678,7 @@ impl P fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { let mut peers = self.peers.write().unwrap(); - let peer_option = peers.peers.remove(descriptor); + let peer_option = peers.remove(descriptor); match peer_option { None => { // This is most likely a simple race condition where the user found that the socket @@ -1675,15 +1687,12 @@ impl P }, Some(peer_lock) => { let peer = peer_lock.lock().unwrap(); - match peer.their_node_id { - Some(node_id) => { - log_trace!(self.logger, - "Handling disconnection of peer {}, with {}future connection to the peer possible.", - log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); - self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); - }, - None => {} + if let Some(node_id) = peer.their_node_id { + log_trace!(self.logger, + "Handling disconnection of peer {}, with {}future connection to the peer possible.", + log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); + self.node_id_to_descriptor.lock().unwrap().remove(&node_id); + self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); } } }; @@ -1702,7 +1711,7 @@ impl P let mut peers_lock = self.peers.write().unwrap(); if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id); - peers_lock.peers.remove(&descriptor); + peers_lock.remove(&descriptor); self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); descriptor.disconnect_socket(); } @@ -1715,7 +1724,7 @@ impl P let mut peers_lock = self.peers.write().unwrap(); self.node_id_to_descriptor.lock().unwrap().clear(); let peers = &mut *peers_lock; - for (mut descriptor, peer) in peers.peers.drain() { + for (mut descriptor, peer) in peers.drain() { if let Some(node_id) = peer.lock().unwrap().their_node_id { log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id); self.message_handler.chan_handler.peer_disconnected(&node_id, false); @@ -1754,7 +1763,7 @@ impl P { let peers_lock = self.peers.read().unwrap(); - for (descriptor, peer_mutex) in peers_lock.peers.iter() { + for (descriptor, peer_mutex) in peers_lock.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() { // The peer needs to complete its handshake before we can exchange messages. We @@ -1778,7 +1787,7 @@ impl P if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.peers.len() as u64 + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 { descriptors_needing_disconnect.push(descriptor.clone()); continue; @@ -1804,7 +1813,7 @@ impl P { let mut peers_lock = self.peers.write().unwrap(); for descriptor in descriptors_needing_disconnect.iter() { - if let Some(peer) = peers_lock.peers.remove(&descriptor) { + if let Some(peer) = peers_lock.remove(descriptor) { if let Some(node_id) = peer.lock().unwrap().their_node_id { log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id); self.node_id_to_descriptor.lock().unwrap().remove(&node_id); @@ -1837,13 +1846,13 @@ fn is_gossip_msg(type_id: u16) -> bool { #[cfg(test)] mod tests { use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; - use ln::msgs; + use ln::{msgs, wire}; use ln::msgs::NetAddress; use util::events; use util::test_utils; use bitcoin::secp256k1::Secp256k1; - use bitcoin::secp256k1::key::{SecretKey, PublicKey}; + use bitcoin::secp256k1::{SecretKey, PublicKey}; use prelude::*; use sync::{Arc, Mutex}; @@ -1934,7 +1943,7 @@ mod tests { let chan_handler = test_utils::TestChannelMessageHandler::new(); let mut peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); let secp_ctx = Secp256k1::new(); let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); @@ -1947,7 +1956,49 @@ mod tests { peers[0].message_handler.chan_handler = &chan_handler; peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); + } + + #[test] + fn test_send_simple_msg() { + // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and + // push a message from one peer to another. + let cfgs = create_peermgr_cfgs(2); + let a_chan_handler = test_utils::TestChannelMessageHandler::new(); + let b_chan_handler = test_utils::TestChannelMessageHandler::new(); + let mut peers = create_network(2, &cfgs); + let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); + + let secp_ctx = Secp256k1::new(); + let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); + + let msg = msgs::Shutdown { channel_id: [42; 32], scriptpubkey: bitcoin::Script::new() }; + a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown { + node_id: their_id, msg: msg.clone() + }); + peers[0].message_handler.chan_handler = &a_chan_handler; + + b_chan_handler.expect_receive_msg(wire::Message::Shutdown(msg)); + peers[1].message_handler.chan_handler = &b_chan_handler; + + peers[0].process_events(); + + let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false); + } + + #[test] + fn test_disconnect_all_peer() { + // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and + // then calls disconnect_all_peers + let cfgs = create_peermgr_cfgs(2); + let peers = create_network(2, &cfgs); + establish_connection(&peers[0], &peers[1]); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); + + peers[0].disconnect_all_peers(); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); } #[test] @@ -1956,17 +2007,17 @@ mod tests { let cfgs = create_peermgr_cfgs(2); let peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); // peers[0] awaiting_pong is set to true, but the Peer is still connected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); } #[test] @@ -2028,9 +2079,9 @@ mod tests { peers[0].new_inbound_connection(fd_a.clone(), None).unwrap(); // If we get a single timer tick before completion, that's fine - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false); peers[0].process_events(); @@ -2039,7 +2090,7 @@ mod tests { // ...but if we get a second timer tick, we should disconnect the peer peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err()); }