From: Matt Corallo
Date: Sun, 22 Jan 2023 18:08:33 +0000 (+0000)
Subject: Apply backpressure when we have too many gossip checks in-flight
X-Git-Tag: v0.0.114-beta~22^2~5
X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=00a70c25f9111f8f733f2ca4a0a61ba67d2d56a8;p=rust-lightning

Apply backpressure when we have too many gossip checks in-flight

Now that the `RoutingMessageHandler` can signal that it needs to apply
message backpressure, we implement it here in the `PeerManager`. There's
not much complicated here, aside from noting that we need to add the
ability to call `send_data` with no data to indicate that reading should
resume (and track when we may need to make such calls when updating the
routing-backpressure state).
---

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 11f120633..1bbb30b6b 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -563,6 +563,9 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM:
+	gossip_processing_backlogged: AtomicBool,
+	gossip_processing_backlog_lifted: AtomicBool,
+
@@ -870,7 +876,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
-	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
+	fn peer_should_read(&self, peer: &Peer) -> bool {
+		!self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read()
+	}
+
+	fn update_gossip_backlogged(&self) {
+		let new_state = self.message_handler.route_handler.processing_queue_high();
+		let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
+		if prev_state && !new_state {
+			self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
+		}
+	}
+
+	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) {
+		let mut have_written = false;
 		while !peer.awaiting_write_event {
 			if peer.should_buffer_onion_message() {
 				if let Some(peer_node_id) = peer.their_node_id {
@@ -905,12 +923,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
 			let next_buff = match peer.pending_outbound_buffer.front() {
-				None => return,
+				None => {
+					if force_one_write && !have_written {
+						let should_read = self.peer_should_read(&peer);
+						if should_read {
+							let data_sent = descriptor.send_data(&[], should_read);
+							debug_assert_eq!(data_sent, 0, "Can't write more than no data");
+						}
+					}
+					return
+				},
 				Some(buff) => buff,
 			};
 
 			let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-			let data_sent = descriptor.send_data(pending, peer.should_read());
+			let data_sent = descriptor.send_data(pending, self.peer_should_read(&peer));
+			have_written = true;
 			peer.pending_outbound_buffer_first_msg_offset += data_sent;
 			if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
 				peer.pending_outbound_buffer_first_msg_offset = 0;
@@ -945,7 +973,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
 			Some(peer_mutex) => {
 				let mut peer = peer_mutex.lock().unwrap();
 				peer.awaiting_write_event = false;
-				self.do_attempt_write_data(descriptor, &mut peer);
+				self.do_attempt_write_data(descriptor, &mut peer, false);
 			}
 		};
 		Ok(())
@@ -1192,7 +1220,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
-				pause_read = !peer.should_read();
+				pause_read = !self.peer_should_read(&peer);
@@ -1404,12 +1432,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
 					.map_err(|e| -> MessageHandlingError { e.into() })? {
 					should_forward = Some(wire::Message::ChannelAnnouncement(msg));
 				}
+				self.update_gossip_backlogged();
 			},
 			wire::Message::NodeAnnouncement(msg) => {
 				if self.message_handler.route_handler.handle_node_announcement(&msg)
 					.map_err(|e| -> MessageHandlingError { e.into() })? {
 					should_forward = Some(wire::Message::NodeAnnouncement(msg));
 				}
+				self.update_gossip_backlogged();
 			},
 			wire::Message::ChannelUpdate(msg) => {
 				self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg);
@@ -1417,6 +1447,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
 					.map_err(|e| -> MessageHandlingError { e.into() })? {
 					should_forward = Some(wire::Message::ChannelUpdate(msg));
 				}
+				self.update_gossip_backlogged();
 			},
 			wire::Message::QueryShortChannelIds(msg) => {
 				self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?;
@@ -1568,6 +1599,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
 		{
 			let peers_lock = self.peers.read().unwrap();
 
+			self.update_gossip_backlogged();
+			let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
+
 			for (descriptor, peer_mutex) in peers_lock.iter() {
 				let mut peer = peer_mutex.lock().unwrap();
 				if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
@@ -1584,26 +1618,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, C
 					continue;
 				}
 
-				if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
-					|| peer.awaiting_pong_timer_tick_intervals as u64 >
-						MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
-				{
-					descriptors_needing_disconnect.push(descriptor.clone());
-					continue;
-				}
-				peer.received_message_since_timer_tick = false;
+				loop {
+					if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
+						|| peer.awaiting_pong_timer_tick_intervals as u64 >
+							MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64
+					{
+						descriptors_needing_disconnect.push(descriptor.clone());
+						break;
+					}
+					peer.received_message_since_timer_tick = false;
 
-				if peer.awaiting_pong_timer_tick_intervals > 0 {
-					peer.awaiting_pong_timer_tick_intervals += 1;
-					continue;
-				}
+					if peer.awaiting_pong_timer_tick_intervals > 0 {
+						peer.awaiting_pong_timer_tick_intervals += 1;
+						break;
+					}
 
-				peer.awaiting_pong_timer_tick_intervals = 1;
-				let ping = msgs::Ping {
-					ponglen: 0,
-					byteslen: 64,
-				};
-				self.enqueue_message(&mut *peer, &ping);
-				self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
+					peer.awaiting_pong_timer_tick_intervals = 1;
+					let ping = msgs::Ping {
+						ponglen: 0,
+						byteslen: 64,
+					};
+					self.enqueue_message(&mut *peer, &ping);
+					break;
+				}
+				self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled);
 			}
 		}
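
---

For reference, the resume-read half of this change is a contract on
`SocketDescriptor::send_data`: the `PeerManager` may now invoke it with an
empty slice purely so that the `resume_read` flag can tell the I/O driver to
start delivering `read_event` calls again once the gossip backlog clears.
The sketch below is not part of this commit; `ToySocket`/`ToyDescriptor` and
their fields are hypothetical, and only the `SocketDescriptor` trait and its
method signatures come from rust-lightning:

	use std::collections::VecDeque;
	use std::hash::{Hash, Hasher};
	use std::sync::{Arc, Mutex};

	use lightning::ln::peer_handler::SocketDescriptor;

	// In-memory stand-in for a TCP connection (hypothetical).
	struct ToySocket {
		outbound: VecDeque<u8>, // bytes queued for the wire
		read_paused: bool,      // when true, the driver stops calling read_event()
	}

	#[derive(Clone)]
	struct ToyDescriptor(Arc<Mutex<ToySocket>>, u64 /* unique connection id */);

	impl PartialEq for ToyDescriptor {
		fn eq(&self, other: &Self) -> bool { self.1 == other.1 }
	}
	impl Eq for ToyDescriptor {}
	impl Hash for ToyDescriptor {
		fn hash<H: Hasher>(&self, h: &mut H) { self.1.hash(h) }
	}

	impl SocketDescriptor for ToyDescriptor {
		fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
			let mut sock = self.0.lock().unwrap();
			if resume_read {
				// This call may now arrive with `data` empty, purely to un-pause
				// reading (e.g. after the gossip-processing backlog lifted on a
				// timer tick, via do_attempt_write_data(.., flush_read_disabled)).
				sock.read_paused = false;
			}
			// Accept everything; a real socket would return a partial count when
			// its write buffer fills. An empty `data` slice thus reports 0 bytes,
			// matching the debug_assert_eq! in the diff above.
			sock.outbound.extend(data.iter().copied());
			data.len()
		}
		fn disconnect_socket(&mut self) {
			// A real implementation would close the underlying transport here.
		}
	}

Reusing `send_data` this way keeps the `SocketDescriptor` trait itself
unchanged, so existing implementations continue to compile; they just need to
honor `resume_read` even when `data` is empty.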