Use util methods in `Peer` to decide when to forward
authorValentine Wallace <vwallace@protonmail.com>
Tue, 9 Aug 2022 21:20:56 +0000 (21:20 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 10 Aug 2022 19:29:39 +0000 (19:29 +0000)
This consolidates our various checks on peer buffer space into the
`Peer` impl itself, making the thresholds at which we stop taking
various actions on a peer more readable as a whole.

This commit was primarily authored by `Valentine Wallace
<vwallace@protonmail.com>` with some amendments by `Matt Corallo
<git@bluematt.me>`.

lightning/src/ln/peer_handler.rs

index d34fdfeb52a1ddcb887bfd42c4cfcddac8e7d0e9..6c9a608b624861d88fbb3b4e3aab165e40ddc182 100644 (file)
@@ -378,6 +378,31 @@ impl Peer {
                        InitSyncTracker::NodesSyncing(pk) => pk < node_id,
                }
        }
+
+       /// Returns the number of gossip messages we can fit in this peer's buffer.
+       fn gossip_buffer_slots_available(&self) -> usize {
+               OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len())
+       }
+
+       /// Returns whether we should be reading bytes from this peer, based on whether its outbound
+       /// buffer still has space and we don't need to pause reads to get some writes out.
+       fn should_read(&self) -> bool {
+               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+       }
+
+       fn should_backfill_gossip(&self) -> bool {
+               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
+                       self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+       }
+
+       /// Returns whether this peer's buffer is full and we should drop gossip messages.
+       fn buffer_full_drop_gossip(&self) -> bool {
+               if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+                       || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+                               return false
+               }
+               true
+       }
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -710,11 +735,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                while !peer.awaiting_write_event {
-                       if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+                       if peer.should_backfill_gossip() {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
                                        InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-                                               let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
+                                               let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
                                                for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
                                                        self.enqueue_message(peer, announce);
@@ -731,7 +756,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
+                                               let steps = peer.gossip_buffer_slots_available() as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
                                                for msg in all_messages.iter() {
                                                        self.enqueue_message(peer, msg);
@@ -743,7 +768,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        },
                                        InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
                                        InitSyncTracker::NodesSyncing(key) => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
+                                               let steps = peer.gossip_buffer_slots_available() as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
                                                for msg in all_messages.iter() {
                                                        self.enqueue_message(peer, msg);
@@ -765,11 +790,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        Some(buff) => buff,
                                };
 
-                               let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
                                let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-                               let data_sent = descriptor.send_data(pending, should_be_reading);
+                               let data_sent = descriptor.send_data(pending, peer.should_read());
                                peer.pending_outbound_buffer_first_msg_offset += data_sent;
-                               if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
+                               peer.pending_outbound_buffer_first_msg_offset == next_buff.len()
                        } {
                                peer.pending_outbound_buffer_first_msg_offset = 0;
                                peer.pending_outbound_buffer.pop_front();
@@ -1045,7 +1069,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        }
                                                }
                                        }
-                                       pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
+                                       pause_read = !peer.should_read();
 
                                        if let Some(message) = msg_to_handle {
                                                match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1308,9 +1332,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1334,9 +1356,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1359,9 +1379,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }