Don't apply gossip backpressure to non-channel-announcing peers
authorMatt Corallo <git@bluematt.me>
Mon, 30 Jan 2023 17:56:46 +0000 (17:56 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 9 Feb 2023 15:40:43 +0000 (15:40 +0000)
When we apply the new gossip-async-check backpressure on peer
connections, if a peer has never sent us a `channel_announcement`
at all, we really shouldn't delay reading their messages.

This does so by tracking, on a per-peer basis, whether they've sent
us a channel_announcement, and resetting that state whenever we're
not backlogged.

lightning/src/ln/peer_handler.rs

index 1bbb30b6b564e467da1ad6a8caab6a64bcff9792..cecf4332e291c87893e761ec1eed18490abb70c5 100644 (file)
@@ -413,6 +413,12 @@ struct Peer {
        awaiting_pong_timer_tick_intervals: i8,
        received_message_since_timer_tick: bool,
        sent_gossip_timestamp_filter: bool,
+
+       /// Indicates we've received a `channel_announcement` since the last time we had
+       /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a
+       /// `channel_announcement` at all - we set this unconditionally but unset it every time we
+       /// check if we're gossip-processing-backlogged).
+       received_channel_announce_since_backlogged: bool,
 }
 
 impl Peer {
@@ -449,8 +455,12 @@ impl Peer {
 
        /// Returns whether we should be reading bytes from this peer, based on whether its outbound
        /// buffer still has space and we don't need to pause reads to get some writes out.
-       fn should_read(&self) -> bool {
-               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+       fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
+               if !gossip_processing_backlogged {
+                       self.received_channel_announce_since_backlogged = false;
+               }
+               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
+                       (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
        }
 
        /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
@@ -799,6 +809,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        awaiting_pong_timer_tick_intervals: 0,
                        received_message_since_timer_tick: false,
                        sent_gossip_timestamp_filter: false,
+
+                       received_channel_announce_since_backlogged: false,
                })).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -846,14 +858,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        awaiting_pong_timer_tick_intervals: 0,
                        received_message_since_timer_tick: false,
                        sent_gossip_timestamp_filter: false,
+
+                       received_channel_announce_since_backlogged: false,
                })).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
                Ok(())
        }
 
-       fn peer_should_read(&self, peer: &Peer) -> bool {
-               !self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read()
+       fn peer_should_read(&self, peer: &mut Peer) -> bool {
+               peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
        }
 
        fn update_gossip_backlogged(&self) {
@@ -922,10 +936,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                self.maybe_send_extra_ping(peer);
                        }
 
+                       let should_read = self.peer_should_read(peer);
                        let next_buff = match peer.pending_outbound_buffer.front() {
                                None => {
                                        if force_one_write && !have_written {
-                                               let should_read = self.peer_should_read(&peer);
                                                if should_read {
                                                        let data_sent = descriptor.send_data(&[], should_read);
                                                        debug_assert_eq!(data_sent, 0, "Can't write more than no data");
@@ -937,7 +951,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        };
 
                        let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-                       let data_sent = descriptor.send_data(pending, self.peer_should_read(&peer));
+                       let data_sent = descriptor.send_data(pending, should_read);
                        have_written = true;
                        peer.pending_outbound_buffer_first_msg_offset += data_sent;
                        if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
@@ -1220,7 +1234,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        }
                                                }
                                        }
-                                       pause_read = !self.peer_should_read(&peer);
+                                       pause_read = !self.peer_should_read(peer);
 
                                        if let Some(message) = msg_to_handle {
                                                match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1306,6 +1320,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        return Ok(None);
                }
 
+               if let wire::Message::ChannelAnnouncement(ref _msg) = message {
+                       peer_lock.received_channel_announce_since_backlogged = true;
+               }
+
                mem::drop(peer_lock);
 
                if is_gossip_msg(message.type_id()) {
@@ -1831,7 +1849,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        }
 
                        for (descriptor, peer_mutex) in peers.iter() {
-                               self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap(), flush_read_disabled);
+                               let mut peer = peer_mutex.lock().unwrap();
+                               if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
+                               self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
                        }
                }
                if !peers_to_disconnect.is_empty() {
@@ -1966,6 +1986,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                        for (descriptor, peer_mutex) in peers_lock.iter() {
                                let mut peer = peer_mutex.lock().unwrap();
+                               if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
+
                                if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
                                        // The peer needs to complete its handshake before we can exchange messages. We
                                        // give peers one timer tick to complete handshake, reusing