/// the peer.
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
-/// If we've sent a ping, and are still awaiting a response, we (or our peer) may need to churn our
-/// (or their) way through the socket receive buffer before receiving the ping.
+/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
+/// the socket receive buffer before receiving the ping.
///
/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
-/// including any network delays or outbound traffic.
+/// including any network delays, outbound traffic, or the same for messages from other peers.
///
/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
-/// to respond to a ping, as long as they send us at least one message during each tick or if we
-/// sent a lot of messages, ensuring we aren't actually just disconnected. With a timer tick
-/// interval of five seconds, this translates to about 30 seconds.
-pub const MAX_BUFFER_DRAIN_TICK_INTERVALS: i8 = 6;
+/// per connected peer to respond to a ping, as long as they send us at least one message during
+/// each tick, ensuring we aren't actually just disconnected.
+/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
+/// peer.
+///
+/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
+/// two connected peers, assuming most LDK-running systems have at least two cores.
+const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
/// This is the minimum number of messages we expect a peer to be able to handle within one timer
/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
/// ensures we don't just fill up our send buffer and leave the peer with too many messages to
/// process before the next ping.
-pub const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
+const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
struct Peer {
channel_encryptor: PeerChannelEncryptor,
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 {
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT
+ {
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 {
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT
+ {
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 {
+ if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+ || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT
+ {
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
let peers = &mut peers.peers;
let mut descriptors_needing_disconnect = Vec::new();
+ let peer_count = peers.len();
peers.retain(|descriptor, peer| {
if !peer.channel_encryptor.is_ready_for_encryption() {
}
if (peer.awaiting_pong_tick_intervals > 0 && !peer.received_message_since_timer_tick)
- || peer.awaiting_pong_tick_intervals > MAX_BUFFER_DRAIN_TICK_INTERVALS
+ || peer.awaiting_pong_tick_intervals as u64 >
+ MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
{
descriptors_needing_disconnect.push(descriptor.clone());
match peer.their_node_id {
// Make each peer to read the messages that the other peer just wrote to them. Note that
// due to the max-messagse-before-bing limits this may take a few iterations to complete.
- for _ in 0..10 {
+ for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
peers[0].process_events();
- peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
+ let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+ assert!(!b_read_data.is_empty());
+
+ peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
+ peers[1].process_events();
+
+ let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+ assert!(!a_read_data.is_empty());
+ peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+
peers[1].process_events();
- peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+ assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
}
// Check that each peer has received the expected number of channel updates and channel