use crate::sign::{NodeSigner, Recipient};
use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
-use crate::ln::ChannelId;
+use crate::ln::types::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::msgs;
use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
use crate::ln::wire;
use crate::ln::wire::{Encode, Type};
-use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage};
+use crate::onion_message::messenger::{CustomOnionMessageHandler, PendingOnionMessage, Responder, ResponseInstruction};
use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
use crate::onion_message::packet::OnionMessageContents;
use crate::routing::gossip::{NodeId, NodeAlias};
}
fn processing_queue_high(&self) -> bool { false }
}
+
impl OnionMessageHandler for IgnoringMessageHandler {
fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
InitFeatures::empty()
}
}
+
impl OffersMessageHandler for IgnoringMessageHandler {
- fn handle_message(&self, _msg: OffersMessage) -> Option<OffersMessage> { None }
+ fn handle_message(&self, _message: OffersMessage, _responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
+ ResponseInstruction::NoResponse
+ }
}
impl CustomOnionMessageHandler for IgnoringMessageHandler {
type CustomMessage = Infallible;
- fn handle_custom_message(&self, _msg: Infallible) -> Option<Infallible> {
+ fn handle_custom_message(&self, _message: Self::CustomMessage, _responder: Option<Responder>) -> ResponseInstruction<Self::CustomMessage> {
// Since we always return `None` in the read the handle method should never be called.
unreachable!();
}
impl OnionMessageContents for Infallible {
fn tlv_type(&self) -> u64 { unreachable!(); }
+ fn msg_type(&self) -> &'static str { unreachable!(); }
}
impl Deref for IgnoringMessageHandler {
fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) {
ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
}
+ #[cfg(splicing)]
fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) {
ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
}
+ #[cfg(splicing)]
fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) {
ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
}
+ #[cfg(splicing)]
fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) {
ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
}
/// Append a message to a peer's pending outbound/write buffer
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
- let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+ let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
if is_gossip_msg(message.type_id()) {
log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
} else {
macro_rules! try_potential_handleerror {
($peer: expr, $thing: expr) => {{
let res = $thing;
- let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None);
+ let logger = WithContext::from(&self.logger, peer_node_id.map(|(id, _)| id), None, None);
match res {
Ok(x) => x,
Err(e) => {
macro_rules! insert_node_id {
() => {
- let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+ let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
hash_map::Entry::Occupied(e) => {
log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
let networks = self.message_handler.chan_handler.get_chain_hashes();
let resp = msgs::Init { features, networks, remote_network_address: filter_addresses(peer.their_socket_address.clone()) };
self.enqueue_message(peer, &resp);
- peer.awaiting_pong_timer_tick_intervals = 0;
},
NextNoiseStep::ActThree => {
let their_node_id = try_potential_handleerror!(peer,
let networks = self.message_handler.chan_handler.get_chain_hashes();
let resp = msgs::Init { features, networks, remote_network_address: filter_addresses(peer.their_socket_address.clone()) };
self.enqueue_message(peer, &resp);
- peer.awaiting_pong_timer_tick_intervals = 0;
},
NextNoiseStep::NoiseComplete => {
if peer.pending_read_is_header {
peer.pending_read_buffer.resize(18, 0);
peer.pending_read_is_header = true;
- let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+ let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
let message = match message_result {
Ok(x) => x,
Err(e) => {
message: wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>
) -> Result<Option<wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
- let logger = WithContext::from(&self.logger, Some(their_node_id), None);
+ let logger = WithContext::from(&self.logger, Some(their_node_id), None, None);
let message = match self.do_handle_message_holding_peer_lock(peer_lock, message, &their_node_id, &logger)? {
Some(processed_message) => processed_message,
return Err(PeerHandleError { }.into());
}
+ peer_lock.awaiting_pong_timer_tick_intervals = 0;
peer_lock.their_features = Some(msg.features);
return Ok(None);
} else if peer_lock.their_features.is_none() {
self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg);
}
+ #[cfg(splicing)]
// Splicing messages:
wire::Message::Splice(msg) => {
self.message_handler.chan_handler.handle_splice(&their_node_id, &msg);
}
+ #[cfg(splicing)]
wire::Message::SpliceAck(msg) => {
self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg);
}
+ #[cfg(splicing)]
wire::Message::SpliceLocked(msg) => {
self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg);
}
}
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
- let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+ let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
- let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+ let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
- let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None);
+ let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
for event in events_generated.drain(..) {
match event {
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id)), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id)), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
log_pubkey!(node_id),
&msg.temporary_channel_id,
ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendStfu { ref node_id, ref msg} => {
- let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+ let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendSplice { ref node_id, ref msg} => {
- let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+ let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSplice event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
- let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+ let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
- let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id));
+ let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id)), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
log_pubkey!(node_id),
update_add_htlcs.len(),
update_fulfill_htlcs.len(),
self.enqueue_message(&mut *peer, commitment_signed);
},
MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling Shutdown event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id)), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
- log_debug!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
+ log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
log_pubkey!(node_id),
msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
},
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
- log_trace!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
+ log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_pubkey!(node_id), msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
MessageSendEvent::HandleError { node_id, action } => {
- let logger = WithContext::from(&self.logger, Some(node_id), None);
+ let logger = WithContext::from(&self.logger, Some(node_id), None, None);
match action {
msgs::ErrorAction::DisconnectPeer { msg } => {
if let Some(msg) = msg.as_ref() {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
- log_gossip!(WithContext::from(&self.logger, Some(*node_id), None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
+ log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
log_pubkey!(node_id),
msg.short_channel_ids.len(),
msg.first_blocknum,
debug_assert!(peer.their_node_id.is_some());
if let Some((node_id, _)) = peer.their_node_id {
- log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Disconnecting peer with id {} due to {}", node_id, reason);
+ log_trace!(WithContext::from(&self.logger, Some(node_id), None, None), "Disconnecting peer with id {} due to {}", node_id, reason);
self.message_handler.chan_handler.peer_disconnected(&node_id);
self.message_handler.onion_message_handler.peer_disconnected(&node_id);
}
Some(peer_lock) => {
let peer = peer_lock.lock().unwrap();
if let Some((node_id, _)) = peer.their_node_id {
- log_trace!(WithContext::from(&self.logger, Some(node_id), None), "Handling disconnection of peer {}", log_pubkey!(node_id));
+ log_trace!(WithContext::from(&self.logger, Some(node_id), None, None), "Handling disconnection of peer {}", log_pubkey!(node_id));
let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
debug_assert!(removed.is_some(), "descriptor maps should be consistent");
if !peer.handshake_complete() { return; }
use crate::sign::{NodeSigner, Recipient};
use crate::events;
use crate::io;
- use crate::ln::ChannelId;
+ use crate::ln::types::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
- use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
+ use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
use crate::ln::{msgs, wire};
use crate::ln::msgs::{LightningError, SocketAddress};
use crate::util::test_utils;
assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
}
+ #[test]
+ fn test_inbound_conn_handshake_complete_awaiting_pong() {
+ // Test that we do not disconnect an outbound peer after the noise handshake completes due
+ // to a pong timeout for a ping that was never sent if a timer tick fires after we send act
+ // two of the noise handshake along with our init message but before we receive their init
+ // message.
+ let logger = test_utils::TestLogger::new();
+ let node_signer_a = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[42; 32]).unwrap());
+ let node_signer_b = test_utils::TestNodeSigner::new(SecretKey::from_slice(&[43; 32]).unwrap());
+ let peer_a = PeerManager::new(MessageHandler {
+ chan_handler: ErroringMessageHandler::new(),
+ route_handler: IgnoringMessageHandler {},
+ onion_message_handler: IgnoringMessageHandler {},
+ custom_message_handler: IgnoringMessageHandler {},
+ }, 0, &[0; 32], &logger, &node_signer_a);
+ let peer_b = PeerManager::new(MessageHandler {
+ chan_handler: ErroringMessageHandler::new(),
+ route_handler: IgnoringMessageHandler {},
+ onion_message_handler: IgnoringMessageHandler {},
+ custom_message_handler: IgnoringMessageHandler {},
+ }, 0, &[1; 32], &logger, &node_signer_b);
+
+ let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap();
+ let mut fd_a = FileDescriptor {
+ fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+ let mut fd_b = FileDescriptor {
+ fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+ disconnect: Arc::new(AtomicBool::new(false)),
+ };
+
+ // Exchange messages with both peers until they both complete the init handshake.
+ let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
+ peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
+
+ assert_eq!(peer_a.read_event(&mut fd_a, &act_one).unwrap(), false);
+ peer_a.process_events();
+
+ let act_two = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert_eq!(peer_b.read_event(&mut fd_b, &act_two).unwrap(), false);
+ peer_b.process_events();
+
+ // Calling this here triggers the race on inbound connections.
+ peer_b.timer_tick_occurred();
+
+ let act_three_with_init_b = fd_b.outbound_data.lock().unwrap().split_off(0);
+ assert!(!peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete());
+ assert_eq!(peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap(), false);
+ peer_a.process_events();
+ assert!(peer_a.peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().handshake_complete());
+
+ let init_a = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert!(!init_a.is_empty());
+
+ assert!(!peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete());
+ assert_eq!(peer_b.read_event(&mut fd_b, &init_a).unwrap(), false);
+ peer_b.process_events();
+ assert!(peer_b.peers.read().unwrap().get(&fd_b).unwrap().lock().unwrap().handshake_complete());
+
+ // Make sure we're still connected.
+ assert_eq!(peer_b.peers.read().unwrap().len(), 1);
+
+ // B should send a ping on the first timer tick after `handshake_complete`.
+ assert!(fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());
+ peer_b.timer_tick_occurred();
+ peer_b.process_events();
+ assert!(!fd_b.outbound_data.lock().unwrap().split_off(0).is_empty());
+
+ let mut send_warning = || {
+ {
+ let peers = peer_a.peers.read().unwrap();
+ let mut peer_b = peers.get(&fd_a).unwrap().lock().unwrap();
+ peer_a.enqueue_message(&mut peer_b, &msgs::WarningMessage {
+ channel_id: ChannelId([0; 32]),
+ data: "no disconnect plz".to_string(),
+ });
+ }
+ peer_a.process_events();
+ let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert!(!msg.is_empty());
+ assert_eq!(peer_b.read_event(&mut fd_b, &msg).unwrap(), false);
+ peer_b.process_events();
+ };
+
+ // Fire more ticks until we reach the pong timeout. We send any message except pong to
+ // pretend the connection is still alive.
+ send_warning();
+ for _ in 0..MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER {
+ peer_b.timer_tick_occurred();
+ send_warning();
+ }
+ assert_eq!(peer_b.peers.read().unwrap().len(), 1);
+
+ // One more tick should enforce the pong timeout.
+ peer_b.timer_tick_occurred();
+ assert_eq!(peer_b.peers.read().unwrap().len(), 0);
+ }
+
#[test]
fn test_filter_addresses(){
// Tests the filter_addresses function.