pending_outbound_buffer: LinkedList<Vec<u8>>,
pending_outbound_buffer_first_msg_offset: usize,
+ // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
+ // channel messages over them.
+ gossip_broadcast_buffer: LinkedList<Vec<u8>>,
awaiting_write_event: bool,
pending_read_buffer: Vec<u8>,
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
}
- /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
- /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
- /// been drained.
+ /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
+ /// outbound buffer. This is checked every time the peer's buffer may have been drained.
fn should_buffer_gossip_backfill(&self) -> bool {
- self.pending_outbound_buffer.is_empty() &&
- self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+ self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
+ && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
}
- /// Returns whether this peer's buffer is full and we should drop gossip messages.
+ /// Determines if we should push additional gossip broadcast messages onto a peer's outbound
+ /// buffer. This is checked every time the peer's buffer may have been drained.
+ fn should_buffer_gossip_broadcast(&self) -> bool {
+ self.pending_outbound_buffer.is_empty()
+ && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+ }
+
+ /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
- self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
+ let total_outbound_buffered =
+ self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
+
+ total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
}
}
pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
+ gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,
pending_read_buffer,
pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
+ gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,
pending_read_buffer,
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
+ if peer.should_buffer_gossip_broadcast() {
+ if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
+ peer.pending_outbound_buffer.push_back(msg);
+ }
+ }
if peer.should_buffer_gossip_backfill() {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
}
}
- /// Append a message to a peer's pending outbound/write buffer
- fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
- peer.msgs_sent_since_pong += 1;
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
- }
-
/// Append a message to a peer's pending outbound/write buffer
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
let mut buffer = VecWriter(Vec::with_capacity(2048));
} else {
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
}
- self.enqueue_encoded_message(peer, &buffer.0);
+ peer.msgs_sent_since_pong += 1;
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
+ }
+
+ /// Append a message to a peer's pending outbound/write gossip broadcast buffer
+ fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+ peer.msgs_sent_since_pong += 1;
+ peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
}
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
wire::Message::NodeAnnouncement(ref msg) => {
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
wire::Message::ChannelUpdate(ref msg) => {
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),