Merge pull request #1977 from jkczyz/2023-01-offers-fuzz
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 010894aa4839ab45ae902a131779d38ff3ddb78e..e9eaf33e8840b021afe7fbcfc60afec3e0fa5b3d 100644 (file)
@@ -46,16 +46,23 @@ use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
 use bitcoin::hashes::{HashEngine, Hash};
 
-/// Handler for BOLT1-compliant messages.
+/// A handler provided to [`PeerManager`] for reading and handling custom messages.
+///
+/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific
+/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the
+/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler.
+///
+/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md
+/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message
 pub trait CustomMessageHandler: wire::CustomMessageReader {
-       /// Called with the message type that was received and the buffer to be read.
-       /// Can return a `MessageHandlingError` if the message could not be handled.
+       /// Handles the given message sent from `sender_node_id`, possibly producing messages for
+       /// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`]
+       /// to send.
        fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>;
 
-       /// Gets the list of pending messages which were generated by the custom message
-       /// handler, clearing the list in the process. The first tuple element must
-       /// correspond to the intended recipients node ids. If no connection to one of the
-       /// specified node does not exist, the message is simply not sent to it.
+       /// Returns the list of pending messages that were generated by the handler, clearing the list
+       /// in the process. Each message is paired with the node id of the intended recipient. If no
+       /// connection to the node exists, then the message is simply not sent.
        fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>;
 }
 
@@ -72,7 +79,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn get_next_channel_announcement(&self, _starting_point: u64) ->
                Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
        fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> { None }
-       fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) }
+       fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
        fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
        fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
        fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
@@ -88,8 +95,8 @@ impl OnionMessageProvider for IgnoringMessageHandler {
 }
 impl OnionMessageHandler for IgnoringMessageHandler {
        fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
-       fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) }
-       fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
+       fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
+       fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
        fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
        fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
                InitFeatures::empty()
@@ -223,8 +230,8 @@ impl ChannelMessageHandler for ErroringMessageHandler {
        }
        // msgs::ChannelUpdate does not contain the channel_id field, so we just drop them.
        fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {}
-       fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
-       fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) }
+       fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
+       fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
        fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
        fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
        fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
@@ -315,16 +322,7 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
 /// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the
 /// descriptor.
 #[derive(Clone)]
-pub struct PeerHandleError {
-       /// Used to indicate that we probably can't make any future connections to this peer (e.g.
-       /// because we required features that our peer was missing, or vice versa).
-       ///
-       /// While LDK's [`ChannelManager`] will not do it automatically, you likely wish to force-close
-       /// any channels with this peer or check for new versions of LDK.
-       ///
-       /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
-       pub no_connection_possible: bool,
-}
+pub struct PeerHandleError { }
 impl fmt::Debug for PeerHandleError {
        fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
                formatter.write_str("Peer Sent Invalid Data")
@@ -427,6 +425,8 @@ struct Peer {
        /// `channel_announcement` at all - we set this unconditionally but unset it every time we
        /// check if we're gossip-processing-backlogged).
        received_channel_announce_since_backlogged: bool,
+
+       inbound_connection: bool,
 }
 
 impl Peer {
@@ -444,6 +444,7 @@ impl Peer {
        /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
        /// sent the old versions, we should send the update, and so return true here.
        fn should_forward_channel_announcement(&self, channel_id: u64) -> bool {
+               if !self.handshake_complete() { return false; }
                if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
                        !self.sent_gossip_timestamp_filter {
                                return false;
@@ -457,6 +458,7 @@ impl Peer {
 
        /// Similar to the above, but for node announcements indexed by node_id.
        fn should_forward_node_announcement(&self, node_id: NodeId) -> bool {
+               if !self.handshake_complete() { return false; }
                if self.their_features.as_ref().unwrap().supports_gossip_queries() &&
                        !self.sent_gossip_timestamp_filter {
                                return false;
@@ -483,19 +485,20 @@ impl Peer {
        fn should_buffer_gossip_backfill(&self) -> bool {
                self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
                        && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+                       && self.handshake_complete()
        }
 
        /// Determines if we should push an onion message onto a peer's outbound buffer. This is checked
        /// every time the peer's buffer may have been drained.
        fn should_buffer_onion_message(&self) -> bool {
-               self.pending_outbound_buffer.is_empty()
+               self.pending_outbound_buffer.is_empty() && self.handshake_complete()
                        && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
        }
 
        /// Determines if we should push additional gossip broadcast messages onto a peer's outbound
        /// buffer. This is checked every time the peer's buffer may have been drained.
        fn should_buffer_gossip_broadcast(&self) -> bool {
-               self.pending_outbound_buffer.is_empty()
+               self.pending_outbound_buffer.is_empty() && self.handshake_complete()
                        && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
        }
 
@@ -777,8 +780,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                let peers = self.peers.read().unwrap();
                peers.values().filter_map(|peer_mutex| {
                        let p = peer_mutex.lock().unwrap();
-                       if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() ||
-                               p.their_node_id.is_none() {
+                       if !p.handshake_complete() {
                                return None;
                        }
                        Some((p.their_node_id.unwrap().0, p.their_net_address.clone()))
@@ -836,6 +838,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        sent_gossip_timestamp_filter: false,
 
                        received_channel_announce_since_backlogged: false,
+                       inbound_connection: false,
                })).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -885,6 +888,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        sent_gossip_timestamp_filter: false,
 
                        received_channel_announce_since_backlogged: false,
+                       inbound_connection: true,
                })).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -1007,7 +1011,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                // This is most likely a simple race condition where the user found that the socket
                                // was writeable, then we told the user to `disconnect_socket()`, then they called
                                // this method. Return an error to make sure we get disconnected.
-                               return Err(PeerHandleError { no_connection_possible: false });
+                               return Err(PeerHandleError { });
                        },
                        Some(peer_mutex) => {
                                let mut peer = peer_mutex.lock().unwrap();
@@ -1040,7 +1044,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        Ok(res) => Ok(res),
                        Err(e) => {
                                log_trace!(self.logger, "Peer sent invalid data or we decided to disconnect due to a protocol error");
-                               self.disconnect_event_internal(peer_descriptor, e.no_connection_possible);
+                               self.disconnect_event_internal(peer_descriptor);
                                Err(e)
                        }
                }
@@ -1073,7 +1077,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                // This is most likely a simple race condition where the user read some bytes
                                // from the socket, then we told the user to `disconnect_socket()`, then they
                                // called this method. Return an error to make sure we get disconnected.
-                               return Err(PeerHandleError { no_connection_possible: false });
+                               return Err(PeerHandleError { });
                        },
                        Some(peer_mutex) => {
                                let mut read_pos = 0;
@@ -1087,7 +1091,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                                msgs::ErrorAction::DisconnectPeer { msg: _ } => {
                                                                                        //TODO: Try to push msg
                                                                                        log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
-                                                                                       return Err(PeerHandleError{ no_connection_possible: false });
+                                                                                       return Err(PeerHandleError { });
                                                                                },
                                                                                msgs::ErrorAction::IgnoreAndLog(level) => {
                                                                                        log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
@@ -1140,7 +1144,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        hash_map::Entry::Occupied(_) => {
                                                                                log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
                                                                                peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
-                                                                               return Err(PeerHandleError{ no_connection_possible: false })
+                                                                               return Err(PeerHandleError { })
                                                                        },
                                                                        hash_map::Entry::Vacant(entry) => {
                                                                                log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0));
@@ -1197,7 +1201,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
                                                                        peer.pending_read_buffer.resize(msg_len as usize + 16, 0);
                                                                        if msg_len < 2 { // Need at least the message type tag
-                                                                               return Err(PeerHandleError{ no_connection_possible: false });
+                                                                               return Err(PeerHandleError { });
                                                                        }
                                                                        peer.pending_read_is_header = false;
                                                                } else {
@@ -1240,19 +1244,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                                                (msgs::DecodeError::UnknownRequiredFeature, ty) => {
                                                                                                        log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
                                                                                                        self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) });
-                                                                                                       return Err(PeerHandleError { no_connection_possible: false });
+                                                                                                       return Err(PeerHandleError { });
                                                                                                }
-                                                                                               (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { no_connection_possible: false }),
+                                                                                               (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }),
                                                                                                (msgs::DecodeError::InvalidValue, _) => {
                                                                                                        log_debug!(self.logger, "Got an invalid value while deserializing message");
-                                                                                                       return Err(PeerHandleError { no_connection_possible: false });
+                                                                                                       return Err(PeerHandleError { });
                                                                                                }
                                                                                                (msgs::DecodeError::ShortRead, _) => {
                                                                                                        log_debug!(self.logger, "Deserialization failed due to shortness of message");
-                                                                                                       return Err(PeerHandleError { no_connection_possible: false });
+                                                                                                       return Err(PeerHandleError { });
                                                                                                }
-                                                                                               (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { no_connection_possible: false }),
-                                                                                               (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { no_connection_possible: false }),
+                                                                                               (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }),
+                                                                                               (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { }),
                                                                                        }
                                                                                }
                                                                        };
@@ -1304,10 +1308,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                if let wire::Message::Init(msg) = message {
                        if msg.features.requires_unknown_bits() {
                                log_debug!(self.logger, "Peer features required unknown version bits");
-                               return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               return Err(PeerHandleError { }.into());
                        }
                        if peer_lock.their_features.is_some() {
-                               return Err(PeerHandleError{ no_connection_possible: false }.into());
+                               return Err(PeerHandleError { }.into());
                        }
 
                        log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features);
@@ -1317,24 +1321,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0);
                        }
 
-                       if let Err(()) = self.message_handler.route_handler.peer_connected(&their_node_id, &msg) {
+                       if let Err(()) = self.message_handler.route_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
                                log_debug!(self.logger, "Route Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
-                               return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               return Err(PeerHandleError { }.into());
                        }
-                       if let Err(()) = self.message_handler.chan_handler.peer_connected(&their_node_id, &msg) {
+                       if let Err(()) = self.message_handler.chan_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
                                log_debug!(self.logger, "Channel Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
-                               return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               return Err(PeerHandleError { }.into());
                        }
-                       if let Err(()) = self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg) {
+                       if let Err(()) = self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
                                log_debug!(self.logger, "Onion Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
-                               return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               return Err(PeerHandleError { }.into());
                        }
 
                        peer_lock.their_features = Some(msg.features);
                        return Ok(None);
                } else if peer_lock.their_features.is_none() {
                        log_debug!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(their_node_id));
-                       return Err(PeerHandleError{ no_connection_possible: false }.into());
+                       return Err(PeerHandleError { }.into());
                }
 
                if let wire::Message::GossipTimestampFilter(_msg) = message {
@@ -1386,7 +1390,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                }
                                self.message_handler.chan_handler.handle_error(&their_node_id, &msg);
                                if msg.channel_id == [0; 32] {
-                                       return Err(PeerHandleError{ no_connection_possible: true }.into());
+                                       return Err(PeerHandleError { }.into());
                                }
                        },
                        wire::Message::Warning(msg) => {
@@ -1516,8 +1520,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        // Unknown messages:
                        wire::Message::Unknown(type_id) if message.is_even() => {
                                log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
-                               // Fail the channel if message is an even, unknown type as per BOLT #1.
-                               return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               return Err(PeerHandleError { }.into());
                        },
                        wire::Message::Unknown(type_id) => {
                                log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id);
@@ -1537,10 +1540,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                                for (_, peer_mutex) in peers.iter() {
                                        let mut peer = peer_mutex.lock().unwrap();
-                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                       if !peer.handshake_complete() ||
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
+                                       debug_assert!(peer.their_node_id.is_some());
+                                       debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
                                        if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
@@ -1562,10 +1567,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                                for (_, peer_mutex) in peers.iter() {
                                        let mut peer = peer_mutex.lock().unwrap();
-                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                       if !peer.handshake_complete() ||
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
+                                       debug_assert!(peer.their_node_id.is_some());
+                                       debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
                                        if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
@@ -1587,10 +1594,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                                for (_, peer_mutex) in peers.iter() {
                                        let mut peer = peer_mutex.lock().unwrap();
-                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                       if !peer.handshake_complete() ||
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
+                                       debug_assert!(peer.their_node_id.is_some());
+                                       debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
                                        if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
@@ -1670,7 +1679,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        Some(descriptor) => match peers.get(&descriptor) {
                                                                Some(peer_mutex) => {
                                                                        let peer_lock = peer_mutex.lock().unwrap();
-                                                                       if peer_lock.their_features.is_none() {
+                                                                       if !peer_lock.handshake_complete() {
                                                                                continue;
                                                                        }
                                                                        peer_lock
@@ -1912,7 +1921,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
        /// Indicates that the given socket descriptor's connection is now closed.
        pub fn socket_disconnected(&self, descriptor: &Descriptor) {
-               self.disconnect_event_internal(descriptor, false);
+               self.disconnect_event_internal(descriptor);
        }
 
        fn do_disconnect(&self, mut descriptor: Descriptor, peer: &Peer, reason: &'static str) {
@@ -1925,13 +1934,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                debug_assert!(peer.their_node_id.is_some());
                if let Some((node_id, _)) = peer.their_node_id {
                        log_trace!(self.logger, "Disconnecting peer with id {} due to {}", node_id, reason);
-                       self.message_handler.chan_handler.peer_disconnected(&node_id, false);
-                       self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
+                       self.message_handler.chan_handler.peer_disconnected(&node_id);
+                       self.message_handler.onion_message_handler.peer_disconnected(&node_id);
                }
                descriptor.disconnect_socket();
        }
 
-       fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
+       fn disconnect_event_internal(&self, descriptor: &Descriptor) {
                let mut peers = self.peers.write().unwrap();
                let peer_option = peers.remove(descriptor);
                match peer_option {
@@ -1945,12 +1954,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                if !peer.handshake_complete() { return; }
                                debug_assert!(peer.their_node_id.is_some());
                                if let Some((node_id, _)) = peer.their_node_id {
-                                       log_trace!(self.logger,
-                                               "Handling disconnection of peer {}, with {}future connection to the peer possible.",
-                                               log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
+                                       log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id));
                                        self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
-                                       self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
-                                       self.message_handler.onion_message_handler.peer_disconnected(&node_id, no_connection_possible);
+                                       self.message_handler.chan_handler.peer_disconnected(&node_id);
+                                       self.message_handler.onion_message_handler.peer_disconnected(&node_id);
                                }
                        }
                };
@@ -1958,14 +1965,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
        /// Disconnect a peer given its node id.
        ///
-       /// Set `no_connection_possible` to true to prevent any further connection with this peer,
-       /// force-closing any channels we have with it.
-       ///
        /// If a peer is connected, this will call [`disconnect_socket`] on the descriptor for the
        /// peer. Thus, be very careful about reentrancy issues.
        ///
        /// [`disconnect_socket`]: SocketDescriptor::disconnect_socket
-       pub fn disconnect_by_node_id(&self, node_id: PublicKey, _no_connection_possible: bool) {
+       pub fn disconnect_by_node_id(&self, node_id: PublicKey) {
                let mut peers_lock = self.peers.write().unwrap();
                if let Some(descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
                        let peer_opt = peers_lock.remove(&descriptor);
@@ -2024,7 +2028,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                let mut peer = peer_mutex.lock().unwrap();
                                if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
 
-                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
+                               if !peer.handshake_complete() {
                                        // The peer needs to complete its handshake before we can exchange messages. We
                                        // give peers one timer tick to complete handshake, reusing
                                        // `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
@@ -2036,6 +2040,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        }
                                        continue;
                                }
+                               debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
+                               debug_assert!(peer.their_node_id.is_some());
 
                                loop { // Used as a `goto` to skip writing a Ping message.
                                        if peer.awaiting_pong_timer_tick_intervals == -1 {
@@ -2171,6 +2177,7 @@ fn is_gossip_msg(type_id: u16) -> bool {
 #[cfg(test)]
 mod tests {
        use crate::chain::keysinterface::{NodeSigner, Recipient};
+       use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
        use crate::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
        use crate::ln::{msgs, wire};
        use crate::ln::msgs::NetAddress;
@@ -2279,19 +2286,15 @@ mod tests {
                // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
                // push a DisconnectPeer event to remove the node flagged by id
                let cfgs = create_peermgr_cfgs(2);
-               let chan_handler = test_utils::TestChannelMessageHandler::new();
-               let mut peers = create_network(2, &cfgs);
+               let peers = create_network(2, &cfgs);
                establish_connection(&peers[0], &peers[1]);
                assert_eq!(peers[0].peers.read().unwrap().len(), 1);
 
                let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
-
-               chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
+               cfgs[0].chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError {
                        node_id: their_id,
                        action: msgs::ErrorAction::DisconnectPeer { msg: None },
                });
-               assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1);
-               peers[0].message_handler.chan_handler = &chan_handler;
 
                peers[0].process_events();
                assert_eq!(peers[0].peers.read().unwrap().len(), 0);
@@ -2325,6 +2328,35 @@ mod tests {
                assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
        }
 
+       #[test]
+       fn test_non_init_first_msg() {
+               // Simple test of the first message received over a connection being something other than
+               // Init. This results in an immediate disconnection, which previously included a spurious
+               // peer_disconnected event handed to event handlers (which would panic in
+               // `TestChannelMessageHandler` here).
+               let cfgs = create_peermgr_cfgs(2);
+               let peers = create_network(2, &cfgs);
+
+               let mut fd_dup = FileDescriptor { fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+               let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003};
+               let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap();
+               peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap();
+
+               let mut dup_encryptor = PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap());
+               let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx);
+               assert_eq!(peers[0].read_event(&mut fd_dup, &initial_data).unwrap(), false);
+               peers[0].process_events();
+
+               let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0);
+               let (act_three, _) =
+                       dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap();
+               assert_eq!(peers[0].read_event(&mut fd_dup, &act_three).unwrap(), false);
+
+               let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 };
+               let msg_bytes = dup_encryptor.encrypt_message(&not_init_msg);
+               assert!(peers[0].read_event(&mut fd_dup, &msg_bytes).is_err());
+       }
+
        #[test]
        fn test_disconnect_all_peer() {
                // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and