InitSyncTracker::NodesSyncing(pk) => pk < node_id,
}
}
+
+ /// Returns the number of gossip messages we can fit in this peer's buffer.
+ fn gossip_buffer_slots_available(&self) -> usize {
+ OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len())
+ }
+
+ /// Returns whether we should be reading bytes from this peer, i.e. whether its outbound
+ /// buffer still has room and we don't need to pause reads to let pending writes drain.
+ fn should_read(&self) -> bool {
+ self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+ }
+
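+ /// Returns whether we should backfill gossip messages into this peer's outbound buffer:
+ /// only while the buffer has room and we have not exceeded the per-tick message budget
+ /// since the peer's last pong.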
+ fn should_backfill_gossip(&self) -> bool {
+ self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
+ self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+ }
+
+ /// Returns whether this peer's buffer is full and we should drop gossip messages.
+ fn buffer_full_drop_gossip(&self) -> bool {
+ self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
+ self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+ }
}
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
- if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+ if peer.should_backfill_gossip() {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
- let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
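+ // Each entry may carry a channel_announcement plus up to two channel_updates,
+ // so budget three buffer slots per step.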
+ let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8;
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
self.enqueue_message(peer, announce);
}
},
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
- let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
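+ // Channel sync is complete; backfill node announcements, one per free buffer slot.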
+ let steps = peer.gossip_buffer_slots_available() as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
for msg in all_messages.iter() {
self.enqueue_message(peer, msg);
},
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
InitSyncTracker::NodesSyncing(key) => {
- let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
+ let steps = peer.gossip_buffer_slots_available() as u8;
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
for msg in all_messages.iter() {
self.enqueue_message(peer, msg);
Some(buff) => buff,
};
- let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
- let data_sent = descriptor.send_data(pending, should_be_reading);
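+ // The second argument tells the descriptor whether we should continue reading from this peer.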
+ let data_sent = descriptor.send_data(pending, peer.should_read());
peer.pending_outbound_buffer_first_msg_offset += data_sent;
- if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
+ peer.pending_outbound_buffer_first_msg_offset == next_buff.len()
} {
peer.pending_outbound_buffer_first_msg_offset = 0;
peer.pending_outbound_buffer.pop_front();
}
}
}
- pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
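+ // Pause reads once the peer's outbound buffer is at or over the read-pause limit.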
+ pause_read = !peer.should_read();
if let Some(message) = msg_to_handle {
match self.handle_message(&peer_mutex, peer_lock, message) {
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
- || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
- {
+ if peer.buffer_full_drop_gossip() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
- || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
- {
+ if peer.buffer_full_drop_gossip() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
- || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
- {
+ if peer.buffer_full_drop_gossip() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}