use util::ser::{VecWriter, Writeable, Writer};
use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
use ln::wire;
-use util::byte_utils;
+use ln::wire::Encode;
+use util::atomic_counter::AtomicCounter;
use util::events::{MessageSendEvent, MessageSendEventsProvider};
use util::logger::Logger;
-use routing::network_graph::NetGraphMsgHandler;
+use routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use prelude::*;
use io;
use alloc::collections::LinkedList;
-use alloc::fmt::Debug;
use sync::{Arc, Mutex};
-use core::sync::atomic::{AtomicUsize, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
+use core::convert::Infallible;
#[cfg(feature = "std")] use std::error;
use bitcoin::hashes::sha256::Hash as Sha256;
pub trait CustomMessageHandler: wire::CustomMessageReader {
/// Called with the message type that was received and the buffer to be read.
/// Can return a `MessageHandlingError` if the message could not be handled.
- fn handle_custom_message(&self, msg: Self::CustomMessage) -> Result<(), LightningError>;
+ fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>;
/// Gets the list of pending messages which were generated by the custom message
/// handler, clearing the list in the process. The first tuple element must
fn deref(&self) -> &Self { self }
}
-impl wire::Type for () {
+// Implement Type for Infallible, note that it cannot be constructed, and thus you can never call a
+// method that takes self for it.
+impl wire::Type for Infallible {
fn type_id(&self) -> u16 {
- // We should never call this for `DummyCustomType`
unreachable!();
}
}
-
-impl Writeable for () {
+impl Writeable for Infallible {
fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
unreachable!();
}
}
impl wire::CustomMessageReader for IgnoringMessageHandler {
- type CustomMessage = ();
+ type CustomMessage = Infallible;
fn read<R: io::Read>(&self, _message_type: u16, _buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
Ok(None)
}
}
impl CustomMessageHandler for IgnoringMessageHandler {
- fn handle_custom_message(&self, _msg: Self::CustomMessage) -> Result<(), LightningError> {
+ fn handle_custom_message(&self, _msg: Infallible, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
// Since we always return `None` in the read the handle method should never be called.
unreachable!();
}
NodesSyncing(PublicKey),
}
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
+
+/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
+/// the socket receive buffer before receiving the ping.
+///
+/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
+/// including any network delays, outbound traffic, or the same for messages from other peers.
+///
+/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
+/// per connected peer to respond to a ping, as long as they send us at least one message during
+/// each tick, ensuring we aren't actually just disconnected.
+/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
+/// peer.
+///
+/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
+/// two connected peers, assuming most LDK-running systems have at least two cores.
+const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
+
+/// This is the minimum number of messages we expect a peer to be able to handle within one timer
+/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
+/// ensures we don't just fill up our send buffer and leave the peer with too many messages to
+/// process before the next ping.
+const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
struct Peer {
channel_encryptor: PeerChannelEncryptor,
sync_status: InitSyncTracker,
- awaiting_pong: bool,
+ msgs_sent_since_pong: usize,
+ awaiting_pong_timer_tick_intervals: i8,
+ received_message_since_timer_tick: bool,
}
impl Peer {
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
-#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
-fn _check_usize_is_32_or_64() {
- // See below, less than 32 bit pointers may be unsafe here!
- unsafe { mem::transmute::<*const usize, [u8; 4]>(panic!()); }
-}
-
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
/// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
/// lifetimes). Other times you can afford a reference, which is more efficient, in which case
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<NetGraphMsgHandler<Arc<NetworkGraph>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
/// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L, IgnoringMessageHandler>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g NetworkGraph, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
/// socket events into messages which it passes on to its [`MessageHandler`].
ephemeral_key_midstate: Sha256Engine,
custom_message_handler: CMH,
- // Usize needs to be at least 32 bits to avoid overflowing both low and high. If usize is 64
- // bits we will never realistically count into high:
- peer_counter_low: AtomicUsize,
- peer_counter_high: AtomicUsize,
+ peer_counter: AtomicCounter,
logger: L,
}
}
}
+/// A simple wrapper that optionally prints " from <pubkey>" for an optional pubkey.
+/// This works around `format!()` taking a reference to each argument, preventing
+/// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling
+/// due to lifetime errors.
+struct OptionalFromDebugger<'a>(&'a Option<PublicKey>);
+impl core::fmt::Display for OptionalFromDebugger<'_> {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
+ if let Some(node_id) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) }
+ }
+}
+
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
CM::Target: ChannelMessageHandler,
RM::Target: RoutingMessageHandler,
L::Target: Logger,
- CMH::Target: CustomMessageHandler + wire::CustomMessageReader {
+ CMH::Target: CustomMessageHandler {
/// Constructs a new PeerManager with the given message handlers and node_id secret key
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
}),
our_node_secret,
ephemeral_key_midstate,
- peer_counter_low: AtomicUsize::new(0),
- peer_counter_high: AtomicUsize::new(0),
+ peer_counter: AtomicCounter::new(),
logger,
custom_message_handler,
}
fn get_ephemeral_key(&self) -> SecretKey {
let mut ephemeral_hash = self.ephemeral_key_midstate.clone();
- let low = self.peer_counter_low.fetch_add(1, Ordering::AcqRel);
- let high = if low == 0 {
- self.peer_counter_high.fetch_add(1, Ordering::AcqRel)
- } else {
- self.peer_counter_high.load(Ordering::Acquire)
- };
- ephemeral_hash.input(&byte_utils::le64_to_array(low as u64));
- ephemeral_hash.input(&byte_utils::le64_to_array(high as u64));
+ let counter = self.peer_counter.get_increment();
+ ephemeral_hash.input(&counter.to_le_bytes());
SecretKey::from_slice(&Sha256::from_engine(ephemeral_hash).into_inner()).expect("You broke SHA-256!")
}
sync_status: InitSyncTracker::NoSyncRequested,
- awaiting_pong: false,
+ msgs_sent_since_pong: 0,
+ awaiting_pong_timer_tick_intervals: 0,
+ received_message_since_timer_tick: false,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
sync_status: InitSyncTracker::NoSyncRequested,
- awaiting_pong: false,
+ msgs_sent_since_pong: 0,
+ awaiting_pong_timer_tick_intervals: 0,
+ received_message_since_timer_tick: false,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
- if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
+ if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
},
}
}
+ if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
+ self.maybe_send_extra_ping(peer);
+ }
if {
let next_buff = match peer.pending_outbound_buffer.front() {
}
}
- /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
- fn enqueue_message<M: wire::Type + Writeable + Debug>(&self, peer: &mut Peer, message: &M) {
- let mut buffer = VecWriter(Vec::new());
+ /// Append a message to a peer's pending outbound/write buffer
+ fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+ peer.msgs_sent_since_pong += 1;
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+ }
+
+ /// Append a message to a peer's pending outbound/write buffer
+ fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
+ let mut buffer = VecWriter(Vec::with_capacity(2048));
wire::write(message, &mut buffer).unwrap(); // crash if the write failed
- let encoded_message = buffer.0;
- log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+ if is_gossip_msg(message.type_id()) {
+ log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+ } else {
+ log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
+ }
+ self.enqueue_encoded_message(peer, &buffer.0);
}
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
match e.action {
msgs::ErrorAction::DisconnectPeer { msg: _ } => {
//TODO: Try to push msg
- log_debug!(self.logger, "Error handling message; disconnecting peer with: {}", e.err);
+ log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
return Err(PeerHandleError{ no_connection_possible: false });
},
msgs::ErrorAction::IgnoreAndLog(level) => {
- log_given_level!(self.logger, level, "Error handling message; ignoring: {}", e.err);
+ log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
continue
},
+ msgs::ErrorAction::IgnoreDuplicateGossip => continue, // Don't even bother logging these
msgs::ErrorAction::IgnoreError => {
- log_debug!(self.logger, "Error handling message; ignoring: {}", e.err);
+ log_debug!(self.logger, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
continue;
},
msgs::ErrorAction::SendErrorMessage { msg } => {
- log_debug!(self.logger, "Error handling message; sending error message with: {}", e.err);
+ log_debug!(self.logger, "Error handling message{}; sending error message with: {}", OptionalFromDebugger(&peer.their_node_id), e.err);
self.enqueue_message(peer, &msg);
continue;
},
let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(peer, &resp);
+ peer.awaiting_pong_timer_tick_intervals = 0;
},
NextNoiseStep::ActThree => {
let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..]));
let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(peer, &resp);
+ peer.awaiting_pong_timer_tick_intervals = 0;
},
NextNoiseStep::NoiseComplete => {
if peer.pending_read_is_header {
match e {
msgs::DecodeError::UnknownVersion => return Err(PeerHandleError { no_connection_possible: false }),
msgs::DecodeError::UnknownRequiredFeature => {
- log_trace!(self.logger, "Got a channel/node announcement with an known required feature flag, you may want to update!");
+ log_gossip!(self.logger, "Got a channel/node announcement with an unknown required feature flag, you may want to update!");
continue;
}
msgs::DecodeError::InvalidValue => {
msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }),
msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }),
msgs::DecodeError::UnsupportedCompression => {
- log_trace!(self.logger, "We don't support zlib-compressed message fields, ignoring message");
+ log_gossip!(self.logger, "We don't support zlib-compressed message fields, ignoring message");
continue;
}
}
peer: &mut Peer,
message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
- log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+ if is_gossip_msg(message.type_id()) {
+ log_gossip!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+ } else {
+ log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+ }
+
+ peer.received_message_since_timer_tick = true;
// Need an Init as first message
if let wire::Message::Init(_) = message {
return Err(PeerHandleError{ no_connection_possible: false }.into());
}
- log_info!(self.logger, "Received peer Init message: {}", msg.features);
+ log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.features);
if msg.features.initial_routing_sync() {
peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
}
},
wire::Message::Pong(_msg) => {
- peer.awaiting_pong = false;
+ peer.awaiting_pong_timer_tick_intervals = 0;
+ peer.msgs_sent_since_pong = 0;
},
// Channel messages:
log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id);
},
wire::Message::Custom(custom) => {
- self.custom_message_handler.handle_custom_message(custom)?;
+ self.custom_message_handler.handle_custom_message(custom, &peer.their_node_id.unwrap())?;
},
};
Ok(should_forward)
fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
match msg {
wire::Message::ChannelAnnouncement(ref msg) => {
- log_trace!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
+ log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
let encoded_msg = encode_msg!(msg);
for (_, peer) in peers.peers.iter_mut() {
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
- log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+ {
+ log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) ||
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
wire::Message::NodeAnnouncement(ref msg) => {
- log_trace!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg);
+ log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg);
let encoded_msg = encode_msg!(msg);
for (_, peer) in peers.peers.iter_mut() {
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
- log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+ {
+ log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) {
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
wire::Message::ChannelUpdate(ref msg) => {
- log_trace!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg);
+ log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg);
let encoded_msg = encode_msg!(msg);
for (_, peer) in peers.peers.iter_mut() {
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
- log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+ {
+ log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
/// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy
/// issues!
///
+ /// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
+ /// or one of the other clients provided in our language bindings.
+ ///
/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
/// [`send_data`]: SocketDescriptor::send_data
},
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
- if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() {
- self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None);
- self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None);
+ match self.message_handler.route_handler.handle_channel_announcement(&msg) {
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+ self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
+ _ => {},
+ }
+ match self.message_handler.route_handler.handle_channel_update(&update_msg) {
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+ self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None),
+ _ => {},
}
},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler");
- if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() {
- self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None);
+ match self.message_handler.route_handler.handle_node_announcement(&msg) {
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+ self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
+ _ => {},
}
},
MessageSendEvent::BroadcastChannelUpdate { msg } => {
log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
- if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() {
- self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None);
+ match self.message_handler.route_handler.handle_channel_update(&msg) {
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+ self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
+ _ => {},
}
},
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
// room in the send buffer, put the error message there...
self.do_attempt_write_data(&mut descriptor, &mut peer);
} else {
- log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
+ log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
}
}
descriptor.disconnect_socket();
msgs::ErrorAction::IgnoreAndLog(level) => {
log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
+ msgs::ErrorAction::IgnoreDuplicateGossip => {},
msgs::ErrorAction::IgnoreError => {
log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
},
self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
}
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
- log_trace!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
+ log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
log_pubkey!(node_id),
msg.short_channel_ids.len(),
msg.first_blocknum,
}
}
+ /// Disconnects all currently-connected peers. This is useful on platforms where there may be
+ /// an indication that TCP sockets have stalled even if we weren't around to time them out
+ /// using regular ping/pongs.
+ pub fn disconnect_all_peers(&self) {
+ let mut peers_lock = self.peers.lock().unwrap();
+ let peers = &mut *peers_lock;
+ for (mut descriptor, peer) in peers.peers.drain() {
+ if let Some(node_id) = peer.their_node_id {
+ log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
+ peers.node_id_to_descriptor.remove(&node_id);
+ self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ }
+ descriptor.disconnect_socket();
+ }
+ debug_assert!(peers.node_id_to_descriptor.is_empty());
+ }
+
+ /// This is called when we're blocked on sending additional gossip messages until we receive a
+ /// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
+ /// `awaiting_pong_timer_tick_intervals` to a special flag value to indicate this).
+ fn maybe_send_extra_ping(&self, peer: &mut Peer) {
+ if peer.awaiting_pong_timer_tick_intervals == 0 {
+ peer.awaiting_pong_timer_tick_intervals = -1;
+ let ping = msgs::Ping {
+ ponglen: 0,
+ byteslen: 64,
+ };
+ self.enqueue_message(peer, &ping);
+ }
+ }
+
/// Send pings to each peer and disconnect those which did not respond to the last round of
/// pings.
///
let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
let peers = &mut peers.peers;
let mut descriptors_needing_disconnect = Vec::new();
+ let peer_count = peers.len();
peers.retain(|descriptor, peer| {
- if peer.awaiting_pong {
+ let mut do_disconnect_peer = false;
+ if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
+ // The peer needs to complete its handshake before we can exchange messages. We
+ // give peers one timer tick to complete handshake, reusing
+ // `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
+ // for handshake completion.
+ if peer.awaiting_pong_timer_tick_intervals != 0 {
+ do_disconnect_peer = true;
+ } else {
+ peer.awaiting_pong_timer_tick_intervals = 1;
+ return true;
+ }
+ }
+
+ if peer.awaiting_pong_timer_tick_intervals == -1 {
+ // Magic value set in `maybe_send_extra_ping`.
+ peer.awaiting_pong_timer_tick_intervals = 1;
+ peer.received_message_since_timer_tick = false;
+ return true;
+ }
+
+ if do_disconnect_peer
+ || (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
+ || peer.awaiting_pong_timer_tick_intervals as u64 >
+ MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
+ {
descriptors_needing_disconnect.push(descriptor.clone());
match peer.their_node_id {
Some(node_id) => {
node_id_to_descriptor.remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
}
- None => {
- // This can't actually happen as we should have hit
- // is_ready_for_encryption() previously on this same peer.
- unreachable!();
- },
+ None => {},
}
return false;
}
+ peer.received_message_since_timer_tick = false;
- if !peer.channel_encryptor.is_ready_for_encryption() {
- // The peer needs to complete its handshake before we can exchange messages
+ if peer.awaiting_pong_timer_tick_intervals > 0 {
+ peer.awaiting_pong_timer_tick_intervals += 1;
return true;
}
+ peer.awaiting_pong_timer_tick_intervals = 1;
let ping = msgs::Ping {
ponglen: 0,
byteslen: 64,
};
self.enqueue_message(peer, &ping);
+ self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
- let mut descriptor_clone = descriptor.clone();
- self.do_attempt_write_data(&mut descriptor_clone, peer);
-
- peer.awaiting_pong = true;
true
});
}
}
+fn is_gossip_msg(type_id: u16) -> bool {
+ match type_id {
+ msgs::ChannelAnnouncement::TYPE |
+ msgs::ChannelUpdate::TYPE |
+ msgs::NodeAnnouncement::TYPE => true,
+ _ => false
+ }
+}
+
#[cfg(test)]
mod tests {
use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
// than can fit into a peer's buffer).
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
- // Make each peer to read the messages that the other peer just wrote to them.
- peers[0].process_events();
- peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
- peers[1].process_events();
- peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+ // Make each peer to read the messages that the other peer just wrote to them. Note that
+ // due to the max-messagse-before-ping limits this may take a few iterations to complete.
+ for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
+ peers[0].process_events();
+ let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert!(!b_read_data.is_empty());
+
+ peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
+ peers[1].process_events();
+
+ let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+ assert!(!a_read_data.is_empty());
+ peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+
+ peers[1].process_events();
+ assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
+ }
// Check that each peer has received the expected number of channel updates and channel
// announcements.
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
}
+
+ #[test]
+ fn test_handshake_timeout() {
+ // Tests that we time out a peer still waiting on handshake completion after a full timer
+ // tick.
+ let cfgs = create_peermgr_cfgs(2);
+ cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
+ cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+ let peers = create_network(2, &cfgs);
+
+ let secp_ctx = Secp256k1::new();
+ let a_id = PublicKey::from_secret_key(&secp_ctx, &peers[0].our_node_secret);
+ let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+ let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
+ let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone()).unwrap();
+ peers[0].new_inbound_connection(fd_a.clone()).unwrap();
+
+ // If we get a single timer tick before completion, that's fine
+ assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+ peers[0].timer_tick_occurred();
+ assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+
+ assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
+ peers[0].process_events();
+ assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+ peers[1].process_events();
+
+ // ...but if we get a second timer tick, we should disconnect the peer
+ peers[0].timer_tick_occurred();
+ assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
+
+ assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
+ }
}