Merge pull request #2293 from wpaulino/disconnect-peers-timer-tick
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Tue, 30 May 2023 18:30:49 +0000 (18:30 +0000)
committerGitHub <noreply@github.com>
Tue, 30 May 2023 18:30:49 +0000 (18:30 +0000)
Disconnect peers on timer ticks to unblock channel state machine

lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/wire.rs

index 11f0261d677a3df37b2fa36f94ea142e5ce8f16c..d1412b2aeb1c04aaf0cbb60e2da960bc8352cedb 100644 (file)
@@ -479,6 +479,13 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
 ///   * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
 pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;
 
+/// The number of ticks that may elapse while we're waiting for a response to a
+/// [`msgs::RevokeAndACK`] or [`msgs::ChannelReestablish`] message before we attempt to disconnect
+/// them.
+///
+/// See [`Channel::sent_message_awaiting_response`] for more information.
+pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;
+
 struct PendingChannelMonitorUpdate {
        update: ChannelMonitorUpdate,
        /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
@@ -715,6 +722,19 @@ pub(super) struct Channel<Signer: ChannelSigner> {
        /// See-also <https://github.com/lightningnetwork/lnd/issues/4006>
        pub workaround_lnd_bug_4006: Option<msgs::ChannelReady>,
 
+       /// An option set when we wish to track how many ticks have elapsed while waiting for a response
+       /// from our counterparty after sending a message. If the peer has yet to respond after reaching
+       /// `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`, a reconnection should be attempted to try to
+       /// unblock the state machine.
+       ///
+       /// This behavior is mostly motivated by a lnd bug in which we don't receive a message we expect
+       /// to in a timely manner, which may lead to channels becoming unusable and/or force-closed. An
+       /// example of such can be found at <https://github.com/lightningnetwork/lnd/issues/7682>.
+       ///
+       /// This is currently only used when waiting for a [`msgs::ChannelReestablish`] or
+       /// [`msgs::RevokeAndACK`] message from the counterparty.
+       sent_message_awaiting_response: Option<usize>,
+
        #[cfg(any(test, fuzzing))]
        // When we receive an HTLC fulfill on an outbound path, we may immediately fulfill the
        // corresponding HTLC on the inbound path. If, then, the outbound path channel is
@@ -1130,6 +1150,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        next_remote_commitment_tx_fee_info_cached: Mutex::new(None),
 
                        workaround_lnd_bug_4006: None,
+                       sent_message_awaiting_response: None,
 
                        latest_inbound_scid_alias: None,
                        outbound_scid_alias,
@@ -1489,6 +1510,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        next_remote_commitment_tx_fee_info_cached: Mutex::new(None),
 
                        workaround_lnd_bug_4006: None,
+                       sent_message_awaiting_response: None,
 
                        latest_inbound_scid_alias: None,
                        outbound_scid_alias,
@@ -3526,6 +3548,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                // OK, we step the channel here and *then* if the new generation fails we can fail the
                // channel based on that, but stepping stuff here should be safe either way.
                self.channel_state &= !(ChannelState::AwaitingRemoteRevoke as u32);
+               self.sent_message_awaiting_response = None;
                self.counterparty_prev_commitment_point = self.counterparty_cur_commitment_point;
                self.counterparty_cur_commitment_point = Some(msg.next_per_commitment_point);
                self.cur_counterparty_commitment_transaction_number -= 1;
@@ -3841,6 +3864,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        }
                }
 
+               self.sent_message_awaiting_response = None;
+
                self.channel_state |= ChannelState::PeerDisconnected as u32;
                log_trace!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, log_bytes!(self.channel_id()));
        }
@@ -3943,6 +3968,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        Some(self.get_last_revoke_and_ack())
                } else { None };
                let commitment_update = if self.monitor_pending_commitment_signed {
+                       self.mark_awaiting_response();
                        Some(self.get_last_commitment_update(logger))
                } else { None };
 
@@ -4132,6 +4158,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                // Go ahead and unmark PeerDisconnected as various calls we may make check for it (and all
                // remaining cases either succeed or ErrorMessage-fail).
                self.channel_state &= !(ChannelState::PeerDisconnected as u32);
+               self.sent_message_awaiting_response = None;
 
                let shutdown_msg = if self.channel_state & (ChannelState::LocalShutdownSent as u32) != 0 {
                        assert!(self.shutdown_scriptpubkey.is_some());
@@ -4192,7 +4219,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                // revoke_and_ack, not on sending commitment_signed, so we add one if have
                // AwaitingRemoteRevoke set, which indicates we sent a commitment_signed but haven't gotten
                // the corresponding revoke_and_ack back yet.
-               let next_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - self.cur_counterparty_commitment_transaction_number + if (self.channel_state & ChannelState::AwaitingRemoteRevoke as u32) != 0 { 1 } else { 0 };
+               let is_awaiting_remote_revoke = self.channel_state & ChannelState::AwaitingRemoteRevoke as u32 != 0;
+               if is_awaiting_remote_revoke && !self.is_awaiting_monitor_update() {
+                       self.mark_awaiting_response();
+               }
+               let next_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - self.cur_counterparty_commitment_transaction_number + if is_awaiting_remote_revoke { 1 } else { 0 };
 
                let channel_ready = if msg.next_local_commitment_number == 1 && INITIAL_COMMITMENT_NUMBER - self.cur_holder_commitment_transaction_number == 1 {
                        // We should never have to worry about MonitorUpdateInProgress resending ChannelReady
@@ -4361,6 +4392,28 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                }), None))
        }
 
+       // Marks a channel as waiting for a response from the counterparty. If it's not received
+       // [`DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`] after sending our own to them, then we'll attempt
+       // a reconnection.
+       fn mark_awaiting_response(&mut self) {
+               self.sent_message_awaiting_response = Some(0);
+       }
+
+       /// Determines whether we should disconnect the counterparty due to not receiving a response
+       /// within our expected timeframe.
+       ///
+       /// This should be called on every [`super::channelmanager::ChannelManager::timer_tick_occurred`].
+       pub fn should_disconnect_peer_awaiting_response(&mut self) -> bool {
+               let ticks_elapsed = if let Some(ticks_elapsed) = self.sent_message_awaiting_response.as_mut() {
+                       ticks_elapsed
+               } else {
+                       // Don't disconnect when we're not waiting on a response.
+                       return false;
+               };
+               *ticks_elapsed += 1;
+               *ticks_elapsed >= DISCONNECT_PEER_AWAITING_RESPONSE_TICKS
+       }
+
        pub fn shutdown<SP: Deref>(
                &mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
        ) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
@@ -5733,7 +5786,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
        /// May panic if called on a channel that wasn't immediately-previously
        /// self.remove_uncommitted_htlcs_and_mark_paused()'d
-       pub fn get_channel_reestablish<L: Deref>(&self, logger: &L) -> msgs::ChannelReestablish where L::Target: Logger {
+       pub fn get_channel_reestablish<L: Deref>(&mut self, logger: &L) -> msgs::ChannelReestablish where L::Target: Logger {
                assert_eq!(self.channel_state & ChannelState::PeerDisconnected as u32, ChannelState::PeerDisconnected as u32);
                assert_ne!(self.cur_counterparty_commitment_transaction_number, INITIAL_COMMITMENT_NUMBER);
                // Prior to static_remotekey, my_current_per_commitment_point was critical to claiming
@@ -5752,6 +5805,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        log_info!(logger, "Sending a data_loss_protect with no previous remote per_commitment_secret for channel {}", log_bytes!(self.channel_id()));
                        [0;32]
                };
+               self.mark_awaiting_response();
                msgs::ChannelReestablish {
                        channel_id: self.channel_id(),
                        // The protocol has two different commitment number concepts - the "commitment
@@ -7090,6 +7144,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                        next_remote_commitment_tx_fee_info_cached: Mutex::new(None),
 
                        workaround_lnd_bug_4006: None,
+                       sent_message_awaiting_response: None,
 
                        latest_inbound_scid_alias,
                        // Later in the ChannelManager deserialization phase we scan for channels and assign scid aliases if its missing
index 30eab9066fd0b4190ef65167d8e82c98c51c9be1..f489003029afd847f5314fb3dba4e2a721f57c4b 100644 (file)
@@ -3921,6 +3921,20 @@ where
 
                                                chan.maybe_expire_prev_config();
 
+                                               if chan.should_disconnect_peer_awaiting_response() {
+                                                       log_debug!(self.logger, "Disconnecting peer {} due to not making any progress on channel {}",
+                                                                       counterparty_node_id, log_bytes!(*chan_id));
+                                                       pending_msg_events.push(MessageSendEvent::HandleError {
+                                                               node_id: counterparty_node_id,
+                                                               action: msgs::ErrorAction::DisconnectPeerWithWarning {
+                                                                       msg: msgs::WarningMessage {
+                                                                               channel_id: *chan_id,
+                                                                               data: "Disconnecting due to timeout awaiting response".to_owned(),
+                                                                       },
+                                                               },
+                                                       });
+                                               }
+
                                                true
                                        });
                                        if peer_state.ok_to_remove(true) {
index 98fc4bd95b6d58f100ae09aaa32519e57237598c..508c540dff756a01a8fe76946cc9e30632b3d1ca 100644 (file)
@@ -22,7 +22,7 @@ use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFail
 use crate::ln::{PaymentPreimage, PaymentSecret, PaymentHash};
 use crate::ln::channel::{commitment_tx_base_weight, COMMITMENT_TX_WEIGHT_PER_HTLC, CONCURRENT_INBOUND_HTLC_FEE_BUFFER, FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE, MIN_AFFORDABLE_HTLC_COUNT};
 use crate::ln::channelmanager::{self, PaymentId, RAACommitmentOrder, PaymentSendFailure, RecipientOnionFields, BREAKDOWN_TIMEOUT, ENABLE_GOSSIP_TICKS, DISABLE_GOSSIP_TICKS, MIN_CLTV_EXPIRY_DELTA};
-use crate::ln::channel::{Channel, ChannelError};
+use crate::ln::channel::{DISCONNECT_PEER_AWAITING_RESPONSE_TICKS, Channel, ChannelError};
 use crate::ln::{chan_utils, onion_utils};
 use crate::ln::chan_utils::{OFFERED_HTLC_SCRIPT_WEIGHT, htlc_success_tx_weight, htlc_timeout_tx_weight, HTLCOutputInCommitment};
 use crate::routing::gossip::{NetworkGraph, NetworkUpdate};
@@ -9955,3 +9955,128 @@ fn test_payment_with_custom_min_cltv_expiry_delta() {
        do_payment_with_custom_min_final_cltv_expiry(true, false);
        do_payment_with_custom_min_final_cltv_expiry(true, true);
 }
+
+#[test]
+fn test_disconnects_peer_awaiting_response_ticks() {
+       // Tests that nodes which are awaiting on a response critical for channel responsiveness
+       // disconnect their counterparty after `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
+       let mut chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       // Asserts a disconnect event is queued to the user.
+       let check_disconnect_event = |node: &Node, should_disconnect: bool| {
+               let disconnect_event = node.node.get_and_clear_pending_msg_events().iter().find_map(|event|
+                       if let MessageSendEvent::HandleError { action, .. } = event {
+                               if let msgs::ErrorAction::DisconnectPeerWithWarning { .. } = action {
+                                       Some(())
+                               } else {
+                                       None
+                               }
+                       } else {
+                               None
+                       }
+               );
+               assert_eq!(disconnect_event.is_some(), should_disconnect);
+       };
+
+       // Fires timer ticks ensuring we only attempt to disconnect peers after reaching
+       // `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
+       let check_disconnect = |node: &Node| {
+               // No disconnect without any timer ticks.
+               check_disconnect_event(node, false);
+
+               // No disconnect with 1 timer tick less than required.
+               for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS - 1 {
+                       node.node.timer_tick_occurred();
+                       check_disconnect_event(node, false);
+               }
+
+               // Disconnect after reaching the required ticks.
+               node.node.timer_tick_occurred();
+               check_disconnect_event(node, true);
+
+               // Disconnect again on the next tick if the peer hasn't been disconnected yet.
+               node.node.timer_tick_occurred();
+               check_disconnect_event(node, true);
+       };
+
+       create_chan_between_nodes(&nodes[0], &nodes[1]);
+
+       // We'll start by performing a fee update with Alice (nodes[0]) on the channel.
+       *nodes[0].fee_estimator.sat_per_kw.lock().unwrap() *= 2;
+       nodes[0].node.timer_tick_occurred();
+       check_added_monitors!(&nodes[0], 1);
+       let alice_fee_update = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id());
+       nodes[1].node.handle_update_fee(&nodes[0].node.get_our_node_id(), alice_fee_update.update_fee.as_ref().unwrap());
+       nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &alice_fee_update.commitment_signed);
+       check_added_monitors!(&nodes[1], 1);
+
+       // This will prompt Bob (nodes[1]) to respond with his `CommitmentSigned` and `RevokeAndACK`.
+       let (bob_revoke_and_ack, bob_commitment_signed) = get_revoke_commit_msgs!(&nodes[1], nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bob_revoke_and_ack);
+       check_added_monitors!(&nodes[0], 1);
+       nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bob_commitment_signed);
+       check_added_monitors(&nodes[0], 1);
+
+       // Alice then needs to send her final `RevokeAndACK` to complete the commitment dance. We
+       // pretend Bob hasn't received the message and check whether he'll disconnect Alice after
+       // reaching `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
+       let alice_revoke_and_ack = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id());
+       check_disconnect(&nodes[1]);
+
+       // Now, we'll reconnect them to test awaiting a `ChannelReestablish` message.
+       //
+       // Note that since the commitment dance didn't complete above, Alice is expected to resend her
+       // final `RevokeAndACK` to Bob to complete it.
+       nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+       nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+       let bob_init = msgs::Init { features: nodes[1].node.init_features(), remote_network_address: None };
+       nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id(), &bob_init, true).unwrap();
+       let alice_init = msgs::Init { features: nodes[0].node.init_features(), remote_network_address: None };
+       nodes[1].node.peer_connected(&nodes[0].node.get_our_node_id(), &alice_init, true).unwrap();
+
+       // Upon reconnection, Alice sends her `ChannelReestablish` to Bob. Alice, however, hasn't
+       // received Bob's yet, so she should disconnect him after reaching
+       // `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
+       let alice_channel_reestablish = get_event_msg!(
+               nodes[0], MessageSendEvent::SendChannelReestablish, nodes[1].node.get_our_node_id()
+       );
+       nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &alice_channel_reestablish);
+       check_disconnect(&nodes[0]);
+
+       // Bob now sends his `ChannelReestablish` to Alice to resume the channel and consider it "live".
+       let bob_channel_reestablish = nodes[1].node.get_and_clear_pending_msg_events().iter().find_map(|event|
+               if let MessageSendEvent::SendChannelReestablish { node_id, msg } = event {
+                       assert_eq!(*node_id, nodes[0].node.get_our_node_id());
+                       Some(msg.clone())
+               } else {
+                       None
+               }
+       ).unwrap();
+       nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &bob_channel_reestablish);
+
+       // Sanity check that Alice won't disconnect Bob since she's no longer waiting for any messages.
+       for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
+               nodes[0].node.timer_tick_occurred();
+               check_disconnect_event(&nodes[0], false);
+       }
+
+       // However, Bob is still waiting on Alice's `RevokeAndACK`, so he should disconnect her after
+       // reaching `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
+       check_disconnect(&nodes[1]);
+
+       // Finally, have Bob process the last message.
+       nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &alice_revoke_and_ack);
+       check_added_monitors(&nodes[1], 1);
+
+       // At this point, neither node should attempt to disconnect each other, since they aren't
+       // waiting on any messages.
+       for node in &nodes {
+               for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
+                       node.node.timer_tick_occurred();
+                       check_disconnect_event(node, false);
+               }
+       }
+}
index 76ed56635ac6a57bd22cb82f5bba93e4c210b5c6..d32eb3e4743c460707d2d48b00a195b1f313fc68 100644 (file)
@@ -1137,6 +1137,11 @@ pub enum ErrorAction {
                /// An error message which we should make an effort to send before we disconnect.
                msg: Option<ErrorMessage>
        },
+       /// The peer did something incorrect. Tell them without closing any channels and disconnect them.
+       DisconnectPeerWithWarning {
+               /// A warning message which we should make an effort to send before we disconnect.
+               msg: WarningMessage,
+       },
        /// The peer did something harmless that we weren't able to process, just log and ignore
        // New code should *not* use this. New code must use IgnoreAndLog, below!
        IgnoreError,
index 3cb6a55f2e80ae0a8a1f717f6e03afe66a9faa64..a7f35a59c275baeb70ca6fdb607fe85283c4e965 100644 (file)
@@ -26,7 +26,7 @@ use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager
 use crate::util::ser::{VecWriter, Writeable, Writer};
 use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
 use crate::ln::wire;
-use crate::ln::wire::Encode;
+use crate::ln::wire::{Encode, Type};
 use crate::onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger};
 use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias};
 use crate::util::atomic_counter::AtomicCounter;
@@ -1230,8 +1230,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                Ok(x) => x,
                                                                Err(e) => {
                                                                        match e.action {
-                                                                               msgs::ErrorAction::DisconnectPeer { msg: _ } => {
-                                                                                       //TODO: Try to push msg
+                                                                               msgs::ErrorAction::DisconnectPeer { .. } => {
+                                                                                       // We may have an `ErrorMessage` to send to the peer,
+                                                                                       // but writing to the socket while reading can lead to
+                                                                                       // re-entrant code and possibly unexpected behavior. The
+                                                                                       // message send is optimistic anyway, and in this case
+                                                                                       // we immediately disconnect the peer.
+                                                                                       log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
+                                                                                       return Err(PeerHandleError { });
+                                                                               },
+                                                                               msgs::ErrorAction::DisconnectPeerWithWarning { .. } => {
+                                                                                       // We have a `WarningMessage` to send to the peer, but
+                                                                                       // writing to the socket while reading can lead to
+                                                                                       // re-entrant code and possibly unexpected behavior. The
+                                                                                       // message send is optimistic anyway, and in this case
+                                                                                       // we immediately disconnect the peer.
                                                                                        log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err);
                                                                                        return Err(PeerHandleError { });
                                                                                },
@@ -1362,7 +1375,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                                Ok(x) => x,
                                                                                Err(e) => {
                                                                                        match e {
-                                                                                               // Note that to avoid recursion we never call
+                                                                                               // Note that to avoid re-entrancy we never call
                                                                                                // `do_attempt_write_data` from here, causing
                                                                                                // the messages enqueued here to not actually
                                                                                                // be sent before the peer is disconnected.
@@ -1383,9 +1396,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                                                        });
                                                                                                        continue;
                                                                                                }
-                                                                                               (msgs::DecodeError::UnknownRequiredFeature, ty) => {
-                                                                                                       log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
-                                                                                                       self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) });
+                                                                                               (msgs::DecodeError::UnknownRequiredFeature, _) => {
+                                                                                                       log_debug!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
                                                                                                        return Err(PeerHandleError { });
                                                                                                }
                                                                                                (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }),
@@ -2065,32 +2077,48 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        log_pubkey!(node_id), msg.contents.short_channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
-                                               MessageSendEvent::HandleError { ref node_id, ref action } => {
-                                                       match *action {
-                                                               msgs::ErrorAction::DisconnectPeer { ref msg } => {
+                                               MessageSendEvent::HandleError { node_id, action } => {
+                                                       match action {
+                                                               msgs::ErrorAction::DisconnectPeer { msg } => {
+                                                                       if let Some(msg) = msg.as_ref() {
+                                                                               log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+                                                                                       log_pubkey!(node_id), msg.data);
+                                                                       } else {
+                                                                               log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
+                                                                                       log_pubkey!(node_id));
+                                                                       }
+                                                                       // We do not have the peers write lock, so we just store that we're
+                                                                       // about to disconenct the peer and do it after we finish
+                                                                       // processing most messages.
+                                                                       let msg = msg.map(|msg| wire::Message::<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg));
+                                                                       peers_to_disconnect.insert(node_id, msg);
+                                                               },
+                                                               msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
+                                                                       log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+                                                                               log_pubkey!(node_id), msg.data);
                                                                        // We do not have the peers write lock, so we just store that we're
                                                                        // about to disconenct the peer and do it after we finish
                                                                        // processing most messages.
-                                                                       peers_to_disconnect.insert(*node_id, msg.clone());
+                                                                       peers_to_disconnect.insert(node_id, Some(wire::Message::Warning(msg)));
                                                                },
                                                                msgs::ErrorAction::IgnoreAndLog(level) => {
                                                                        log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
                                                                },
                                                                msgs::ErrorAction::IgnoreDuplicateGossip => {},
                                                                msgs::ErrorAction::IgnoreError => {
-                                                                       log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
-                                                               },
+                                                                               log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
+                                                                       },
                                                                msgs::ErrorAction::SendErrorMessage { ref msg } => {
                                                                        log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                        log_pubkey!(node_id),
                                                                                        msg.data);
-                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
                                                                },
                                                                msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
                                                                        log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
                                                                                        log_pubkey!(node_id),
                                                                                        msg.data);
-                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
                                                                },
                                                        }
                                                },
@@ -2140,9 +2168,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                if let Some(peer_mutex) = peers.remove(&descriptor) {
                                                        let mut peer = peer_mutex.lock().unwrap();
                                                        if let Some(msg) = msg {
-                                                               log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
-                                                                               log_pubkey!(node_id),
-                                                                               msg.data);
                                                                self.enqueue_message(&mut *peer, &msg);
                                                                // This isn't guaranteed to work, but if there is enough free
                                                                // room in the send buffer, put the error message there...
index 1a01e33826dbb9790086afb1063219ea2f42c98c..88e2ad7c1daee03ef245c795f0119eb229a10520 100644 (file)
@@ -96,9 +96,59 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
        Custom(T),
 }
 
-impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
+impl<T> Writeable for Message<T> where T: core::fmt::Debug + Type + TestEq {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
+               match self {
+                       &Message::Init(ref msg) => msg.write(writer),
+                       &Message::Error(ref msg) => msg.write(writer),
+                       &Message::Warning(ref msg) => msg.write(writer),
+                       &Message::Ping(ref msg) => msg.write(writer),
+                       &Message::Pong(ref msg) => msg.write(writer),
+                       &Message::OpenChannel(ref msg) => msg.write(writer),
+                       &Message::OpenChannelV2(ref msg) => msg.write(writer),
+                       &Message::AcceptChannel(ref msg) => msg.write(writer),
+                       &Message::AcceptChannelV2(ref msg) => msg.write(writer),
+                       &Message::FundingCreated(ref msg) => msg.write(writer),
+                       &Message::FundingSigned(ref msg) => msg.write(writer),
+                       &Message::TxAddInput(ref msg) => msg.write(writer),
+                       &Message::TxAddOutput(ref msg) => msg.write(writer),
+                       &Message::TxRemoveInput(ref msg) => msg.write(writer),
+                       &Message::TxRemoveOutput(ref msg) => msg.write(writer),
+                       &Message::TxComplete(ref msg) => msg.write(writer),
+                       &Message::TxSignatures(ref msg) => msg.write(writer),
+                       &Message::TxInitRbf(ref msg) => msg.write(writer),
+                       &Message::TxAckRbf(ref msg) => msg.write(writer),
+                       &Message::TxAbort(ref msg) => msg.write(writer),
+                       &Message::ChannelReady(ref msg) => msg.write(writer),
+                       &Message::Shutdown(ref msg) => msg.write(writer),
+                       &Message::ClosingSigned(ref msg) => msg.write(writer),
+                       &Message::OnionMessage(ref msg) => msg.write(writer),
+                       &Message::UpdateAddHTLC(ref msg) => msg.write(writer),
+                       &Message::UpdateFulfillHTLC(ref msg) => msg.write(writer),
+                       &Message::UpdateFailHTLC(ref msg) => msg.write(writer),
+                       &Message::UpdateFailMalformedHTLC(ref msg) => msg.write(writer),
+                       &Message::CommitmentSigned(ref msg) => msg.write(writer),
+                       &Message::RevokeAndACK(ref msg) => msg.write(writer),
+                       &Message::UpdateFee(ref msg) => msg.write(writer),
+                       &Message::ChannelReestablish(ref msg) => msg.write(writer),
+                       &Message::AnnouncementSignatures(ref msg) => msg.write(writer),
+                       &Message::ChannelAnnouncement(ref msg) => msg.write(writer),
+                       &Message::NodeAnnouncement(ref msg) => msg.write(writer),
+                       &Message::ChannelUpdate(ref msg) => msg.write(writer),
+                       &Message::QueryShortChannelIds(ref msg) => msg.write(writer),
+                       &Message::ReplyShortChannelIdsEnd(ref msg) => msg.write(writer),
+                       &Message::QueryChannelRange(ref msg) => msg.write(writer),
+                       &Message::ReplyChannelRange(ref msg) => msg.write(writer),
+                       &Message::GossipTimestampFilter(ref msg) => msg.write(writer),
+                       &Message::Unknown(_) => { Ok(()) },
+                       &Message::Custom(ref msg) => msg.write(writer),
+               }
+       }
+}
+
+impl<T> Type for Message<T> where T: core::fmt::Debug + Type + TestEq {
        /// Returns the type that was used to decode the message payload.
-       pub fn type_id(&self) -> u16 {
+       fn type_id(&self) -> u16 {
                match self {
                        &Message::Init(ref msg) => msg.type_id(),
                        &Message::Error(ref msg) => msg.type_id(),
@@ -145,7 +195,9 @@ impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
                        &Message::Custom(ref msg) => msg.type_id(),
                }
        }
+}
 
+impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
        /// Returns whether the message's type is even, indicating both endpoints must support it.
        pub fn is_even(&self) -> bool {
                (self.type_id() & 1) == 0