Merge pull request #1683 from valentinewallace/2022-08-multiqueue-peerman
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 623aa969dfd4262da60773428c6d03aff681a4ab..573e910ab083849b74587517540ff2210be04983 100644 (file)
@@ -337,6 +337,9 @@ struct Peer {
 
        pending_outbound_buffer: LinkedList<Vec<u8>>,
        pending_outbound_buffer_first_msg_offset: usize,
+       // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
+       // channel messages over them.
+       gossip_broadcast_buffer: LinkedList<Vec<u8>>,
        awaiting_write_event: bool,
 
        pending_read_buffer: Vec<u8>,
@@ -389,21 +392,27 @@ impl Peer {
                self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
        }
 
-       /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
-       /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
-       /// been drained.
+       /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
+       /// outbound buffer. This is checked every time the peer's buffer may have been drained.
        fn should_buffer_gossip_backfill(&self) -> bool {
-               self.pending_outbound_buffer.is_empty() &&
-                       self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+               self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
+                       && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
        }
 
-       /// Returns whether this peer's buffer is full and we should drop gossip messages.
-       fn buffer_full_drop_gossip(&self) -> bool {
-               if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                       || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
-                               return false
-               }
-               true
+       /// Determines if we should push additional gossip broadcast messages onto a peer's outbound
+       /// buffer. This is checked every time the peer's buffer may have been drained.
+       fn should_buffer_gossip_broadcast(&self) -> bool {
+               self.pending_outbound_buffer.is_empty()
+                       && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+       }
+
+       /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
+       fn buffer_full_drop_gossip_broadcast(&self) -> bool {
+               let total_outbound_buffered =
+                       self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
+
+               total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
+                       self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
        }
 }
 
@@ -671,6 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                        pending_outbound_buffer: LinkedList::new(),
                        pending_outbound_buffer_first_msg_offset: 0,
+                       gossip_broadcast_buffer: LinkedList::new(),
                        awaiting_write_event: false,
 
                        pending_read_buffer,
@@ -717,6 +727,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                        pending_outbound_buffer: LinkedList::new(),
                        pending_outbound_buffer_first_msg_offset: 0,
+                       gossip_broadcast_buffer: LinkedList::new(),
                        awaiting_write_event: false,
 
                        pending_read_buffer,
@@ -737,6 +748,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                while !peer.awaiting_write_event {
+                       if peer.should_buffer_gossip_broadcast() {
+                               if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
+                                       peer.pending_outbound_buffer.push_back(msg);
+                               }
+                       }
                        if peer.should_buffer_gossip_backfill() {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
@@ -851,12 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                }
        }
 
-       /// Append a message to a peer's pending outbound/write buffer
-       fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
-               peer.msgs_sent_since_pong += 1;
-               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
-       }
-
        /// Append a message to a peer's pending outbound/write buffer
        fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
                let mut buffer = VecWriter(Vec::with_capacity(2048));
@@ -867,7 +877,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                } else {
                        log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
                }
-               self.enqueue_encoded_message(peer, &buffer.0);
+               peer.msgs_sent_since_pong += 1;
+               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
+       }
+
+       /// Append a message to a peer's pending outbound/write gossip broadcast buffer
+       fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+               peer.msgs_sent_since_pong += 1;
+               peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
        }
 
        fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -1325,7 +1342,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
-                                       if peer.buffer_full_drop_gossip() {
+                                       if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1336,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
                                }
                        },
                        wire::Message::NodeAnnouncement(ref msg) => {
@@ -1349,7 +1366,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
-                                       if peer.buffer_full_drop_gossip() {
+                                       if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1359,7 +1376,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
                                }
                        },
                        wire::Message::ChannelUpdate(ref msg) => {
@@ -1372,14 +1389,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
-                                       if peer.buffer_full_drop_gossip() {
+                                       if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
                                }
                        },
                        _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),