X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;fp=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=4fd04c8064ac34541d4a3a230a6b95727d2884c2;hb=201cf624841349bd01fdda8db5bcf81df74622f0;hp=758e16b5a356cdeb02947d30ae7bd73996a7f0b9;hpb=af58b09ab80e45b225274c9004a0c98a95b6aad7;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 758e16b5..4fd04c80 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -298,6 +298,24 @@ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10; /// the peer. const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO; +/// If we've sent a ping, and are still awaiting a response, we (or our peer) may need to churn our +/// (or their) way through the socket receive buffer before receiving the ping. +/// +/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not +/// including any network delays or outbound traffic. +/// +/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks +/// to respond to a ping, as long as they send us at least one message during each tick or if we +/// sent a lot of messages, ensuring we aren't actually just disconnected. With a timer tick +/// interval of five seconds, this translates to about 30 seconds. +pub const MAX_BUFFER_DRAIN_TICK_INTERVALS: i8 = 6; + +/// This is the minimum number of messages we expect a peer to be able to handle within one timer +/// tick. Once we have sent this many messages since the last ping, we send a ping right away to +/// ensures we don't just fill up our send buffer and leave the peer with too many messages to +/// process before the next ping. +pub const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; + struct Peer { channel_encryptor: PeerChannelEncryptor, their_node_id: Option, @@ -313,7 +331,9 @@ struct Peer { sync_status: InitSyncTracker, - awaiting_pong: bool, + msgs_sent_since_pong: usize, + awaiting_pong_tick_intervals: i8, + received_message_since_timer_tick: bool, } impl Peer { @@ -555,7 +575,9 @@ impl P sync_status: InitSyncTracker::NoSyncRequested, - awaiting_pong: false, + msgs_sent_since_pong: 0, + awaiting_pong_tick_intervals: 0, + received_message_since_timer_tick: false, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -593,7 +615,9 @@ impl P sync_status: InitSyncTracker::NoSyncRequested, - awaiting_pong: false, + msgs_sent_since_pong: 0, + awaiting_pong_tick_intervals: 0, + received_message_since_timer_tick: false, }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -602,7 +626,7 @@ impl P fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { while !peer.awaiting_write_event { - if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE { + if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK { match peer.sync_status { InitSyncTracker::NoSyncRequested => {}, InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => { @@ -647,6 +671,9 @@ impl P }, } } + if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK { + self.maybe_send_extra_ping(peer); + } if { let next_buff = match peer.pending_outbound_buffer.front() { @@ -724,6 +751,7 @@ impl P /// Append a message to a peer's pending outbound/write buffer fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec) { + peer.msgs_sent_since_pong += 1; peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); } @@ -926,6 +954,7 @@ impl P message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap())); + peer.received_message_since_timer_tick = true; // Need an Init as first message if let wire::Message::Init(_) = message { @@ -989,7 +1018,8 @@ impl P } }, wire::Message::Pong(_msg) => { - peer.awaiting_pong = false; + peer.awaiting_pong_tick_intervals = 0; + peer.msgs_sent_since_pong = 0; }, // Channel messages: @@ -1110,7 +1140,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP { + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 { log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1133,7 +1163,7 @@ impl P !peer.should_forward_node_announcement(msg.contents.node_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP { + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 { log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1155,7 +1185,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP { + if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * 2 { log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1455,6 +1485,20 @@ impl P debug_assert!(peers.node_id_to_descriptor.is_empty()); } + /// This is called when we're blocked on sending additional gossip messages until we receive a + /// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting + /// `awaiting_pong_tick_intervals` to a special flag value to indicate this). + fn maybe_send_extra_ping(&self, peer: &mut Peer) { + if peer.awaiting_pong_tick_intervals == 0 { + peer.awaiting_pong_tick_intervals = -1; + let ping = msgs::Ping { + ponglen: 0, + byteslen: 64, + }; + self.enqueue_message(peer, &ping); + } + } + /// Send pings to each peer and disconnect those which did not respond to the last round of /// pings. /// @@ -1475,7 +1519,14 @@ impl P let mut descriptors_needing_disconnect = Vec::new(); peers.retain(|descriptor, peer| { - if peer.awaiting_pong { + if !peer.channel_encryptor.is_ready_for_encryption() { + // The peer needs to complete its handshake before we can exchange messages + return true; + } + + if (peer.awaiting_pong_tick_intervals > 0 && !peer.received_message_since_timer_tick) + || peer.awaiting_pong_tick_intervals > MAX_BUFFER_DRAIN_TICK_INTERVALS + { descriptors_needing_disconnect.push(descriptor.clone()); match peer.their_node_id { Some(node_id) => { @@ -1492,21 +1543,26 @@ impl P return false; } - if !peer.channel_encryptor.is_ready_for_encryption() { - // The peer needs to complete its handshake before we can exchange messages + peer.received_message_since_timer_tick = false; + if peer.awaiting_pong_tick_intervals == -1 { + // Magic value set in `maybe_send_extra_ping`. + peer.awaiting_pong_tick_intervals = 1; + return true; + } + + if peer.awaiting_pong_tick_intervals > 0 { + peer.awaiting_pong_tick_intervals += 1; return true; } + peer.awaiting_pong_tick_intervals = 1; let ping = msgs::Ping { ponglen: 0, byteslen: 64, }; self.enqueue_message(peer, &ping); + self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer); - let mut descriptor_clone = descriptor.clone(); - self.do_attempt_write_data(&mut descriptor_clone, peer); - - peer.awaiting_pong = true; true }); @@ -1665,11 +1721,14 @@ mod tests { // than can fit into a peer's buffer). let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); - // Make each peer to read the messages that the other peer just wrote to them. - peers[0].process_events(); - peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(); - peers[1].process_events(); - peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(); + // Make each peer to read the messages that the other peer just wrote to them. Note that + // due to the max-messagse-before-bing limits this may take a few iterations to complete. + for _ in 0..10 { + peers[0].process_events(); + peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(); + peers[1].process_events(); + peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(); + } // Check that each peer has received the expected number of channel updates and channel // announcements.