X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fln%2Fpeer_handler.rs;h=d868b0758bd745a6e7ee53294e7127c7942d4a89;hb=f0490118b3dcbf3d52fd305ea777c0c9d6b6d7d0;hp=76cc35f64c550b9b401faab6272a9d163383e821;hpb=6c1123cafd5e4821042e6f57efdf83af62e6fc6a;p=rust-lightning diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 76cc35f6..d868b075 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -12,13 +12,13 @@ use ln::msgs; use util::ser::{Writeable, Writer, Readable}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use util::byte_utils; -use util::events::{EventsProvider,Event}; +use util::events::{MessageSendEvent}; use util::logger::Logger; -use std::collections::{HashMap,hash_map,LinkedList}; +use std::collections::{HashMap,hash_map,HashSet,LinkedList}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{cmp,error,mem,hash,fmt}; +use std::{cmp,error,hash,fmt}; /// Provides references to trait impls which handle different types of messages. pub struct MessageHandler { @@ -106,17 +106,22 @@ struct Peer { struct PeerHolder { peers: HashMap, + /// Added to by do_read_event for cases where we pushed a message onto the send buffer but + /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events() + peers_needing_send: HashSet, /// Only add to this set when noise completes: node_id_to_descriptor: HashMap, } struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> { peers: &'a mut HashMap, + peers_needing_send: &'a mut HashSet, node_id_to_descriptor: &'a mut HashMap, } impl PeerHolder { fn borrow_parts(&mut self) -> MutPeerHolder { MutPeerHolder { peers: &mut self.peers, + peers_needing_send: &mut self.peers_needing_send, node_id_to_descriptor: &mut self.node_id_to_descriptor, } } @@ -127,7 +132,6 @@ impl PeerHolder { pub struct PeerManager { message_handler: MessageHandler, peers: Mutex>, - pending_events: Mutex>, our_node_secret: SecretKey, initial_syncs_sent: AtomicUsize, logger: Arc, @@ -163,8 +167,11 @@ impl PeerManager { pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc) -> PeerManager { PeerManager { message_handler: message_handler, - peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }), - pending_events: Mutex::new(Vec::new()), + peers: Mutex::new(PeerHolder { + peers: HashMap::new(), + peers_needing_send: HashSet::new(), + node_id_to_descriptor: HashMap::new() + }), our_node_secret: our_node_secret, initial_syncs_sent: AtomicUsize::new(0), logger, @@ -190,7 +197,7 @@ impl PeerManager { /// Note that if an Err is returned here you MUST NOT call disconnect_event for the new /// descriptor but must disconnect the connection immediately. /// - /// Returns some bytes to send to the remote node. + /// Returns a small number of bytes to send to the remote node (currently always 50). /// /// Panics if descriptor is duplicative with some other descriptor which has not yet has a /// disconnect_event. @@ -300,16 +307,12 @@ impl PeerManager { /// /// May return an Err to indicate that the connection should be closed. /// - /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into - /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The - /// invariants around calling write_event in case a write did not fully complete must still - /// hold. Note that this function will often call send_data on many peers before returning, not - /// just this peer! + /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity. + /// Thus, however, you almost certainly want to call process_events() after any read_event to + /// generate send_data calls to handle responses. /// /// If Ok(true) is returned, further read_events should not be triggered until a write_event on - /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note - /// that this must be true even if a send_data call with resume_read=true was made during the - /// course of this function! + /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). /// /// Panics if the descriptor was not previously registered in a new_*_connection event. pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec) -> Result { @@ -349,6 +352,7 @@ impl PeerManager { { log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + peers.peers_needing_send.insert(peer_descriptor.clone()); } } } @@ -522,9 +526,7 @@ impl PeerManager { }, 16); } - for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) { - encode_and_send_msg!(msg, 136); - } + self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()); }, 17 => { let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader)); @@ -561,8 +563,7 @@ impl PeerManager { // Channel control: 32 => { let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader)); - let resp = try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), &msg)); - encode_and_send_msg!(resp, 33); + try_potential_handleerror!(self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), &msg)); }, 33 => { let msg = try_potential_decodeerror!(msgs::AcceptChannel::read(&mut reader)); @@ -571,8 +572,7 @@ impl PeerManager { 34 => { let msg = try_potential_decodeerror!(msgs::FundingCreated::read(&mut reader)); - let resp = try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg)); - encode_and_send_msg!(resp, 35); + try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg)); }, 35 => { let msg = try_potential_decodeerror!(msgs::FundingSigned::read(&mut reader)); @@ -580,29 +580,16 @@ impl PeerManager { }, 36 => { let msg = try_potential_decodeerror!(msgs::FundingLocked::read(&mut reader)); - let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg)); - match resp_option { - Some(resp) => encode_and_send_msg!(resp, 259), - None => {}, - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg)); }, 38 => { let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader)); - let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg)); - if let Some(resp) = resp_options.0 { - encode_and_send_msg!(resp, 38); - } - if let Some(resp) = resp_options.1 { - encode_and_send_msg!(resp, 39); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg)); }, 39 => { let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader)); - let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg)); - if let Some(resp) = resp_option { - encode_and_send_msg!(resp, 39); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg)); }, 128 => { @@ -615,10 +602,7 @@ impl PeerManager { }, 131 => { let msg = try_potential_decodeerror!(msgs::UpdateFailHTLC::read(&mut reader)); - let chan_update = try_potential_handleerror!(self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg)); - if let Some(update) = chan_update { - self.message_handler.route_handler.handle_htlc_fail_channel_update(&update); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg)); }, 135 => { let msg = try_potential_decodeerror!(msgs::UpdateFailMalformedHTLC::read(&mut reader)); @@ -627,33 +611,11 @@ impl PeerManager { 132 => { let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader)); - let resps = try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg)); - encode_and_send_msg!(resps.0, 133); - if let Some(resp) = resps.1 { - encode_and_send_msg!(resp, 132); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg)); }, 133 => { let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader)); - let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg)); - match resp_option { - Some(resps) => { - for resp in resps.update_add_htlcs { - encode_and_send_msg!(resp, 128); - } - for resp in resps.update_fulfill_htlcs { - encode_and_send_msg!(resp, 130); - } - for resp in resps.update_fail_htlcs { - encode_and_send_msg!(resp, 131); - } - if let Some(resp) = resps.update_fee { - encode_and_send_msg!(resp, 134); - } - encode_and_send_msg!(resps.commitment_signed, 132); - }, - None => {}, - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg)); }, 134 => { let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader)); @@ -661,31 +623,7 @@ impl PeerManager { }, 136 => { let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader)); - let (funding_locked, revoke_and_ack, commitment_update) = try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg)); - if let Some(lock_msg) = funding_locked { - encode_and_send_msg!(lock_msg, 36); - } - if let Some(revoke_msg) = revoke_and_ack { - encode_and_send_msg!(revoke_msg, 133); - } - match commitment_update { - Some(resps) => { - for resp in resps.update_add_htlcs { - encode_and_send_msg!(resp, 128); - } - for resp in resps.update_fulfill_htlcs { - encode_and_send_msg!(resp, 130); - } - for resp in resps.update_fail_htlcs { - encode_and_send_msg!(resp, 131); - } - if let Some(resp) = resps.update_fee { - encode_and_send_msg!(resp, 134); - } - encode_and_send_msg!(resps.commitment_signed, 132); - }, - None => {}, - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg)); }, // Routing control: @@ -738,22 +676,21 @@ impl PeerManager { pause_read }; - self.process_events(); - Ok(pause_read) } - /// Checks for any events generated by our handlers and processes them. May be needed after eg - /// calls to ChannelManager::process_pending_htlc_forward. + /// Checks for any events generated by our handlers and processes them. Includes sending most + /// response messages as well as messages generated by calls to handler functions directly (eg + /// functions like ChannelManager::process_pending_htlc_forward or send_payment). pub fn process_events(&self) { - let mut upstream_events = Vec::new(); { // TODO: There are some DoS attacks here where you can flood someone's outbound send // buffer by doing things like announcing channels on another node. We should be willing to // drop optional-ish messages when send buffers get full! - let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_events(); - let mut peers = self.peers.lock().unwrap(); + let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); + let mut peers_lock = self.peers.lock().unwrap(); + let peers = peers_lock.borrow_parts(); for event in events_generated.drain(..) { macro_rules! get_peer_for_forwarding { ($node_id: expr, $handle_no_such_peer: block) => { @@ -779,14 +716,17 @@ impl PeerManager { } } match event { - Event::FundingGenerationReady {..} => { /* Hand upstream */ }, - Event::FundingBroadcastSafe {..} => { /* Hand upstream */ }, - Event::PaymentReceived {..} => { /* Hand upstream */ }, - Event::PaymentSent {..} => { /* Hand upstream */ }, - Event::PaymentFailed {..} => { /* Hand upstream */ }, - Event::PendingHTLCsForwardable {..} => { /* Hand upstream */ }, - - Event::SendOpenChannel { ref node_id, ref msg } => { + MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { + log_trace!(self, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.temporary_channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Drop the pending channel? (or just let it timeout, but that sucks) + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id)); @@ -795,9 +735,8 @@ impl PeerManager { }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::SendFundingCreated { ref node_id, ref msg } => { + MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", log_pubkey!(node_id), log_bytes!(msg.temporary_channel_id), @@ -808,25 +747,40 @@ impl PeerManager { }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::SendFundingLocked { ref node_id, ref msg, ref announcement_sigs } => { - log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {}{} for channel {}", + MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { + log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: generate a DiscardFunding event indicating to the wallet that + //they should just throw away this funding transaction + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { + log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}", log_pubkey!(node_id), - if announcement_sigs.is_some() { " with announcement sigs" } else { "" }, log_bytes!(msg.channel_id)); let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36))); - match announcement_sigs { - &Some(ref announce_msg) => peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(announce_msg, 259))), - &None => {}, - } Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { + MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { + log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: generate a DiscardFunding event indicating to the wallet that + //they should just throw away this funding transaction + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", log_pubkey!(node_id), update_add_htlcs.len(), @@ -853,9 +807,28 @@ impl PeerManager { } peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::SendShutdown { ref node_id, ref msg } => { + MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { + log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), log_bytes!(msg.channel_id)); @@ -864,9 +837,18 @@ impl PeerManager { }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); Self::do_attempt_write_data(&mut descriptor, peer); - continue; }, - Event::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { + MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { + log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { let encoded_msg = encode_msg!(msg, 256); @@ -889,9 +871,8 @@ impl PeerManager { Self::do_attempt_write_data(&mut (*descriptor).clone(), peer); } } - continue; }, - Event::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { ref msg } => { log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_update(msg).is_ok() { let encoded_msg = encode_msg!(msg, 258); @@ -904,13 +885,16 @@ impl PeerManager { Self::do_attempt_write_data(&mut (*descriptor).clone(), peer); } } - continue; }, - Event::HandleError { ref node_id, ref action } => { + MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => { + self.message_handler.route_handler.handle_htlc_fail_channel_update(update); + }, + MessageSendEvent::HandleError { ref node_id, ref action } => { if let Some(ref action) = *action { match *action { msgs::ErrorAction::DisconnectPeer { ref msg } => { if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) { + peers.peers_needing_send.remove(&descriptor); if let Some(mut peer) = peers.peers.remove(&descriptor) { if let Some(ref msg) = *msg { log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", @@ -928,9 +912,7 @@ impl PeerManager { self.message_handler.chan_handler.peer_disconnected(&node_id, false); } }, - msgs::ErrorAction::IgnoreError => { - continue; - }, + msgs::ErrorAction::IgnoreError => {}, msgs::ErrorAction::SendErrorMessage { ref msg } => { log_trace!(self, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), @@ -945,17 +927,16 @@ impl PeerManager { } else { log_error!(self, "Got no-action HandleError Event in peer_handler for node {}, no such events should ever be generated!", log_pubkey!(node_id)); } - continue; } } - - upstream_events.push(event); } - } - let mut pending_events = self.pending_events.lock().unwrap(); - for event in upstream_events.drain(..) { - pending_events.push(event); + for mut descriptor in peers.peers_needing_send.drain() { + match peers.peers.get_mut(&descriptor) { + Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer), + None => panic!("Inconsistent peers set state!"), + } + } } } @@ -971,6 +952,7 @@ impl PeerManager { fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { let mut peers = self.peers.lock().unwrap(); + peers.peers_needing_send.remove(descriptor); let peer_option = peers.peers.remove(descriptor); match peer_option { None => panic!("Descriptor for disconnect_event is not already known to PeerManager"), @@ -987,15 +969,6 @@ impl PeerManager { } } -impl EventsProvider for PeerManager { - fn get_and_clear_pending_events(&self) -> Vec { - let mut pending_events = self.pending_events.lock().unwrap(); - let mut ret = Vec::new(); - mem::swap(&mut ret, &mut *pending_events); - ret - } -} - #[cfg(test)] mod tests { use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; @@ -1067,7 +1040,7 @@ mod tests { let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); let chan_handler = test_utils::TestChannelMessageHandler::new(); - chan_handler.pending_events.lock().unwrap().push(events::Event::HandleError { + chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { node_id: their_id, action: Some(msgs::ErrorAction::DisconnectPeer { msg: None }), });