Send channel_reestablish out-of-band to ensure ordered deliver
[rust-lightning] / src / ln / peer_handler.rs
index 81ef41cf789d05aff4f25f659e97f1716e324d5d..4471ea0025807d4ad63c71338785ca6b6897cffe 100644 (file)
@@ -12,13 +12,13 @@ use ln::msgs;
 use util::ser::{Writeable, Writer, Readable};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
 use util::byte_utils;
-use util::events::{EventsProvider,Event};
+use util::events::{MessageSendEvent};
 use util::logger::Logger;
 
 use std::collections::{HashMap,hash_map,LinkedList};
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::{cmp,error,mem,hash,fmt};
+use std::{cmp,error,hash,fmt};
 
 /// Provides references to trait impls which handle different types of messages.
 pub struct MessageHandler {
@@ -127,7 +127,6 @@ impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
 pub struct PeerManager<Descriptor: SocketDescriptor> {
        message_handler: MessageHandler,
        peers: Mutex<PeerHolder<Descriptor>>,
-       pending_events: Mutex<Vec<Event>>,
        our_node_secret: SecretKey,
        initial_syncs_sent: AtomicUsize,
        logger: Arc<Logger>,
@@ -164,7 +163,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                PeerManager {
                        message_handler: message_handler,
                        peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
-                       pending_events: Mutex::new(Vec::new()),
                        our_node_secret: our_node_secret,
                        initial_syncs_sent: AtomicUsize::new(0),
                        logger,
@@ -522,9 +520,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                        }, 16);
                                                                                                }
 
-                                                                                               for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
-                                                                                                       encode_and_send_msg!(msg, 136);
-                                                                                               }
+                                                                                               self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
                                                                                        },
                                                                                        17 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -561,8 +557,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                        // Channel control:
                                                                                        32 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader));
-                                                                                               let resp = try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), &msg));
-                                                                                               encode_and_send_msg!(resp, 33);
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        33 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::AcceptChannel::read(&mut reader));
@@ -571,8 +566,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        34 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::FundingCreated::read(&mut reader));
-                                                                                               let resp = try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg));
-                                                                                               encode_and_send_msg!(resp, 35);
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        35 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::FundingSigned::read(&mut reader));
@@ -580,29 +574,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                        },
                                                                                        36 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::FundingLocked::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg));
-                                                                                               match resp_option {
-                                                                                                       Some(resp) => encode_and_send_msg!(resp, 259),
-                                                                                                       None => {},
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        38 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader));
-                                                                                               let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_options.0 {
-                                                                                                       encode_and_send_msg!(resp, 38);
-                                                                                               }
-                                                                                               if let Some(resp) = resp_options.1 {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        39 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_option {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        128 => {
@@ -624,33 +605,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        132 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader));
-                                                                                               let resps = try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg));
-                                                                                               encode_and_send_msg!(resps.0, 133);
-                                                                                               if let Some(resp) = resps.1 {
-                                                                                                       encode_and_send_msg!(resp, 132);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        133 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg));
-                                                                                               match resp_option {
-                                                                                                       Some(resps) => {
-                                                                                                               for resp in resps.update_add_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 128);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fulfill_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 130);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fail_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 131);
-                                                                                                               }
-                                                                                                               if let Some(resp) = resps.update_fee {
-                                                                                                                       encode_and_send_msg!(resp, 134);
-                                                                                                               }
-                                                                                                               encode_and_send_msg!(resps.commitment_signed, 132);
-                                                                                                       },
-                                                                                                       None => {},
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        134 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader));
@@ -658,31 +617,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                        },
                                                                                        136 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader));
-                                                                                               let (funding_locked, revoke_and_ack, commitment_update) = try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(lock_msg) = funding_locked {
-                                                                                                       encode_and_send_msg!(lock_msg, 36);
-                                                                                               }
-                                                                                               if let Some(revoke_msg) = revoke_and_ack {
-                                                                                                       encode_and_send_msg!(revoke_msg, 133);
-                                                                                               }
-                                                                                               match commitment_update {
-                                                                                                       Some(resps) => {
-                                                                                                               for resp in resps.update_add_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 128);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fulfill_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 130);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fail_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 131);
-                                                                                                               }
-                                                                                                               if let Some(resp) = resps.update_fee {
-                                                                                                                       encode_and_send_msg!(resp, 134);
-                                                                                                               }
-                                                                                                               encode_and_send_msg!(resps.commitment_signed, 132);
-                                                                                                       },
-                                                                                                       None => {},
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        // Routing control:
@@ -743,13 +678,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        /// Checks for any events generated by our handlers and processes them. May be needed after eg
        /// calls to ChannelManager::process_pending_htlc_forward.
        pub fn process_events(&self) {
-               let mut upstream_events = Vec::new();
                {
                        // TODO: There are some DoS attacks here where you can flood someone's outbound send
                        // buffer by doing things like announcing channels on another node. We should be willing to
                        // drop optional-ish messages when send buffers get full!
 
-                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_events();
+                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
                        let mut peers = self.peers.lock().unwrap();
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
@@ -776,14 +710,17 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                        }
                                }
                                match event {
-                                       Event::FundingGenerationReady {..} => { /* Hand upstream */ },
-                                       Event::FundingBroadcastSafe {..} => { /* Hand upstream */ },
-                                       Event::PaymentReceived {..} => { /* Hand upstream */ },
-                                       Event::PaymentSent {..} => { /* Hand upstream */ },
-                                       Event::PaymentFailed {..} => { /* Hand upstream */ },
-                                       Event::PendingHTLCsForwardable {..} => { /* Hand upstream */ },
-
-                                       Event::SendOpenChannel { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.temporary_channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
@@ -792,9 +729,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendFundingCreated { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id),
@@ -805,25 +741,40 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => {
-                                               log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {}{} for channel {}",
+                                       MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
+                                                               //they should just throw away this funding transaction
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
-                                                               if announcement_sigs.is_some() { " with announcement sigs" } else { "" },
                                                                log_bytes!(msg.channel_id));
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
-                                               match announcement_sigs {
-                                                       &Some(ref announce_msg) => peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(announce_msg, 259))),
-                                                       &None => {},
-                                               }
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                       MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
+                                                               //they should just throw away this funding transaction
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
                                                                log_pubkey!(node_id),
                                                                update_add_htlcs.len(),
@@ -850,9 +801,28 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                }
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::SendShutdown { ref node_id, ref msg } => {
+                                       MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
@@ -861,9 +831,18 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
-                                               continue;
                                        },
-                                       Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                                       MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                                log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
                                                        let encoded_msg = encode_msg!(msg, 256);
@@ -886,9 +865,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
-                                               continue;
                                        },
-                                       Event::BroadcastChannelUpdate { ref msg } => {
+                                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                                log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
                                                        let encoded_msg = encode_msg!(msg, 258);
@@ -901,13 +879,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
-                                               continue;
                                        },
-                                       Event::PaymentFailureNetworkUpdate { ref update } => {
+                                       MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
                                                self.message_handler.route_handler.handle_htlc_fail_channel_update(update);
-                                               continue;
                                        },
-                                       Event::HandleError { ref node_id, ref action } => {
+                                       MessageSendEvent::HandleError { ref node_id, ref action } => {
                                                if let Some(ref action) = *action {
                                                        match *action {
                                                                msgs::ErrorAction::DisconnectPeer { ref msg } => {
@@ -929,9 +905,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                self.message_handler.chan_handler.peer_disconnected(&node_id, false);
                                                                        }
                                                                },
-                                                               msgs::ErrorAction::IgnoreError => {
-                                                                       continue;
-                                                               },
+                                                               msgs::ErrorAction::IgnoreError => {},
                                                                msgs::ErrorAction::SendErrorMessage { ref msg } => {
                                                                        log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                        log_pubkey!(node_id),
@@ -946,18 +920,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                } else {
                                                        log_error!(self, "Got no-action HandleError Event in peer_handler for node {}, no such events should ever be generated!", log_pubkey!(node_id));
                                                }
-                                               continue;
                                        }
                                }
-
-                               upstream_events.push(event);
                        }
                }
-
-               let mut pending_events = self.pending_events.lock().unwrap();
-               for event in upstream_events.drain(..) {
-                       pending_events.push(event);
-               }
        }
 
        /// Indicates that the given socket descriptor's connection is now closed.
@@ -988,15 +954,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        }
 }
 
-impl<Descriptor: SocketDescriptor> EventsProvider for PeerManager<Descriptor> {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
-               let mut pending_events = self.pending_events.lock().unwrap();
-               let mut ret = Vec::new();
-               mem::swap(&mut ret, &mut *pending_events);
-               ret
-       }
-}
-
 #[cfg(test)]
 mod tests {
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
@@ -1068,7 +1025,7 @@ mod tests {
                let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
 
                let chan_handler = test_utils::TestChannelMessageHandler::new();
-               chan_handler.pending_events.lock().unwrap().push(events::Event::HandleError {
+               chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
                        node_id: their_id,
                        action: Some(msgs::ErrorAction::DisconnectPeer { msg: None }),
                });