Add logging for (dis)connect in peer_handler/channelmanager
[rust-lightning] / src / ln / peer_handler.rs
index 81ef41cf789d05aff4f25f659e97f1716e324d5d..5d13df95c208853bea8202c4dbff34217e374357 100644 (file)
@@ -12,13 +12,13 @@ use ln::msgs;
 use util::ser::{Writeable, Writer, Readable};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
 use util::byte_utils;
-use util::events::{EventsProvider,Event};
+use util::events::{MessageSendEvent};
 use util::logger::Logger;
 
-use std::collections::{HashMap,hash_map,LinkedList};
+use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::{cmp,error,mem,hash,fmt};
+use std::{cmp,error,hash,fmt};
 
 /// Provides references to trait impls which handle different types of messages.
 pub struct MessageHandler {
@@ -106,17 +106,22 @@ struct Peer {
 
 struct PeerHolder<Descriptor: SocketDescriptor> {
        peers: HashMap<Descriptor, Peer>,
+       /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
+       /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
+       peers_needing_send: HashSet<Descriptor>,
        /// Only add to this set when noise completes:
        node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
 }
 struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
        peers: &'a mut HashMap<Descriptor, Peer>,
+       peers_needing_send: &'a mut HashSet<Descriptor>,
        node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
 }
 impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
        fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
                MutPeerHolder {
                        peers: &mut self.peers,
+                       peers_needing_send: &mut self.peers_needing_send,
                        node_id_to_descriptor: &mut self.node_id_to_descriptor,
                }
        }
@@ -127,7 +132,6 @@ impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
 pub struct PeerManager<Descriptor: SocketDescriptor> {
        message_handler: MessageHandler,
        peers: Mutex<PeerHolder<Descriptor>>,
-       pending_events: Mutex<Vec<Event>>,
        our_node_secret: SecretKey,
        initial_syncs_sent: AtomicUsize,
        logger: Arc<Logger>,
@@ -163,8 +167,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
                PeerManager {
                        message_handler: message_handler,
-                       peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
-                       pending_events: Mutex::new(Vec::new()),
+                       peers: Mutex::new(PeerHolder {
+                               peers: HashMap::new(),
+                               peers_needing_send: HashSet::new(),
+                               node_id_to_descriptor: HashMap::new()
+                       }),
                        our_node_secret: our_node_secret,
                        initial_syncs_sent: AtomicUsize::new(0),
                        logger,
@@ -190,7 +197,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        /// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
        /// descriptor but must disconnect the connection immediately.
        ///
-       /// Returns some bytes to send to the remote node.
+       /// Returns a small number of bytes to send to the remote node (currently always 50).
        ///
        /// Panics if descriptor is duplicative with some other descriptor which has not yet has a
        /// disconnect_event.
@@ -300,16 +307,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        ///
        /// May return an Err to indicate that the connection should be closed.
        ///
-       /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
-       /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
-       /// invariants around calling write_event in case a write did not fully complete must still
-       /// hold. Note that this function will often call send_data on many peers before returning, not
-       /// just this peer!
+       /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
+       /// Thus, however, you almost certainly want to call process_events() after any read_event to
+       /// generate send_data calls to handle responses.
        ///
        /// If Ok(true) is returned, further read_events should not be triggered until a write_event on
-       /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
-       /// that this must be true even if a send_data call with resume_read=true was made during the
-       /// course of this function!
+       /// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
        ///
        /// Panics if the descriptor was not previously registered in a new_*_connection event.
        pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
@@ -349,6 +352,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                        {
                                                                                log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
                                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+                                                                               peers.peers_needing_send.insert(peer_descriptor.clone());
                                                                        }
                                                                }
                                                        }
@@ -413,10 +417,14 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                () => {
                                                                        match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
                                                                                hash_map::Entry::Occupied(_) => {
+                                                                                       log_trace!(self, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
                                                                                        peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
                                                                                        return Err(PeerHandleError{ no_connection_possible: false })
                                                                                },
-                                                                               hash_map::Entry::Vacant(entry) => entry.insert(peer_descriptor.clone()),
+                                                                               hash_map::Entry::Vacant(entry) => {
+                                                                                       log_trace!(self, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
+                                                                                       entry.insert(peer_descriptor.clone())
+                                                                               },
                                                                        };
                                                                }
                                                        }
@@ -473,6 +481,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap()));
                                                                                if msg_type != 16 && peer.their_global_features.is_none() {
                                                                                        // Need an init message as first message
+                                                                                       log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
                                                                                        return Err(PeerHandleError{ no_connection_possible: false });
                                                                                }
                                                                                let mut reader = ::std::io::Cursor::new(&msg_data[2..]);
@@ -522,9 +531,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                        }, 16);
                                                                                                }
 
-                                                                                               for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
-                                                                                                       encode_and_send_msg!(msg, 136);
-                                                                                               }
+                                                                                               self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
                                                                                        },
                                                                                        17 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -561,8 +568,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                        // Channel control:
                                                                                        32 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader));
-                                                                                               let resp = try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), &msg));
-                                                                                               encode_and_send_msg!(resp, 33);
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        33 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::AcceptChannel::read(&mut reader));
@@ -571,8 +577,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        34 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::FundingCreated::read(&mut reader));
-                                                                                               let resp = try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg));
-                                                                                               encode_and_send_msg!(resp, 35);
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        35 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::FundingSigned::read(&mut reader));
@@ -580,29 +585,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                        },
                                                                                        36 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::FundingLocked::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg));
-                                                                                               match resp_option {
-                                                                                                       Some(resp) => encode_and_send_msg!(resp, 259),
-                                                                                                       None => {},
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        38 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader));
-                                                                                               let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_options.0 {
-                                                                                                       encode_and_send_msg!(resp, 38);
-                                                                                               }
-                                                                                               if let Some(resp) = resp_options.1 {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        39 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_option {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        128 => {
@@ -624,33 +616,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        132 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader));
-                                                                                               let resps = try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg));
-                                                                                               encode_and_send_msg!(resps.0, 133);
-                                                                                               if let Some(resp) = resps.1 {
-                                                                                                       encode_and_send_msg!(resp, 132);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        133 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg));
-                                                                                               match resp_option {
-                                                                                                       Some(resps) => {
-                                                                                                               for resp in resps.update_add_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 128);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fulfill_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 130);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fail_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 131);
-                                                                                                               }
-                                                                                                               if let Some(resp) = resps.update_fee {
-                                                                                                                       encode_and_send_msg!(resp, 134);
-                                                                                                               }
-                                                                                                               encode_and_send_msg!(resps.commitment_signed, 132);
-                                                                                                       },
-                                                                                                       None => {},
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        134 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader));
@@ -658,31 +628,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                        },
                                                                                        136 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader));
-                                                                                               let (funding_locked, revoke_and_ack, commitment_update) = try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(lock_msg) = funding_locked {
-                                                                                                       encode_and_send_msg!(lock_msg, 36);
-                                                                                               }
-                                                                                               if let Some(revoke_msg) = revoke_and_ack {
-                                                                                                       encode_and_send_msg!(revoke_msg, 133);
-                                                                                               }
-                                                                                               match commitment_update {
-                                                                                                       Some(resps) => {
-                                                                                                               for resp in resps.update_add_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 128);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fulfill_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 130);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fail_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 131);
-                                                                                                               }
-                                                                                                               if let Some(resp) = resps.update_fee {
-                                                                                                                       encode_and_send_msg!(resp, 134);
-                                                                                                               }
-                                                                                                               encode_and_send_msg!(resps.commitment_signed, 132);
-                                                                                                       },
-                                                                                                       None => {},
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        // Routing control:
@@ -735,22 +681,21 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        pause_read
                };
 
-               self.process_events();
-
                Ok(pause_read)
        }
 
-       /// Checks for any events generated by our handlers and processes them. May be needed after eg
-       /// calls to ChannelManager::process_pending_htlc_forward.
+       /// Checks for any events generated by our handlers and processes them. Includes sending most
+       /// response messages as well as messages generated by calls to handler functions directly (eg
+       /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
        pub fn process_events(&self) {
-               let mut upstream_events = Vec::new();
                {
                        // TODO: There are some DoS attacks here where you can flood someone's outbound send
                        // buffer by doing things like announcing channels on another node. We should be willing to
                        // drop optional-ish messages when send buffers get full!
 
-                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_events();
-                       let mut peers = self.peers.lock().unwrap();
+                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+                       let mut peers_lock = self.peers.lock().unwrap();
+                       let peers = peers_lock.borrow_parts();
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
                                        ($node_id: expr, $handle_no_such_peer: block) => {
@@ -776,14 +721,17 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                        }
                                }
                                match event {
-                                       Event::FundingGenerationReady {..} => { /* Hand upstream */ },
-                                       Event::FundingBroadcastSafe {..} => { /* Hand upstream */ },
-                                       Event::PaymentReceived {..} => { /* Hand upstream */ },
-                                       Event::PaymentSent {..} => { /* Hand upstream */ },
-                                       Event::PaymentFailed {..} => { /* Hand upstream */ },
-                                       Event::PendingHTLCsForwardable {..} => { /* Hand upstream */ },
-
-                                       Event::SendOpenChannel { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.temporary_channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
@@ -792,9 +740,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendFundingCreated { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id),
@@ -805,25 +752,40 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
-                                               log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {}{} for channel {}",
+                                       MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
+                                                               //they should just throw away this funding transaction
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
-                                                               if announcement_sigs.is_some() { " with announcement sigs" } else { "" },
                                                                log_bytes!(msg.channel_id));
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
-                                               match announcement_sigs {
-                                                       &Some(ref announce_msg) => peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(announce_msg, 259))),
-                                                       &None => {},
-                                               }
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                       MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
+                                                               //they should just throw away this funding transaction
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
                                                                log_pubkey!(node_id),
                                                                update_add_htlcs.len(),
@@ -850,9 +812,28 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                }
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendShutdown { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
@@ -861,9 +842,18 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                                       MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                                log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
                                                        let encoded_msg = encode_msg!(msg, 256);
@@ -886,9 +876,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
-                                               continue;
                                        },
-                                       Event::BroadcastChannelUpdate { ref msg } => {
+                                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                                log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
                                                        let encoded_msg = encode_msg!(msg, 258);
@@ -901,17 +890,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
-                                               continue;
                                        },
-                                       Event::PaymentFailureNetworkUpdate { ref update } => {
+                                       MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
                                                self.message_handler.route_handler.handle_htlc_fail_channel_update(update);
-                                               continue;
                                        },
-                                       Event::HandleError { ref node_id, ref action } => {
+                                       MessageSendEvent::HandleError { ref node_id, ref action } => {
                                                if let Some(ref action) = *action {
                                                        match *action {
                                                                msgs::ErrorAction::DisconnectPeer { ref msg } => {
                                                                        if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
+                                                                               peers.peers_needing_send.remove(&descriptor);
                                                                                if let Some(mut peer) = peers.peers.remove(&descriptor) {
                                                                                        if let Some(ref msg) = *msg {
                                                                                                log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
@@ -929,9 +917,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                self.message_handler.chan_handler.peer_disconnected(&node_id, false);
                                                                        }
                                                                },
-                                                               msgs::ErrorAction::IgnoreError => {
-                                                                       continue;
-                                                               },
+                                                               msgs::ErrorAction::IgnoreError => {},
                                                                msgs::ErrorAction::SendErrorMessage { ref msg } => {
                                                                        log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                        log_pubkey!(node_id),
@@ -946,17 +932,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                } else {
                                                        log_error!(self, "Got no-action HandleError Event in peer_handler for node {}, no such events should ever be generated!", log_pubkey!(node_id));
                                                }
-                                               continue;
                                        }
                                }
-
-                               upstream_events.push(event);
                        }
-               }
 
-               let mut pending_events = self.pending_events.lock().unwrap();
-               for event in upstream_events.drain(..) {
-                       pending_events.push(event);
+                       for mut descriptor in peers.peers_needing_send.drain() {
+                               match peers.peers.get_mut(&descriptor) {
+                                       Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+                                       None => panic!("Inconsistent peers set state!"),
+                               }
+                       }
                }
        }
 
@@ -972,6 +957,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
        fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
                let mut peers = self.peers.lock().unwrap();
+               peers.peers_needing_send.remove(descriptor);
                let peer_option = peers.peers.remove(descriptor);
                match peer_option {
                        None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),
@@ -988,15 +974,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        }
 }
 
-impl<Descriptor: SocketDescriptor> EventsProvider for PeerManager<Descriptor> {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
-               let mut pending_events = self.pending_events.lock().unwrap();
-               let mut ret = Vec::new();
-               mem::swap(&mut ret, &mut *pending_events);
-               ret
-       }
-}
-
 #[cfg(test)]
 mod tests {
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
@@ -1068,7 +1045,7 @@ mod tests {
                let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
 
                let chan_handler = test_utils::TestChannelMessageHandler::new();
-               chan_handler.pending_events.lock().unwrap().push(events::Event::HandleError {
+               chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
                        node_id: their_id,
                        action: Some(msgs::ErrorAction::DisconnectPeer { msg: None }),
                });