X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=inline;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=48bad2d69e16d59c5dd3d3a1436dc94436254f2a;hb=84d3630dffd31c133ff1b7380505597812f6c12f;hp=4fd04c8064ac34541d4a3a230a6b95727d2884c2;hpb=201cf624841349bd01fdda8db5bcf81df74622f0;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 4fd04c80..48bad2d6 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -298,23 +298,27 @@ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10; /// the peer. const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO; -/// If we've sent a ping, and are still awaiting a response, we (or our peer) may need to churn our -/// (or their) way through the socket receive buffer before receiving the ping. +/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through +/// the socket receive buffer before receiving the ping. /// /// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not -/// including any network delays or outbound traffic. +/// including any network delays, outbound traffic, or the same for messages from other peers. /// /// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks -/// to respond to a ping, as long as they send us at least one message during each tick or if we -/// sent a lot of messages, ensuring we aren't actually just disconnected. With a timer tick -/// interval of five seconds, this translates to about 30 seconds. -pub const MAX_BUFFER_DRAIN_TICK_INTERVALS: i8 = 6; +/// per connected peer to respond to a ping, as long as they send us at least one message during +/// each tick, ensuring we aren't actually just disconnected. +/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected +/// peer. +/// +/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per +/// two connected peers, assuming most LDK-running systems have at least two cores. +const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6; /// This is the minimum number of messages we expect a peer to be able to handle within one timer /// tick. Once we have sent this many messages since the last ping, we send a ping right away to /// ensures we don't just fill up our send buffer and leave the peer with too many messages to /// process before the next ping. -pub const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; +const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; struct Peer { channel_encryptor: PeerChannelEncryptor, @@ -1140,7 +1144,9 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 { + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT + { log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1163,7 +1169,9 @@ impl P !peer.should_forward_node_announcement(msg.contents.node_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 { + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT + { log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1185,7 +1193,9 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 { + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT + { log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1517,6 +1527,7 @@ impl P let node_id_to_descriptor = &mut peers.node_id_to_descriptor; let peers = &mut peers.peers; let mut descriptors_needing_disconnect = Vec::new(); + let peer_count = peers.len(); peers.retain(|descriptor, peer| { if !peer.channel_encryptor.is_ready_for_encryption() { @@ -1525,7 +1536,8 @@ impl P } if (peer.awaiting_pong_tick_intervals > 0 && !peer.received_message_since_timer_tick) - || peer.awaiting_pong_tick_intervals > MAX_BUFFER_DRAIN_TICK_INTERVALS + || peer.awaiting_pong_tick_intervals as u64 > + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64 { descriptors_needing_disconnect.push(descriptor.clone()); match peer.their_node_id { @@ -1723,11 +1735,20 @@ mod tests { // Make each peer to read the messages that the other peer just wrote to them. Note that // due to the max-messagse-before-bing limits this may take a few iterations to complete. - for _ in 0..10 { + for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 { peers[0].process_events(); - peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(); + let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0); + assert!(!b_read_data.is_empty()); + + peers[1].read_event(&mut fd_b, &b_read_data).unwrap(); + peers[1].process_events(); + + let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0); + assert!(!a_read_data.is_empty()); + peers[0].read_event(&mut fd_a, &a_read_data).unwrap(); + peers[1].process_events(); - peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(); + assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages"); } // Check that each peer has received the expected number of channel updates and channel