Merge pull request #1503 from valentinewallace/2022-05-onion-msgs
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 7408263085c41e8162c038e5d573ee01db15e470..d34fdfeb52a1ddcb887bfd42c4cfcddac8e7d0e9 100644 (file)
 //! Instead of actually servicing sockets ourselves we require that you implement the
 //! SocketDescriptor interface and use that to receive actions which you should perform on the
 //! socket, and call into PeerManager with bytes read from the socket. The PeerManager will then
-//! call into the provided message handlers (probably a ChannelManager and NetGraphmsgHandler) with messages
-//! they should handle, and encoding/sending response messages.
+//! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with
+//! messages they should handle, and encoding/sending response messages.
 
-use bitcoin::secp256k1::key::{SecretKey,PublicKey};
+use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
 use ln::features::InitFeatures;
 use ln::msgs;
@@ -25,15 +25,16 @@ use util::ser::{VecWriter, Writeable, Writer};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
 use ln::wire;
 use ln::wire::Encode;
+use routing::gossip::{NetworkGraph, P2PGossipSync};
 use util::atomic_counter::AtomicCounter;
 use util::events::{MessageSendEvent, MessageSendEventsProvider};
 use util::logger::Logger;
-use routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
 
 use prelude::*;
 use io;
 use alloc::collections::LinkedList;
-use sync::{Arc, Mutex, MutexGuard, RwLock};
+use sync::{Arc, Mutex, MutexGuard, FairRwLock};
+use core::sync::atomic::{AtomicBool, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
 use core::convert::Infallible;
@@ -150,7 +151,7 @@ impl ChannelMessageHandler for ErroringMessageHandler {
        fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
                ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
        }
-       fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
+       fn handle_channel_ready(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReady) {
                ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
        }
        fn handle_shutdown(&self, their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) {
@@ -207,10 +208,9 @@ pub struct MessageHandler<CM: Deref, RM: Deref> where
        /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
        pub chan_handler: CM,
        /// A message handler which handles messages updating our knowledge of the network channel
-       /// graph. Usually this is just a [`NetGraphMsgHandler`] object or an
-       /// [`IgnoringMessageHandler`].
+       /// graph. Usually this is just a [`P2PGossipSync`] object or an [`IgnoringMessageHandler`].
        ///
-       /// [`NetGraphMsgHandler`]: crate::routing::network_graph::NetGraphMsgHandler
+       /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync
        pub route_handler: RM,
 }
 
@@ -257,8 +257,13 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
 /// descriptor.
 #[derive(Clone)]
 pub struct PeerHandleError {
-       /// Used to indicate that we probably can't make any future connections to this peer, implying
-       /// we should go ahead and force-close any channels we have with it.
+       /// Used to indicate that we probably can't make any future connections to this peer (e.g.
+       /// because we required features that our peer was missing, or vice versa).
+       ///
+       /// While LDK's [`ChannelManager`] will not do it automatically, you likely wish to force-close
+       /// any channels with this peer or check for new versions of LDK.
+       ///
+       /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
        pub no_connection_possible: bool,
 }
 impl fmt::Debug for PeerHandleError {
@@ -375,14 +380,6 @@ impl Peer {
        }
 }
 
-struct PeerHolder<Descriptor: SocketDescriptor> {
-       /// Peer is under its own mutex for sending and receiving bytes, but note that we do *not* hold
-       /// this mutex while we're processing a message. This is fine as [`PeerManager::read_event`]
-       /// requires that there be no parallel calls for a given peer, so mutual exclusion of messages
-       /// handed to the `MessageHandler`s for a given peer is already guaranteed.
-       peers: HashMap<Descriptor, Mutex<Peer>>,
-}
-
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
 /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
@@ -390,7 +387,7 @@ struct PeerHolder<Descriptor: SocketDescriptor> {
 /// issues such as overly long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<NetGraphMsgHandler<Arc<NetworkGraph>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -400,7 +397,7 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArc
 /// helps with issues such as long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g NetworkGraph, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
 
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
 /// socket events into messages which it passes on to its [`MessageHandler`].
@@ -427,7 +424,15 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
                L::Target: Logger,
                CMH::Target: CustomMessageHandler {
        message_handler: MessageHandler<CM, RM>,
-       peers: RwLock<PeerHolder<Descriptor>>,
+       /// Connection state for each connected peer - we have an outer read-write lock which is taken
+       /// as read while we're doing processing for a peer and taken write when a peer is being added
+       /// or removed.
+       ///
+       /// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold
+       /// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires
+       /// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to
+       /// the `MessageHandler`s for a given peer is already guaranteed.
+       peers: FairRwLock<HashMap<Descriptor, Mutex<Peer>>>,
        /// Only add to this set when noise completes.
        /// Locked *after* peers. When an item is removed, it must be removed with the `peers` write
        /// lock held. Entries may be added with only the `peers` read lock held (though the
@@ -437,6 +442,11 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
        /// `peers` write lock to do so, so instead we block on this empty mutex when entering
        /// `process_events`.
        event_processing_lock: Mutex<()>,
+       /// Because event processing is global and always does all available work before returning,
+       /// there is no reason for us to have many event processors waiting on the lock at once.
+       /// Instead, we limit the total blocked event processors to always exactly one by setting this
+       /// when an event process call is waiting.
+       blocked_event_processors: AtomicBool,
        our_node_secret: SecretKey,
        ephemeral_key_midstate: Sha256Engine,
        custom_message_handler: CMH,
@@ -444,6 +454,7 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
        peer_counter: AtomicCounter,
 
        logger: L,
+       secp_ctx: Secp256k1<secp256k1::SignOnly>
 }
 
 enum MessageHandlingError {
@@ -562,18 +573,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                let mut ephemeral_key_midstate = Sha256::engine();
                ephemeral_key_midstate.input(ephemeral_random_data);
 
+               let mut secp_ctx = Secp256k1::signing_only();
+               let ephemeral_hash = Sha256::from_engine(ephemeral_key_midstate.clone()).into_inner();
+               secp_ctx.seeded_randomize(&ephemeral_hash);
+
                PeerManager {
                        message_handler,
-                       peers: RwLock::new(PeerHolder {
-                               peers: HashMap::new(),
-                       }),
+                       peers: FairRwLock::new(HashMap::new()),
                        node_id_to_descriptor: Mutex::new(HashMap::new()),
                        event_processing_lock: Mutex::new(()),
+                       blocked_event_processors: AtomicBool::new(false),
                        our_node_secret,
                        ephemeral_key_midstate,
                        peer_counter: AtomicCounter::new(),
                        logger,
                        custom_message_handler,
+                       secp_ctx,
                }
        }
 
@@ -584,7 +599,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// completed and we are sure the remote peer has the private key for the given node_id.
        pub fn get_peer_node_ids(&self) -> Vec<PublicKey> {
                let peers = self.peers.read().unwrap();
-               peers.peers.values().filter_map(|peer_mutex| {
+               peers.values().filter_map(|peer_mutex| {
                        let p = peer_mutex.lock().unwrap();
                        if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() {
                                return None;
@@ -607,8 +622,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// peer using the init message.
        /// The user should pass the remote network address of the host they are connected to.
        ///
-       /// Note that if an Err is returned here you MUST NOT call socket_disconnected for the new
-       /// descriptor but must disconnect the connection immediately.
+       /// If an `Err` is returned here you must disconnect the connection immediately.
        ///
        /// Returns a small number of bytes to send to the remote node (currently always 50).
        ///
@@ -618,11 +632,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// [`socket_disconnected()`]: PeerManager::socket_disconnected
        pub fn new_outbound_connection(&self, their_node_id: PublicKey, descriptor: Descriptor, remote_network_address: Option<NetAddress>) -> Result<Vec<u8>, PeerHandleError> {
                let mut peer_encryptor = PeerChannelEncryptor::new_outbound(their_node_id.clone(), self.get_ephemeral_key());
-               let res = peer_encryptor.get_act_one().to_vec();
+               let res = peer_encryptor.get_act_one(&self.secp_ctx).to_vec();
                let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
 
                let mut peers = self.peers.write().unwrap();
-               if peers.peers.insert(descriptor, Mutex::new(Peer {
+               if peers.insert(descriptor, Mutex::new(Peer {
                        channel_encryptor: peer_encryptor,
                        their_node_id: None,
                        their_features: None,
@@ -656,20 +670,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// The user should pass the remote network address of the host they are connected to.
        ///
        /// May refuse the connection by returning an Err, but will never write bytes to the remote end
-       /// (outbound connector always speaks first). Note that if an Err is returned here you MUST NOT
-       /// call socket_disconnected for the new descriptor but must disconnect the connection
-       /// immediately.
+       /// (outbound connector always speaks first). If an `Err` is returned here you must disconnect
+       /// the connection immediately.
        ///
        /// Panics if descriptor is duplicative with some other descriptor which has not yet been
        /// [`socket_disconnected()`].
        ///
        /// [`socket_disconnected()`]: PeerManager::socket_disconnected
        pub fn new_inbound_connection(&self, descriptor: Descriptor, remote_network_address: Option<NetAddress>) -> Result<(), PeerHandleError> {
-               let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret);
+               let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret, &self.secp_ctx);
                let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
 
                let mut peers = self.peers.write().unwrap();
-               if peers.peers.insert(descriptor, Mutex::new(Peer {
+               if peers.insert(descriptor, Mutex::new(Peer {
                        channel_encryptor: peer_encryptor,
                        their_node_id: None,
                        their_features: None,
@@ -780,7 +793,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
        pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
                let peers = self.peers.read().unwrap();
-               match peers.peers.get(descriptor) {
+               match peers.get(descriptor) {
                        None => {
                                // This is most likely a simple race condition where the user found that the socket
                                // was writeable, then we told the user to `disconnect_socket()`, then they called
@@ -845,7 +858,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                let peers = self.peers.read().unwrap();
                let mut msgs_to_forward = Vec::new();
                let mut peer_node_id = None;
-               match peers.peers.get(peer_descriptor) {
+               match peers.get(peer_descriptor) {
                        None => {
                                // This is most likely a simple race condition where the user read some bytes
                                // from the socket, then we told the user to `disconnect_socket()`, then they
@@ -930,14 +943,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                let next_step = peer.channel_encryptor.get_noise_step();
                                                match next_step {
                                                        NextNoiseStep::ActOne => {
-                                                               let act_two = try_potential_handleerror!(peer,
-                                                                       peer.channel_encryptor.process_act_one_with_keys(&peer.pending_read_buffer[..], &self.our_node_secret, self.get_ephemeral_key())).to_vec();
+                                                               let act_two = try_potential_handleerror!(peer, peer.channel_encryptor
+                                                                       .process_act_one_with_keys(&peer.pending_read_buffer[..],
+                                                                               &self.our_node_secret, self.get_ephemeral_key(), &self.secp_ctx)).to_vec();
                                                                peer.pending_outbound_buffer.push_back(act_two);
                                                                peer.pending_read_buffer = [0; 66].to_vec(); // act three is 66 bytes long
                                                        },
                                                        NextNoiseStep::ActTwo => {
                                                                let (act_three, their_node_id) = try_potential_handleerror!(peer,
-                                                                       peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..], &self.our_node_secret));
+                                                                       peer.channel_encryptor.process_act_two(&peer.pending_read_buffer[..],
+                                                                               &self.our_node_secret, &self.secp_ctx));
                                                                peer.pending_outbound_buffer.push_back(act_three.to_vec());
                                                                peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
                                                                peer.pending_read_is_header = true;
@@ -965,7 +980,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                                if peer.pending_read_is_header {
                                                                        let msg_len = try_potential_handleerror!(peer,
                                                                                peer.channel_encryptor.decrypt_length_header(&peer.pending_read_buffer[..]));
-                                                                       peer.pending_read_buffer = Vec::with_capacity(msg_len as usize + 16);
+                                                                       if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
                                                                        peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
                                                                        if msg_len < 2 { // Need at least the message type tag
                                                                                return Err(PeerHandleError{ no_connection_possible: false });
@@ -977,7 +992,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                                        assert!(msg_data.len() >= 2);
 
                                                                        // Reset read buffer
-                                                                       peer.pending_read_buffer = [0; 18].to_vec();
+                                                                       if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
+                                                                       peer.pending_read_buffer.resize(18, 0);
                                                                        peer.pending_read_is_header = true;
 
                                                                        let mut reader = io::Cursor::new(&msg_data[..]);
@@ -1190,8 +1206,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        wire::Message::FundingSigned(msg) => {
                                self.message_handler.chan_handler.handle_funding_signed(&their_node_id, &msg);
                        },
-                       wire::Message::FundingLocked(msg) => {
-                               self.message_handler.chan_handler.handle_funding_locked(&their_node_id, &msg);
+                       wire::Message::ChannelReady(msg) => {
+                               self.message_handler.chan_handler.handle_channel_ready(&their_node_id, &msg);
                        },
 
                        wire::Message::Shutdown(msg) => {
@@ -1280,13 +1296,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                Ok(should_forward)
        }
 
-       fn forward_broadcast_msg(&self, peers: &PeerHolder<Descriptor>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
+       fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
                match msg {
                        wire::Message::ChannelAnnouncement(ref msg) => {
                                log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
                                let encoded_msg = encode_msg!(msg);
 
-                               for (_, peer_mutex) in peers.peers.iter() {
+                               for (_, peer_mutex) in peers.iter() {
                                        let mut peer = peer_mutex.lock().unwrap();
                                        if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
@@ -1312,7 +1328,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg);
                                let encoded_msg = encode_msg!(msg);
 
-                               for (_, peer_mutex) in peers.peers.iter() {
+                               for (_, peer_mutex) in peers.iter() {
                                        let mut peer = peer_mutex.lock().unwrap();
                                        if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
@@ -1337,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg);
                                let encoded_msg = encode_msg!(msg);
 
-                               for (_, peer_mutex) in peers.peers.iter() {
+                               for (_, peer_mutex) in peers.iter() {
                                        let mut peer = peer_mutex.lock().unwrap();
                                        if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
@@ -1369,11 +1385,34 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
        /// or one of the other clients provided in our language bindings.
        ///
+       /// Note that if there are any other calls to this function waiting on lock(s) this may return
+       /// without doing any work. All available events that need handling will be handled before the
+       /// other calls return.
+       ///
        /// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
        /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
        /// [`send_data`]: SocketDescriptor::send_data
        pub fn process_events(&self) {
-               let _single_processor_lock = self.event_processing_lock.lock().unwrap();
+               let mut _single_processor_lock = self.event_processing_lock.try_lock();
+               if _single_processor_lock.is_err() {
+                       // While we could wake the older sleeper here with a CV and make more even waiting
+                       // times, that would be a lot of overengineering for a simple "reduce total waiter
+                       // count" goal.
+                       match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
+                               Err(val) => {
+                                       debug_assert!(val, "compare_exchange failed spuriously?");
+                                       return;
+                               },
+                               Ok(val) => {
+                                       debug_assert!(!val, "compare_exchange succeeded spuriously?");
+                                       // We're the only waiter, as the running process_events may have emptied the
+                                       // pending events "long" ago and there are new events for us to process, wait until
+                                       // its done and process any leftover events before returning.
+                                       _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
+                                       self.blocked_event_processors.store(false, Ordering::Release);
+                               }
+                       }
+               }
 
                let mut peers_to_disconnect = HashMap::new();
                let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
@@ -1395,7 +1434,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                }
                                                let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
                                                match descriptor_opt {
-                                                       Some(descriptor) => match peers.peers.get(&descriptor) {
+                                                       Some(descriptor) => match peers.get(&descriptor) {
                                                                Some(peer_mutex) => {
                                                                        let peer_lock = peer_mutex.lock().unwrap();
                                                                        if peer_lock.their_features.is_none() {
@@ -1444,8 +1483,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                                log_bytes!(msg.channel_id));
                                                self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                        },
-                                       MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
+                                       MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
                                                self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
@@ -1594,7 +1633,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
                        }
 
-                       for (descriptor, peer_mutex) in peers.peers.iter() {
+                       for (descriptor, peer_mutex) in peers.iter() {
                                self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
                        }
                }
@@ -1608,7 +1647,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                // lock).
 
                                if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
-                                       if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
+                                       if let Some(peer_mutex) = peers.remove(&descriptor) {
                                                if let Some(msg) = msg {
                                                        log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
                                                                        log_pubkey!(node_id),
@@ -1636,7 +1675,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
                let mut peers = self.peers.write().unwrap();
-               let peer_option = peers.peers.remove(descriptor);
+               let peer_option = peers.remove(descriptor);
                match peer_option {
                        None => {
                                // This is most likely a simple race condition where the user found that the socket
@@ -1645,15 +1684,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        },
                        Some(peer_lock) => {
                                let peer = peer_lock.lock().unwrap();
-                               match peer.their_node_id {
-                                       Some(node_id) => {
-                                               log_trace!(self.logger,
-                                                       "Handling disconnection of peer {}, with {}future connection to the peer possible.",
-                                                       log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
-                                               self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
-                                               self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
-                                       },
-                                       None => {}
+                               if let Some(node_id) = peer.their_node_id {
+                                       log_trace!(self.logger,
+                                               "Handling disconnection of peer {}, with {}future connection to the peer possible.",
+                                               log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
+                                       self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
+                                       self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
                                }
                        }
                };
@@ -1672,7 +1708,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                let mut peers_lock = self.peers.write().unwrap();
                if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
                        log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
-                       peers_lock.peers.remove(&descriptor);
+                       peers_lock.remove(&descriptor);
                        self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
                        descriptor.disconnect_socket();
                }
@@ -1685,7 +1721,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                let mut peers_lock = self.peers.write().unwrap();
                self.node_id_to_descriptor.lock().unwrap().clear();
                let peers = &mut *peers_lock;
-               for (mut descriptor, peer) in peers.peers.drain() {
+               for (mut descriptor, peer) in peers.drain() {
                        if let Some(node_id) = peer.lock().unwrap().their_node_id {
                                log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
                                self.message_handler.chan_handler.peer_disconnected(&node_id, false);
@@ -1724,7 +1760,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                {
                        let peers_lock = self.peers.read().unwrap();
 
-                       for (descriptor, peer_mutex) in peers_lock.peers.iter() {
+                       for (descriptor, peer_mutex) in peers_lock.iter() {
                                let mut peer = peer_mutex.lock().unwrap();
                                if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
                                        // The peer needs to complete its handshake before we can exchange messages. We
@@ -1748,7 +1784,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                                if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
                                        || peer.awaiting_pong_timer_tick_intervals as u64 >
-                                               MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.peers.len() as u64
+                                               MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
                                {
                                        descriptors_needing_disconnect.push(descriptor.clone());
                                        continue;
@@ -1774,7 +1810,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        {
                                let mut peers_lock = self.peers.write().unwrap();
                                for descriptor in descriptors_needing_disconnect.iter() {
-                                       if let Some(peer) = peers_lock.peers.remove(&descriptor) {
+                                       if let Some(peer) = peers_lock.remove(descriptor) {
                                                if let Some(node_id) = peer.lock().unwrap().their_node_id {
                                                        log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
                                                        self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
@@ -1807,13 +1843,13 @@ fn is_gossip_msg(type_id: u16) -> bool {
 #[cfg(test)]
 mod tests {
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
-       use ln::msgs;
+       use ln::{msgs, wire};
        use ln::msgs::NetAddress;
        use util::events;
        use util::test_utils;
 
        use bitcoin::secp256k1::Secp256k1;
-       use bitcoin::secp256k1::key::{SecretKey, PublicKey};
+       use bitcoin::secp256k1::{SecretKey, PublicKey};
 
        use prelude::*;
        use sync::{Arc, Mutex};
@@ -1888,11 +1924,18 @@ mod tests {
                peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                peer_b.process_events();
-               assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_a.read_event(&mut fd_a, &b_data).unwrap(), false);
+
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                (fd_a.clone(), fd_b.clone())
        }
 
@@ -1904,7 +1947,7 @@ mod tests {
                let chan_handler = test_utils::TestChannelMessageHandler::new();
                let mut peers = create_network(2, &cfgs);
                establish_connection(&peers[0], &peers[1]);
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 1);
 
                let secp_ctx = Secp256k1::new();
                let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
@@ -1917,7 +1960,49 @@ mod tests {
                peers[0].message_handler.chan_handler = &chan_handler;
 
                peers[0].process_events();
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 0);
+       }
+
+       #[test]
+       fn test_send_simple_msg() {
+               // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
+               // push a message from one peer to another.
+               let cfgs = create_peermgr_cfgs(2);
+               let a_chan_handler = test_utils::TestChannelMessageHandler::new();
+               let b_chan_handler = test_utils::TestChannelMessageHandler::new();
+               let mut peers = create_network(2, &cfgs);
+               let (fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 1);
+
+               let secp_ctx = Secp256k1::new();
+               let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
+
+               let msg = msgs::Shutdown { channel_id: [42; 32], scriptpubkey: bitcoin::Script::new() };
+               a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown {
+                       node_id: their_id, msg: msg.clone()
+               });
+               peers[0].message_handler.chan_handler = &a_chan_handler;
+
+               b_chan_handler.expect_receive_msg(wire::Message::Shutdown(msg));
+               peers[1].message_handler.chan_handler = &b_chan_handler;
+
+               peers[0].process_events();
+
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
+       }
+
+       #[test]
+       fn test_disconnect_all_peer() {
+               // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
+               // then calls disconnect_all_peers
+               let cfgs = create_peermgr_cfgs(2);
+               let peers = create_network(2, &cfgs);
+               establish_connection(&peers[0], &peers[1]);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 1);
+
+               peers[0].disconnect_all_peers();
+               assert_eq!(peers[0].peers.read().unwrap().len(), 0);
        }
 
        #[test]
@@ -1926,17 +2011,17 @@ mod tests {
                let cfgs = create_peermgr_cfgs(2);
                let peers = create_network(2, &cfgs);
                establish_connection(&peers[0], &peers[1]);
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 1);
 
                // peers[0] awaiting_pong is set to true, but the Peer is still connected
                peers[0].timer_tick_occurred();
                peers[0].process_events();
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 1);
 
                // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
                peers[0].timer_tick_occurred();
                peers[0].process_events();
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 0);
        }
 
        #[test]
@@ -1998,20 +2083,22 @@ mod tests {
                peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
 
                // If we get a single timer tick before completion, that's fine
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 1);
                peers[0].timer_tick_occurred();
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 1);
 
                assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
                peers[0].process_events();
-               assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
                peers[1].process_events();
 
                // ...but if we get a second timer tick, we should disconnect the peer
                peers[0].timer_tick_occurred();
-               assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
+               assert_eq!(peers[0].peers.read().unwrap().len(), 0);
 
-               assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
        }
 
        #[test]