use prelude::*;
use io;
use alloc::collections::LinkedList;
-use alloc::fmt::Debug;
use sync::{Arc, Mutex};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
+use core::convert::Infallible;
#[cfg(feature = "std")] use std::error;
use bitcoin::hashes::sha256::Hash as Sha256;
fn deref(&self) -> &Self { self }
}
-impl wire::Type for () {
+// Implement Type for Infallible, note that it cannot be constructed, and thus you can never call a
+// method that takes self for it.
+impl wire::Type for Infallible {
fn type_id(&self) -> u16 {
- // We should never call this for `DummyCustomType`
unreachable!();
}
}
-
-impl Writeable for () {
+impl Writeable for Infallible {
fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
unreachable!();
}
}
impl wire::CustomMessageReader for IgnoringMessageHandler {
- type CustomMessage = ();
+ type CustomMessage = Infallible;
fn read<R: io::Read>(&self, _message_type: u16, _buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
Ok(None)
}
}
impl CustomMessageHandler for IgnoringMessageHandler {
- fn handle_custom_message(&self, _msg: Self::CustomMessage, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
+ fn handle_custom_message(&self, _msg: Infallible, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
// Since we always return `None` in the read the handle method should never be called.
unreachable!();
}
NodesSyncing(PublicKey),
}
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
+
+/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
+/// the socket receive buffer before receiving the ping.
+///
+/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
+/// including any network delays, outbound traffic, or the same for messages from other peers.
+///
+/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
+/// per connected peer to respond to a ping, as long as they send us at least one message during
+/// each tick, ensuring we aren't actually just disconnected.
+/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
+/// peer.
+///
+/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
+/// two connected peers, assuming most LDK-running systems have at least two cores.
+const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
+
+/// This is the minimum number of messages we expect a peer to be able to handle within one timer
+/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
+/// ensures we don't just fill up our send buffer and leave the peer with too many messages to
+/// process before the next ping.
+const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
struct Peer {
channel_encryptor: PeerChannelEncryptor,
sync_status: InitSyncTracker,
- awaiting_pong: bool,
+ msgs_sent_since_pong: usize,
+ awaiting_pong_tick_intervals: i8,
+ received_message_since_timer_tick: bool,
}
impl Peer {
CM::Target: ChannelMessageHandler,
RM::Target: RoutingMessageHandler,
L::Target: Logger,
- CMH::Target: CustomMessageHandler + wire::CustomMessageReader {
+ CMH::Target: CustomMessageHandler {
/// Constructs a new PeerManager with the given message handlers and node_id secret key
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
sync_status: InitSyncTracker::NoSyncRequested,
- awaiting_pong: false,
+ msgs_sent_since_pong: 0,
+ awaiting_pong_tick_intervals: 0,
+ received_message_since_timer_tick: false,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
sync_status: InitSyncTracker::NoSyncRequested,
- awaiting_pong: false,
+ msgs_sent_since_pong: 0,
+ awaiting_pong_tick_intervals: 0,
+ received_message_since_timer_tick: false,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
- if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
+ if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
},
}
}
+ if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
+ self.maybe_send_extra_ping(peer);
+ }
if {
let next_buff = match peer.pending_outbound_buffer.front() {
}
}
- /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
- fn enqueue_message<M: wire::Type + Writeable + Debug>(&self, peer: &mut Peer, message: &M) {
+ /// Append a message to a peer's pending outbound/write buffer
+ fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+ peer.msgs_sent_since_pong += 1;
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+ }
+
+ /// Append a message to a peer's pending outbound/write buffer
+ fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
let mut buffer = VecWriter(Vec::with_capacity(2048));
wire::write(message, &mut buffer).unwrap(); // crash if the write failed
- let encoded_message = buffer.0;
-
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+ self.enqueue_encoded_message(peer, &buffer.0);
}
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+ peer.received_message_since_timer_tick = true;
// Need an Init as first message
if let wire::Message::Init(_) = message {
}
},
wire::Message::Pong(_msg) => {
- peer.awaiting_pong = false;
+ peer.awaiting_pong_tick_intervals = 0;
+ peer.msgs_sent_since_pong = 0;
},
// Channel messages:
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT
+ {
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
wire::Message::NodeAnnouncement(ref msg) => {
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT
+ {
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
wire::Message::ChannelUpdate(ref msg) => {
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT
+ {
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
}
}
+ /// Disconnects all currently-connected peers. This is useful on platforms where there may be
+ /// an indication that TCP sockets have stalled even if we weren't around to time them out
+ /// using regular ping/pongs.
+ pub fn disconnect_all_peers(&self) {
+ let mut peers_lock = self.peers.lock().unwrap();
+ let peers = &mut *peers_lock;
+ for (mut descriptor, peer) in peers.peers.drain() {
+ if let Some(node_id) = peer.their_node_id {
+ log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
+ peers.node_id_to_descriptor.remove(&node_id);
+ self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ }
+ descriptor.disconnect_socket();
+ }
+ debug_assert!(peers.node_id_to_descriptor.is_empty());
+ }
+
+ /// This is called when we're blocked on sending additional gossip messages until we receive a
+ /// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
+ /// `awaiting_pong_tick_intervals` to a special flag value to indicate this).
+ fn maybe_send_extra_ping(&self, peer: &mut Peer) {
+ if peer.awaiting_pong_tick_intervals == 0 {
+ peer.awaiting_pong_tick_intervals = -1;
+ let ping = msgs::Ping {
+ ponglen: 0,
+ byteslen: 64,
+ };
+ self.enqueue_message(peer, &ping);
+ }
+ }
+
/// Send pings to each peer and disconnect those which did not respond to the last round of
/// pings.
///
let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
let peers = &mut peers.peers;
let mut descriptors_needing_disconnect = Vec::new();
+ let peer_count = peers.len();
peers.retain(|descriptor, peer| {
- if peer.awaiting_pong {
+ if !peer.channel_encryptor.is_ready_for_encryption() {
+ // The peer needs to complete its handshake before we can exchange messages
+ return true;
+ }
+
+ if (peer.awaiting_pong_tick_intervals > 0 && !peer.received_message_since_timer_tick)
+ || peer.awaiting_pong_tick_intervals as u64 >
+ MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
+ {
descriptors_needing_disconnect.push(descriptor.clone());
match peer.their_node_id {
Some(node_id) => {
return false;
}
- if !peer.channel_encryptor.is_ready_for_encryption() {
- // The peer needs to complete its handshake before we can exchange messages
+ peer.received_message_since_timer_tick = false;
+ if peer.awaiting_pong_tick_intervals == -1 {
+ // Magic value set in `maybe_send_extra_ping`.
+ peer.awaiting_pong_tick_intervals = 1;
return true;
}
+ if peer.awaiting_pong_tick_intervals > 0 {
+ peer.awaiting_pong_tick_intervals += 1;
+ return true;
+ }
+
+ peer.awaiting_pong_tick_intervals = 1;
let ping = msgs::Ping {
ponglen: 0,
byteslen: 64,
};
self.enqueue_message(peer, &ping);
+ self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
- let mut descriptor_clone = descriptor.clone();
- self.do_attempt_write_data(&mut descriptor_clone, peer);
-
- peer.awaiting_pong = true;
true
});
// than can fit into a peer's buffer).
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
- // Make each peer to read the messages that the other peer just wrote to them.
- peers[0].process_events();
- peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
- peers[1].process_events();
- peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+ // Make each peer to read the messages that the other peer just wrote to them. Note that
+ // due to the max-messagse-before-bing limits this may take a few iterations to complete.
+ for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
+ peers[0].process_events();
+ let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert!(!b_read_data.is_empty());
+
+ peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
+ peers[1].process_events();
+
+ let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+ assert!(!a_read_data.is_empty());
+ peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+
+ peers[1].process_events();
+ assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
+ }
// Check that each peer has received the expected number of channel updates and channel
// announcements.