Merge pull request #3129 from optout21/splicing-msgs-update
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 9ce230861456927f53bf4a2b65dd33e800e17e5d..1b75755fac2b897079d89902a4682dbe8b21c604 100644 (file)
@@ -19,16 +19,17 @@ use bitcoin::blockdata::constants::ChainHash;
 use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
 use crate::sign::{NodeSigner, Recipient};
-use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
-use crate::ln::ChannelId;
+use crate::events::{MessageSendEvent, MessageSendEventsProvider};
+use crate::ln::types::ChannelId;
 use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs;
-use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
+use crate::ln::msgs::{ChannelMessageHandler, Init, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
 use crate::util::ser::{VecWriter, Writeable, Writer};
 use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
 use crate::ln::wire;
 use crate::ln::wire::{Encode, Type};
-use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage};
+use crate::onion_message::async_payments::{AsyncPaymentsMessageHandler, HeldHtlcAvailable, ReleaseHeldHtlc};
+use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage, Responder, ResponseInstruction};
 use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
 use crate::onion_message::packet::OnionMessageContents;
 use crate::routing::gossip::{NodeId, NodeAlias};
@@ -79,6 +80,16 @@ pub trait CustomMessageHandler: wire::CustomMessageReader {
        /// connection to the node exists, then the message is simply not sent.
        fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>;
 
+       /// Indicates a peer disconnected.
+       fn peer_disconnected(&self, their_node_id: &PublicKey);
+
+       /// Handle a peer connecting.
+       ///
+       /// May return an `Err(())` if the features the peer supports are not sufficient to communicate
+       /// with us. Implementors should be somewhat conservative about doing so, however, as other
+       /// message handlers may still wish to communicate with this peer.
+       fn peer_connected(&self, their_node_id: &PublicKey, msg: &Init, inbound: bool) -> Result<(), ()>;
+
        /// Gets the node feature flags which this handler itself supports. All available handlers are
        /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]
        /// which are broadcasted in our [`NodeAnnouncement`] message.
@@ -97,9 +108,6 @@ pub trait CustomMessageHandler: wire::CustomMessageReader {
 /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
 /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
 pub struct IgnoringMessageHandler{}
-impl EventsProvider for IgnoringMessageHandler {
-       fn process_pending_events<H: Deref>(&self, _handler: H) where H::Target: EventHandler {}
-}
 impl MessageSendEventsProvider for IgnoringMessageHandler {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
 }
@@ -123,6 +131,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        }
        fn processing_queue_high(&self) -> bool { false }
 }
+
 impl OnionMessageHandler for IgnoringMessageHandler {
        fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
        fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
@@ -134,12 +143,23 @@ impl OnionMessageHandler for IgnoringMessageHandler {
                InitFeatures::empty()
        }
 }
+
 impl OffersMessageHandler for IgnoringMessageHandler {
-       fn handle_message(&self, _msg: OffersMessage) -> Option<OffersMessage> { None }
+       fn handle_message(&self, _message: OffersMessage, _responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
+               ResponseInstruction::NoResponse
+       }
+}
+impl AsyncPaymentsMessageHandler for IgnoringMessageHandler {
+       fn held_htlc_available(
+               &self, _message: HeldHtlcAvailable, _responder: Option<Responder>,
+       ) -> ResponseInstruction<ReleaseHeldHtlc> {
+               ResponseInstruction::NoResponse
+       }
+       fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {}
 }
 impl CustomOnionMessageHandler for IgnoringMessageHandler {
        type CustomMessage = Infallible;
-       fn handle_custom_message(&self, _msg: Infallible) -> Option<Infallible> {
+       fn handle_custom_message(&self, _message: Self::CustomMessage, _responder: Option<Responder>) -> ResponseInstruction<Self::CustomMessage> {
                // Since we always return `None` in the read the handle method should never be called.
                unreachable!();
        }
@@ -153,6 +173,7 @@ impl CustomOnionMessageHandler for IgnoringMessageHandler {
 
 impl OnionMessageContents for Infallible {
        fn tlv_type(&self) -> u64 { unreachable!(); }
+       fn msg_type(&self) -> &'static str { unreachable!(); }
 }
 
 impl Deref for IgnoringMessageHandler {
@@ -188,6 +209,10 @@ impl CustomMessageHandler for IgnoringMessageHandler {
 
        fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() }
 
+       fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
+
+       fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
+
        fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
 
        fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
@@ -249,7 +274,7 @@ impl ChannelMessageHandler for ErroringMessageHandler {
                ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
        }
        #[cfg(splicing)]
-       fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) {
+       fn handle_splice_init(&self, their_node_id: &PublicKey, msg: &msgs::SpliceInit) {
                ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
        }
        #[cfg(splicing)]
@@ -718,8 +743,6 @@ pub trait APeerManager {
        type NS: Deref<Target=Self::NST>;
        /// Gets a reference to the underlying [`PeerManager`].
        fn as_ref(&self) -> &PeerManager<Self::Descriptor, Self::CM, Self::RM, Self::OM, Self::L, Self::CMH, Self::NS>;
-       /// Returns the peer manager's [`OnionMessageHandler`].
-       fn onion_message_handler(&self) -> &Self::OMT;
 }
 
 impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref>
@@ -745,9 +768,6 @@ APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> where
        type NST = <NS as Deref>::Target;
        type NS = NS;
        fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> { self }
-       fn onion_message_handler(&self) -> &Self::OMT {
-               self.message_handler.onion_message_handler.deref()
-       }
 }
 
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
@@ -1329,7 +1349,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
        /// Append a message to a peer's pending outbound/write buffer
        fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
-               let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+               let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
                if is_gossip_msg(message.type_id()) {
                        log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
                } else {
@@ -1364,7 +1384,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        macro_rules! try_potential_handleerror {
                                                ($peer: expr, $thing: expr) => {{
                                                        let res = $thing;
-                                                       let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None);
+                                                       let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None, None);
                                                        match res {
                                                                Ok(x) => x,
                                                                Err(e) => {
@@ -1436,7 +1456,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                                                macro_rules! insert_node_id {
                                                        () => {
-                                                               let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+                                                               let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
                                                                match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
                                                                        hash_map::Entry::Occupied(e) => {
                                                                                log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
@@ -1514,7 +1534,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        peer.pending_read_buffer.resize(18, 0);
                                                                        peer.pending_read_is_header = true;
 
-                                                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+                                                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
                                                                        let message = match message_result {
                                                                                Ok(x) => x,
                                                                                Err(e) => {
@@ -1602,7 +1622,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>
        ) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
                let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
-               let logger = WithContext::from(&self.logger, Some(their_node_id), None);
+               let logger = WithContext::from(&self.logger, Some(their_node_id), None, None);
 
                let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? {
                        Some(processed_message) => processed_message,
@@ -1649,12 +1669,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                        let our_features = self.init_features(&their_node_id);
                        if msg.features.requires_unknown_bits_from(&our_features) {
-                               log_debug!(logger, "Peer requires features unknown to us");
+                               log_debug!(logger, "Peer {} requires features unknown to us: {:?}",
+                                       log_pubkey!(their_node_id), msg.features.required_unknown_bits_from(&our_features));
                                return Err(PeerHandleError { }.into());
                        }
 
                        if our_features.requires_unknown_bits_from(&msg.features) {
-                               log_debug!(logger, "We require features unknown to our peer");
+                               log_debug!(logger, "We require features unknown to our peer {}: {:?}",
+                                       log_pubkey!(their_node_id), our_features.required_unknown_bits_from(&msg.features));
                                return Err(PeerHandleError { }.into());
                        }
 
@@ -1681,6 +1703,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                log_debug!(logger, "Onion Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
                                return Err(PeerHandleError { }.into());
                        }
+                       if let Err(()) = self.message_handler.custom_message_handler.peer_connected(&their_node_id, &msg, peer_lock.inbound_connection) {
+                               log_debug!(logger, "Custom Message Handler decided we couldn't communicate with peer {}", log_pubkey!(their_node_id));
+                               return Err(PeerHandleError { }.into());
+                       }
 
                        peer_lock.awaiting_pong_timer_tick_intervals = 0;
                        peer_lock.their_features = Some(msg.features);
@@ -1789,8 +1815,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                        #[cfg(splicing)]
                        // Splicing messages:
-                       wire::Message::Splice(msg) => {
-                               self.message_handler.chan_handler.handle_splice(&their_node_id, &msg);
+                       wire::Message::SpliceInit(msg) => {
+                               self.message_handler.chan_handler.handle_splice_init(&their_node_id, &msg);
                        }
                        #[cfg(splicing)]
                        wire::Message::SpliceAck(msg) => {
@@ -1937,7 +1963,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        }
                                        debug_assert!(peer.their_node_id.is_some());
                                        debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
-                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
                                        if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
@@ -1965,7 +1991,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        }
                                        debug_assert!(peer.their_node_id.is_some());
                                        debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
-                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
                                        if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
@@ -1993,7 +2019,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        }
                                        debug_assert!(peer.their_node_id.is_some());
                                        debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
-                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+                                       let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
                                        if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
@@ -2077,31 +2103,31 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                for event in events_generated.drain(..) {
                                        match event {
                                                MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
                                                                        log_pubkey!(node_id),
                                                                        &msg.temporary_channel_id,
                                                                        ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
@@ -2110,107 +2136,107 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendStfu { ref node_id, ref msg} => {
-                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
                                                        log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
-                                               MessageSendEvent::SendSplice { ref node_id, ref msg} => {
-                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
-                                                       log_debug!(logger, "Handling SendSplice event in peer_handler for node {} for channel {}",
+                                               MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => {
+                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
+                                                       log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
                                                MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
-                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
                                                        log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
                                                MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
-                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+                                                       let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
                                                        log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
                                                MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id)), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        update_add_htlcs.len(),
                                                                        update_fulfill_htlcs.len(),
@@ -2235,31 +2261,31 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        self.enqueue_message(&mut *peer, commitment_signed);
                                                },
                                                MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling Shutdown event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
-                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
+                                                       log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
                                                                        log_pubkey!(node_id),
                                                                        msg.contents.short_channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
@@ -2297,12 +2323,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        }
                                                },
                                                MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
-                                                       log_trace!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
+                                                       log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id), msg.contents.short_channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
                                                MessageSendEvent::HandleError { node_id, action } => {
-                                                       let logger = WithContext::from(&self.logger, Some(node_id), None);
+                                                       let logger = WithContext::from(&self.logger, Some(node_id), None, None);
                                                        match action {
                                                                msgs::ErrorAction::DisconnectPeer { msg } => {
                                                                        if let Some(msg) = msg.as_ref() {
@@ -2354,7 +2380,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
                                                MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
-                                                       log_gossip!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
+                                                       log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
                                                                log_pubkey!(node_id),
                                                                msg.short_channel_ids.len(),
                                                                msg.first_blocknum,
@@ -2428,9 +2454,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                debug_assert!(peer.their_node_id.is_some());
                if let Some((node_id, _)) = peer.their_node_id {
-                       log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Disconnecting peer with id {} due to {}", node_id, reason);
+                       log_trace!(WithContext::from(&self.logger, Some(node_id), None, None), "Disconnecting peer with id {} due to {}", node_id, reason);
                        self.message_handler.chan_handler.peer_disconnected(&node_id);
                        self.message_handler.onion_message_handler.peer_disconnected(&node_id);
+                       self.message_handler.custom_message_handler.peer_disconnected(&node_id);
                }
                descriptor.disconnect_socket();
        }
@@ -2447,12 +2474,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        Some(peer_lock) => {
                                let peer = peer_lock.lock().unwrap();
                                if let Some((node_id, _)) = peer.their_node_id {
-                                       log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Handling disconnection of peer {}", log_pubkey!(node_id));
+                                       log_trace!(WithContext::from(&self.logger, Some(node_id), None, None), "Handling disconnection of peer {}", log_pubkey!(node_id));
                                        let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
                                        debug_assert!(removed.is_some(), "descriptor maps should be consistent");
                                        if !peer.handshake_complete() { return; }
                                        self.message_handler.chan_handler.peer_disconnected(&node_id);
                                        self.message_handler.onion_message_handler.peer_disconnected(&node_id);
+                                       self.message_handler.custom_message_handler.peer_disconnected(&node_id);
                                }
                        }
                };
@@ -2676,12 +2704,12 @@ mod tests {
        use crate::sign::{NodeSigner, Recipient};
        use crate::events;
        use crate::io;
-       use crate::ln::ChannelId;
+       use crate::ln::types::ChannelId;
        use crate::ln::features::{InitFeatures, NodeFeatures};
        use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
        use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
        use crate::ln::{msgs, wire};
-       use crate::ln::msgs::{LightningError, SocketAddress};
+       use crate::ln::msgs::{Init, LightningError, SocketAddress};
        use crate::util::test_utils;
 
        use bitcoin::Network;
@@ -2748,6 +2776,11 @@ mod tests {
 
                fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() }
 
+
+               fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
+
+               fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
+
                fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
 
                fn provided_init_features(&self, _: &PublicKey) -> InitFeatures {