From 4a1ee5f9a984c9b0c0892025d624ade734337b1a Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Tue, 9 Aug 2022 21:20:56 +0000 Subject: [PATCH] Use util methods in `Peer` to decide when to forward This consolidates our various checks on peer buffer space into the `Peer` impl itself, making the thresholds at which we stop taking various actions on a peer more readable as a whole. This commit was primarily authored by `Valentine Wallace ` with some amendments by `Matt Corallo `. --- lightning/src/ln/peer_handler.rs | 52 +++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index d34fdfeb..6c9a608b 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -378,6 +378,31 @@ impl Peer { InitSyncTracker::NodesSyncing(pk) => pk < node_id, } } + + /// Returns the number of gossip messages we can fit in this peer's buffer. + fn gossip_buffer_slots_available(&self) -> usize { + OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len()) + } + + /// Returns whether we should be reading bytes from this peer, based on whether its outbound + /// buffer still has space and we don't need to pause reads to get some writes out. + fn should_read(&self) -> bool { + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + } + + fn should_backfill_gossip(&self) -> bool { + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && + self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + } + + /// Returns whether this peer's buffer is full and we should drop gossip messages. + fn buffer_full_drop_gossip(&self) -> bool { + if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO { + return false + } + true + } } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -710,11 +735,11 @@ impl P fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { while !peer.awaiting_write_event { - if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK { + if peer.should_backfill_gossip() { match peer.sync_status { InitSyncTracker::NoSyncRequested => {}, InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => { - let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8; + let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8; let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps); for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() { self.enqueue_message(peer, announce); @@ -731,7 +756,7 @@ impl P } }, InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => { - let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8; + let steps = peer.gossip_buffer_slots_available() as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps); for msg in all_messages.iter() { self.enqueue_message(peer, msg); @@ -743,7 +768,7 @@ impl P }, InitSyncTracker::ChannelsSyncing(_) => unreachable!(), InitSyncTracker::NodesSyncing(key) => { - let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8; + let steps = peer.gossip_buffer_slots_available() as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps); for msg in all_messages.iter() { self.enqueue_message(peer, msg); @@ -765,11 +790,10 @@ impl P Some(buff) => buff, }; - let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE; let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; - let data_sent = descriptor.send_data(pending, should_be_reading); + let data_sent = descriptor.send_data(pending, peer.should_read()); peer.pending_outbound_buffer_first_msg_offset += data_sent; - if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false } + peer.pending_outbound_buffer_first_msg_offset == next_buff.len() } { peer.pending_outbound_buffer_first_msg_offset = 0; peer.pending_outbound_buffer.pop_front(); @@ -1045,7 +1069,7 @@ impl P } } } - pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE; + pause_read = !peer.should_read(); if let Some(message) = msg_to_handle { match self.handle_message(&peer_mutex, peer_lock, message) { @@ -1308,9 +1332,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO - { + if peer.buffer_full_drop_gossip() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1334,9 +1356,7 @@ impl P !peer.should_forward_node_announcement(msg.contents.node_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO - { + if peer.buffer_full_drop_gossip() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1359,9 +1379,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO - { + if peer.buffer_full_drop_gossip() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } -- 2.30.2