X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=0b752161b6aff18dee9c2a8982ee25545d85d2b8;hb=d3ddf15357589ed10f4c844dc22822755b804306;hp=ba3a733d225db072f70cc4d185ce4e4cfe1030e9;hpb=d795e247b76f6b0d70a2529c2a05995a909b1134;p=rust-lightning

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index ba3a733d..0b752161 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -18,35 +18,42 @@
 use bitcoin::blockdata::constants::ChainHash;
 use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
-use crate::sign::{KeysManager, NodeSigner, Recipient};
-use crate::events::{MessageSendEvent, MessageSendEventsProvider};
+use crate::sign::{NodeSigner, Recipient};
+use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
 use crate::ln::ChannelId;
 use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs;
 use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
-#[cfg(not(c_bindings))]
-use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use crate::util::ser::{VecWriter, Writeable, Writer};
-use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
+use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
 use crate::ln::wire;
 use crate::ln::wire::{Encode, Type};
-#[cfg(not(c_bindings))]
-use crate::onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger};
-use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage};
-use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias};
+use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage};
+use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
+use crate::onion_message::packet::OnionMessageContents;
+use crate::routing::gossip::{NodeId, NodeAlias};
 use crate::util::atomic_counter::AtomicCounter;
-use crate::util::logger::Logger;
+use crate::util::logger::{Logger, WithContext};
 use crate::util::string::PrintableString;
 use crate::prelude::*;
 use crate::io;
-use alloc::collections::LinkedList;
-use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
+use alloc::collections::VecDeque;
+use crate::sync::{Mutex, MutexGuard, FairRwLock};
 use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
 use core::convert::Infallible;
-#[cfg(feature = "std")] use std::error;
+#[cfg(feature = "std")]
+use std::error;
+#[cfg(not(c_bindings))]
+use {
+	crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager},
+	crate::onion_message::messenger::{SimpleArcOnionMessenger, SimpleRefOnionMessenger},
+	crate::routing::gossip::{NetworkGraph, P2PGossipSync},
+	crate::sign::KeysManager,
+	crate::sync::Arc,
+};
 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
@@ -89,6 +96,9 @@ pub trait CustomMessageHandler: wire::CustomMessageReader {
 /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
 /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
 pub struct IgnoringMessageHandler{}
+impl EventsProvider for IgnoringMessageHandler {
+	fn process_pending_events<H: Deref>(&self, _handler: H) where H::Target: EventHandler {}
+}
 impl MessageSendEventsProvider for IgnoringMessageHandler {
 	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
 }
@@ -115,6 +125,7 @@ impl OnionMessageHandler for IgnoringMessageHandler {
 	fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
 	fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
 	fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
+	fn timer_tick_occurred(&self) {}
 	fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
 	fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
 		InitFeatures::empty()
@@ -211,7 +222,7 @@ impl ChannelMessageHandler for ErroringMessageHandler {
 	// Any messages which are related to a specific channel generate an error message to let the
 	// peer know we don't care about channels.
 	fn handle_open_channel(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannel) {
-		ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+		ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id);
 	}
 	fn handle_accept_channel(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannel) {
 		ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
 	}
@@ -231,6 +242,18 @@ impl ChannelMessageHandler for ErroringMessageHandler {
 	fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
 		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
 	}
+	fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
+	fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
+	fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
+	fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
 	fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
 		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
 	}
@@ -280,6 +303,7 @@ impl ChannelMessageHandler for ErroringMessageHandler {
 		features.set_channel_type_optional();
 		features.set_scid_privacy_optional();
 		features.set_zero_conf_optional();
+		features.set_route_blinding_optional();
 		features
 	}
@@ -291,7 +315,7 @@ impl ChannelMessageHandler for ErroringMessageHandler {
 	}
 	fn handle_open_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannelV2) {
-		ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+		ErroringMessageHandler::push_error(self, their_node_id, msg.common_fields.temporary_channel_id);
 	}
 	fn handle_accept_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) {
@@ -361,7 +385,7 @@ pub struct MessageHandler where
 	/// A message handler which handles onion messages. This should generally be an
 	/// [`OnionMessenger`], but can also be an [`IgnoringMessageHandler`].
 	///
-	/// [`OnionMessenger`]: crate::onion_message::OnionMessenger
+	/// [`OnionMessenger`]: crate::onion_message::messenger::OnionMessenger
 	pub onion_message_handler: OM,
 	/// A message handler which handles custom messages. The only LDK-provided implementation is
@@ -489,13 +513,13 @@ struct Peer {
 	their_features: Option<InitFeatures>,
 	their_socket_address: Option<SocketAddress>,
-	pending_outbound_buffer: LinkedList<Vec<u8>>,
+	pending_outbound_buffer: VecDeque<Vec<u8>>,
 	pending_outbound_buffer_first_msg_offset: usize,
 	/// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
 	/// prioritize channel messages over them.
 	///
 	/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
-	gossip_broadcast_buffer: LinkedList<Vec<u8>>,
+	gossip_broadcast_buffer: VecDeque<MessageBuf>,
 	awaiting_write_event: bool,
 	pending_read_buffer: Vec<u8>,
@@ -668,6 +692,8 @@ pub trait APeerManager {
 	type NS: Deref;
 	/// Gets a reference to the underlying [`PeerManager`].
 	fn as_ref(&self) -> &PeerManager;
+	/// Returns the peer manager's [`OnionMessageHandler`].
+	fn onion_message_handler(&self) -> &Self::OMT;
 }
 impl
@@ -693,6 +719,9 @@ APeerManager for PeerManager where
 	type NST = ::Target;
 	type NS = NS;
 	fn as_ref(&self) -> &PeerManager { self }
+	fn onion_message_handler(&self) -> &Self::OMT {
+		self.message_handler.onion_message_handler.deref()
+	}
 }
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
@@ -785,7 +814,7 @@ impl From for MessageHandlingError {
 macro_rules! encode_msg {
 	($msg: expr) => {{
-		let mut buffer = VecWriter(Vec::new());
+		let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
 		wire::write($msg, &mut buffer).unwrap();
 		buffer.0
 	}}
@@ -910,7 +939,7 @@ impl InitFeatures {
@@ -997,9 +1026,9 @@ impl
 				const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
+				let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
+				let lots_of_slack = peer.pending_outbound_buffer.len()
+					< peer.pending_outbound_buffer.capacity() / 2;
+				if large_capacity && lots_of_slack {
+					peer.pending_outbound_buffer.shrink_to_fit();
+				}
 			} else {
 				peer.awaiting_write_event = true;
 			}
@@ -1234,18 +1270,20 @@ impl
 	fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
+		let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
 		if is_gossip_msg(message.type_id()) {
-			log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
+			log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
 		} else {
-			log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0))
+			log_trace!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0))
 		}
 		peer.msgs_sent_since_pong += 1;
 		peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message));
 	}
 	/// Append a message to a peer's pending outbound/write gossip broadcast buffer
-	fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec<u8>) {
+	fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) {
 		peer.msgs_sent_since_pong += 1;
+		debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
 		peer.gossip_broadcast_buffer.push_back(encoded_message);
 	}
@@ -1265,8 +1303,10 @@ impl
-			($peer: expr, $thing: expr) => {
-				match $thing {
+			($peer: expr, $thing: expr) => {{
+				let res = $thing;
+				let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None);
+				match res {
 					Ok(x) => x,
 					Err(e) => {
 						match e.action {
@@ -1276,7 +1316,7 @@ impl {
@@ -1285,32 +1325,32 @@ impl
 							msgs::ErrorAction::IgnoreAndLog(level) => {
-								log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
+								log_given_level!(logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
 								continue
 							},
 							msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these
 							msgs::ErrorAction::IgnoreError => {
-								log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
+								log_debug!(logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err);
 								continue;
 							},
 							msgs::ErrorAction::SendErrorMessage { msg } => {
-								log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
+								log_debug!(logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
 								self.enqueue_message($peer, &msg);
 								continue;
 							},
 							msgs::ErrorAction::SendWarningMessage { msg, log_level } => {
-								log_given_level!(self.logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
+								log_given_level!(logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err);
 								self.enqueue_message($peer, &msg);
 								continue;
 							},
 						}
 					}
 				}
-			}
+			}}
 		}
 		let mut peer_lock = peer_mutex.lock().unwrap();
@@ -1335,9 +1375,10 @@ impl {
+					let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
 					match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
 						hash_map::Entry::Occupied(e) => {
-							log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
+							log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
 							peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
 							// Check that the peers map is consistent with the
 							// node_id_to_descriptor map, as this has been broken
@@ -1346,7 +1387,7 @@ impl
 						hash_map::Entry::Vacant(entry) => {
-							log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0));
+							log_debug!(logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0));
 							entry.insert(peer_descriptor.clone())
 						},
 					};
@@ -1402,17 +1443,19 @@ impl
-						assert!(msg_data.len() >= 2);
+						debug_assert!(peer.pending_read_buffer.len() >= 2 + 16);
+						try_potential_handleerror!(peer,
+							peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..]));
+
+						let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]);
+						let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
 						// Reset read buffer
 						if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
 						peer.pending_read_buffer.resize(18, 0);
 						peer.pending_read_is_header = true;
-						let mut reader = io::Cursor::new(&msg_data[..]);
-						let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
+						let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
 						let message = match message_result {
 							Ok(x) => x,
 							Err(e) => {
@@ -1422,16 +1465,16 @@ impl
 									(msgs::DecodeError::UnknownRequiredFeature, Some(ty)) if is_gossip_msg(ty) => {
-										log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
+										log_gossip!(logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
 										continue;
 									}
 									(msgs::DecodeError::UnsupportedCompression, _) => {
-										log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
+										log_gossip!(logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message");
 										self.enqueue_message(peer, &msgs::WarningMessage { channel_id: ChannelId::new_zero(), data: "Unsupported message compression: zlib".to_owned() });
 										continue;
 									}
 									(_, Some(ty)) if is_gossip_msg(ty) => {
-										log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message");
+										log_gossip!(logger, "Got an invalid value while deserializing a gossip message");
 										self.enqueue_message(peer, &msgs::WarningMessage {
 											channel_id: ChannelId::new_zero(),
 											data: format!("Unreadable/bogus gossip message of type {}", ty),
@@ -1439,16 +1482,16 @@ impl
 									(msgs::DecodeError::UnknownRequiredFeature, _) => {
-										log_debug!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
+										log_debug!(logger, "Received a message with an unknown required feature flag or TLV, you may want to update!");
 										return Err(PeerHandleError { });
 									}
 									(msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }),
 									(msgs::DecodeError::InvalidValue, _) => {
-										log_debug!(self.logger, "Got an invalid value while deserializing message");
+										log_debug!(logger, "Got an invalid value while deserializing message");
 										return Err(PeerHandleError { });
 									}
 									(msgs::DecodeError::ShortRead, _) => {
-										log_debug!(self.logger, "Deserialization failed due to shortness of message");
+										log_debug!(logger, "Deserialization failed due to shortness of message");
 										return Err(PeerHandleError { });
 									}
 									(msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }),
@@ -1498,6 +1541,7 @@ impl
 		::Target as wire::CustomMessageReader>::CustomMessage>
 	) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
 		let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
+		let logger = WithContext::from(&self.logger, Some(their_node_id), None);
 		peer_lock.received_message_since_timer_tick = true;
 		// Need an Init as first message
@@ -1515,7 +1559,7 @@ impl
 			wire::Message::Error(msg) => {
-				log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
+				log_debug!(logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
 				self.message_handler.chan_handler.handle_error(&their_node_id, &msg);
 				if msg.channel_id.is_zero() { return Err(PeerHandleError { }.into()); }
 			},
 			wire::Message::Warning(msg) => {
-				log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
+				log_debug!(logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data));
 			},
 			wire::Message::Ping(msg) => {
@@ -1643,6 +1687,22 @@ impl
+			wire::Message::Stfu(msg) => {
+				self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg);
+			}
+
+			// Splicing messages:
+			wire::Message::Splice(msg) => {
+				self.message_handler.chan_handler.handle_splice(&their_node_id, &msg);
+			}
+			wire::Message::SpliceAck(msg) => {
+				self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg);
+			}
+			wire::Message::SpliceLocked(msg) => {
+				self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg);
+			}
+
 			// Interactive transaction construction messages:
 			wire::Message::TxAddInput(msg) => {
 				self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg);
@@ -1752,11 +1812,11 @@ impl {
-				log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
+				log_debug!(logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
 				return Err(PeerHandleError { }.into());
 			},
 			wire::Message::Unknown(type_id) => {
-				log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id);
+				log_trace!(logger, "Received unknown odd message of type {}, ignoring", type_id);
 			},
 			wire::Message::Custom(custom) => {
 				self.message_handler.custom_message_handler.handle_custom_message(custom, &their_node_id)?;
@@ -1779,8 +1839,9 @@ impl {
@@ -1806,8 +1867,9 @@ impl {
@@ -1833,14 +1895,15 @@ impl
 			debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
@@ -1916,112 +1979,140 @@ impl
 				MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.temporary_channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.temporary_channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
-							&msg.temporary_channel_id);
+							&msg.common_fields.temporary_channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
-							&msg.temporary_channel_id);
+							&msg.common_fields.temporary_channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
 							log_pubkey!(node_id),
 							&msg.temporary_channel_id,
-							log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
+							ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
 					// TODO: If the peer is gone we should generate a DiscardFunding event
 					// indicating to the wallet that they should just throw away this funding transaction
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
+				MessageSendEvent::SendStfu { ref node_id, ref msg} => {
+					let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+					log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
+				MessageSendEvent::SendSplice { ref node_id, ref msg} => {
+					let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+					log_debug!(logger, "Handling SendSplice event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
+				MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
+					let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+					log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
+				MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
+					let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+					log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
 				MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
-					log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id)), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
 							log_pubkey!(node_id),
 							update_add_htlcs.len(),
 							update_fulfill_htlcs.len(),
@@ -2046,31 +2137,31 @@ impl
 				MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling Shutdown event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
-					log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
-					log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
+					log_debug!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
 							log_pubkey!(node_id),
 							msg.contents.short_channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
@@ -2108,49 +2199,50 @@ impl
 				MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
-					log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
+					log_trace!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							msg.contents.short_channel_id);
 					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 				},
 				MessageSendEvent::HandleError { node_id, action } => {
+					let logger = WithContext::from(&self.logger, Some(node_id), None);
 					match action {
 						msgs::ErrorAction::DisconnectPeer { msg } => {
 							if let Some(msg) = msg.as_ref() {
-								log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+								log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
 									log_pubkey!(node_id), msg.data);
 							} else {
-								log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
+								log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}",
 									log_pubkey!(node_id));
 							}
 							// We do not have the peers write lock, so we just store that we're
-							// about to disconenct the peer and do it after we finish
+							// about to disconnect the peer and do it after we finish
 							// processing most messages.
 							let msg = msg.map(|msg| wire::Message::<<::Target as wire::CustomMessageReader>::CustomMessage>::Error(msg));
 							peers_to_disconnect.insert(node_id, msg);
 						},
 						msgs::ErrorAction::DisconnectPeerWithWarning { msg } => {
-							log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+							log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
 								log_pubkey!(node_id), msg.data);
 							// We do not have the peers write lock, so we just store that we're
-							// about to disconenct the peer and do it after we finish
+							// about to disconnect the peer and do it after we finish
 							// processing most messages.
 							peers_to_disconnect.insert(node_id, Some(wire::Message::Warning(msg)));
 						},
 						msgs::ErrorAction::IgnoreAndLog(level) => {
-							log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
+							log_given_level!(logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
 						},
 						msgs::ErrorAction::IgnoreDuplicateGossip => {},
 						msgs::ErrorAction::IgnoreError => {
-							log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
+							log_debug!(logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
 						},
 						msgs::ErrorAction::SendErrorMessage { ref msg } => {
-							log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
+							log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
								log_pubkey!(node_id), msg.data);
 							self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
 						},
 						msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
-							log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
+							log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
								log_pubkey!(node_id), msg.data);
 							self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
@@ -2164,7 +2256,7 @@ impl
 				MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
-					log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
+					log_gossip!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
						log_pubkey!(node_id),
						msg.short_channel_ids.len(),
						msg.first_blocknum,
@@ -2238,7 +2330,7 @@ impl {
 				let peer = peer_lock.lock().unwrap();
 				if let Some((node_id, _)) = peer.their_node_id {
-					log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id));
+					log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Handling disconnection of peer {}", log_pubkey!(node_id));
 					let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
 					debug_assert!(removed.is_some(), "descriptor maps should be consistent");
 					if !peer.handshake_complete() { return; }
@@ -2404,7 +2496,6 @@ impl
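
Note (ours, not part of the diff): the new `APeerManager::onion_message_handler()` accessor and the new `OnionMessageHandler::timer_tick_occurred()` method added above are meant to be driven alongside the existing `PeerManager::timer_tick_occurred()`. A minimal sketch of such a caller follows, assuming a crate version that contains this patch and that `APeerManager`'s associated `OMT` type is bounded by `OnionMessageHandler`; the helper name `drive_timer_ticks` is hypothetical.

use core::ops::Deref;
use lightning::ln::msgs::OnionMessageHandler;
use lightning::ln::peer_handler::APeerManager;

// Hypothetical helper: call this from the application's timer at the interval
// recommended in the `PeerManager::timer_tick_occurred` documentation.
fn drive_timer_ticks<PM: Deref>(peer_manager: &PM) where PM::Target: APeerManager {
	// Existing behaviour: ping/pong bookkeeping and disconnecting unresponsive peers.
	peer_manager.as_ref().timer_tick_occurred();
	// New in this patch: the onion message handler gets its own timer tick
	// (the `IgnoringMessageHandler` implementation above is a no-op).
	peer_manager.onion_message_handler().timer_tick_occurred();
}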