X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=a4e0042414f8996f95c08deb72d7a578322fff97;hb=c92db69183143a58b3017ec450bdbf15a035bd9f;hp=a1a4d4b26729c6822227b7d58f576ed0e1725ad1;hpb=48edd01d02a68258c046bff0e2bd05d25efd28ce;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index a1a4d4b2..a4e00424 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; use crate::sign::{KeysManager, NodeSigner, Recipient}; -use crate::events::{MessageSendEvent, MessageSendEventsProvider}; +use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use crate::ln::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs; @@ -27,7 +27,7 @@ use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, Onio #[cfg(not(c_bindings))] use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use crate::util::ser::{VecWriter, Writeable, Writer}; -use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MSG_BUF_ALLOC_SIZE}; +use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE}; use crate::ln::wire; use crate::ln::wire::{Encode, Type}; #[cfg(not(c_bindings))] @@ -35,12 +35,12 @@ use crate::onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger}; use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage}; use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias}; use crate::util::atomic_counter::AtomicCounter; -use crate::util::logger::Logger; +use crate::util::logger::{Logger, WithContext}; use crate::util::string::PrintableString; use crate::prelude::*; use crate::io; -use alloc::collections::LinkedList; +use alloc::collections::VecDeque; use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; @@ -89,6 +89,9 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// A dummy struct which implements `RoutingMessageHandler` without storing any routing information /// or doing any processing. You can provide one of these as the route_handler in a MessageHandler. pub struct IgnoringMessageHandler{} +impl EventsProvider for IgnoringMessageHandler { + fn process_pending_events(&self, _handler: H) where H::Target: EventHandler {} +} impl MessageSendEventsProvider for IgnoringMessageHandler { fn get_and_clear_pending_msg_events(&self) -> Vec { Vec::new() } } @@ -115,6 +118,7 @@ impl OnionMessageHandler for IgnoringMessageHandler { fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + fn timer_tick_occurred(&self) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() @@ -231,6 +235,18 @@ impl ChannelMessageHandler for ErroringMessageHandler { fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } + fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } @@ -489,13 +505,13 @@ struct Peer { their_features: Option, their_socket_address: Option, - pending_outbound_buffer: LinkedList>, + pending_outbound_buffer: VecDeque>, pending_outbound_buffer_first_msg_offset: usize, /// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily /// prioritize channel messages over them. /// /// Note that these messages are *not* encrypted/MAC'd, and are only serialized. - gossip_broadcast_buffer: LinkedList>, + gossip_broadcast_buffer: VecDeque, awaiting_write_event: bool, pending_read_buffer: Vec, @@ -668,6 +684,8 @@ pub trait APeerManager { type NS: Deref; /// Gets a reference to the underlying [`PeerManager`]. fn as_ref(&self) -> &PeerManager; + /// Returns the peer manager's [`OnionMessageHandler`]. + fn onion_message_handler(&self) -> &Self::OMT; } impl @@ -693,6 +711,9 @@ APeerManager for PeerManager where type NST = ::Target; type NS = NS; fn as_ref(&self) -> &PeerManager { self } + fn onion_message_handler(&self) -> &Self::OMT { + self.message_handler.onion_message_handler.deref() + } } /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls @@ -910,7 +931,7 @@ impl InitFeatures { @@ -997,9 +1018,9 @@ impl>(); + let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE; + let lots_of_slack = peer.pending_outbound_buffer.len() + < peer.pending_outbound_buffer.capacity() / 2; + if large_capacity && lots_of_slack { + peer.pending_outbound_buffer.shrink_to_fit(); + } } else { peer.awaiting_write_event = true; } @@ -1234,18 +1262,20 @@ impl(&self, peer: &mut Peer, message: &M) { + let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None); if is_gossip_msg(message.type_id()) { - log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); + log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); } else { - log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)) + log_trace!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)) } peer.msgs_sent_since_pong += 1; peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message)); } /// Append a message to a peer's pending outbound/write gossip broadcast buffer - fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec) { + fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) { peer.msgs_sent_since_pong += 1; + debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP); peer.gossip_broadcast_buffer.push_back(encoded_message); } @@ -1265,8 +1295,10 @@ impl { - match $thing { + ($peer: expr, $thing: expr) => {{ + let res = $thing; + let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None); + match res { Ok(x) => x, Err(e) => { match e.action { @@ -1276,7 +1308,7 @@ impl { @@ -1285,32 +1317,32 @@ impl { - log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_given_level!(logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); continue }, msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these msgs::ErrorAction::IgnoreError => { - log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_debug!(logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); continue; }, msgs::ErrorAction::SendErrorMessage { msg } => { - log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_debug!(logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer_node_id), e.err); self.enqueue_message($peer, &msg); continue; }, msgs::ErrorAction::SendWarningMessage { msg, log_level } => { - log_given_level!(self.logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err); + log_given_level!(logger, log_level, "Error handling message{}; sending warning message with: {}", OptionalFromDebugger(&peer_node_id), e.err); self.enqueue_message($peer, &msg); continue; }, } } } - } + }} } let mut peer_lock = peer_mutex.lock().unwrap(); @@ -1335,9 +1367,10 @@ impl { + let logger = WithContext::from(&self.logger, Some(peer.their_node_id.unwrap().0), None); match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { hash_map::Entry::Occupied(e) => { - log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); + log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event // Check that the peers map is consistent with the // node_id_to_descriptor map, as this has been broken @@ -1346,7 +1379,7 @@ impl { - log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); + log_debug!(logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); entry.insert(peer_descriptor.clone()) }, }; @@ -1414,6 +1447,7 @@ impl x, Err(e) => { @@ -1423,16 +1457,16 @@ impl { - log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!"); + log_gossip!(logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!"); continue; } (msgs::DecodeError::UnsupportedCompression, _) => { - log_gossip!(self.logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message"); + log_gossip!(logger, "We don't support zlib-compressed message fields, sending a warning and ignoring message"); self.enqueue_message(peer, &msgs::WarningMessage { channel_id: ChannelId::new_zero(), data: "Unsupported message compression: zlib".to_owned() }); continue; } (_, Some(ty)) if is_gossip_msg(ty) => { - log_gossip!(self.logger, "Got an invalid value while deserializing a gossip message"); + log_gossip!(logger, "Got an invalid value while deserializing a gossip message"); self.enqueue_message(peer, &msgs::WarningMessage { channel_id: ChannelId::new_zero(), data: format!("Unreadable/bogus gossip message of type {}", ty), @@ -1440,16 +1474,16 @@ impl { - log_debug!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); + log_debug!(logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); return Err(PeerHandleError { }); } (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }), (msgs::DecodeError::InvalidValue, _) => { - log_debug!(self.logger, "Got an invalid value while deserializing message"); + log_debug!(logger, "Got an invalid value while deserializing message"); return Err(PeerHandleError { }); } (msgs::DecodeError::ShortRead, _) => { - log_debug!(self.logger, "Deserialization failed due to shortness of message"); + log_debug!(logger, "Deserialization failed due to shortness of message"); return Err(PeerHandleError { }); } (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }), @@ -1499,6 +1533,7 @@ impl::Target as wire::CustomMessageReader>::CustomMessage> ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; + let logger = WithContext::from(&self.logger, Some(their_node_id), None); peer_lock.received_message_since_timer_tick = true; // Need an Init as first message @@ -1516,7 +1551,7 @@ impl { - log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); + log_debug!(logger, "Got Err message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); self.message_handler.chan_handler.handle_error(&their_node_id, &msg); if msg.channel_id.is_zero() { return Err(PeerHandleError { }.into()); } }, wire::Message::Warning(msg) => { - log_debug!(self.logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); + log_debug!(logger, "Got warning message from {}: {}", log_pubkey!(their_node_id), PrintableString(&msg.data)); }, wire::Message::Ping(msg) => { @@ -1644,6 +1679,22 @@ impl { + self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg); + } + + // Splicing messages: + wire::Message::Splice(msg) => { + self.message_handler.chan_handler.handle_splice(&their_node_id, &msg); + } + wire::Message::SpliceAck(msg) => { + self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg); + } + wire::Message::SpliceLocked(msg) => { + self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg); + } + // Interactive transaction construction messages: wire::Message::TxAddInput(msg) => { self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg); @@ -1753,11 +1804,11 @@ impl { - log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id); + log_debug!(logger, "Received unknown even message of type {}, disconnecting peer!", type_id); return Err(PeerHandleError { }.into()); }, wire::Message::Unknown(type_id) => { - log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id); + log_trace!(logger, "Received unknown odd message of type {}, ignoring", type_id); }, wire::Message::Custom(custom) => { self.message_handler.custom_message_handler.handle_custom_message(custom, &their_node_id)?; @@ -1774,6 +1825,7 @@ impl { @@ -1801,6 +1853,7 @@ impl { @@ -1828,6 +1881,7 @@ impl debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"), @@ -1917,31 +1971,31 @@ impl { - log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", log_pubkey!(node_id), &msg.temporary_channel_id, log_funding_channel_id!(msg.funding_txid, msg.funding_output_index)); @@ -1950,79 +2004,107 @@ impl { - log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendFundingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReady event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, + MessageSendEvent::SendStfu { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } + MessageSendEvent::SendSplice { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendSplice event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } + MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } + MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => { + let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)); + log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxComplete event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxSignatures event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAbort event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { - log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id)), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", log_pubkey!(node_id), update_add_htlcs.len(), update_fulfill_htlcs.len(), @@ -2047,31 +2129,31 @@ impl { - log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendClosingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { - log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { - log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", + log_debug!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", log_pubkey!(node_id), msg.contents.short_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); @@ -2109,18 +2191,19 @@ impl { - log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", + log_trace!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", log_pubkey!(node_id), msg.contents.short_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); }, MessageSendEvent::HandleError { node_id, action } => { + let logger = WithContext::from(&self.logger, Some(node_id), None); match action { msgs::ErrorAction::DisconnectPeer { msg } => { if let Some(msg) = msg.as_ref() { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); } else { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}", + log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}", log_pubkey!(node_id)); } // We do not have the peers write lock, so we just store that we're @@ -2130,7 +2213,7 @@ impl { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); // We do not have the peers write lock, so we just store that we're // about to disconenct the peer and do it after we finish @@ -2138,20 +2221,20 @@ impl { - log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + log_given_level!(logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); }, msgs::ErrorAction::IgnoreDuplicateGossip => {}, msgs::ErrorAction::IgnoreError => { - log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); + log_debug!(logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); }, msgs::ErrorAction::SendErrorMessage { ref msg } => { - log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", + log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); }, msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => { - log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", + log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); @@ -2165,7 +2248,7 @@ impl { - log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", + log_gossip!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", log_pubkey!(node_id), msg.short_channel_ids.len(), msg.first_blocknum, @@ -2239,7 +2322,7 @@ impl { let peer = peer_lock.lock().unwrap(); if let Some((node_id, _)) = peer.their_node_id { - log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id)); + log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Handling disconnection of peer {}", log_pubkey!(node_id)); let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); debug_assert!(removed.is_some(), "descriptor maps should be consistent"); if !peer.handshake_complete() { return; } @@ -2405,7 +2488,6 @@ impl