Merge pull request #1673 from TheBlueMatt/2022-08-may-learn-preimage-balance
[rust-lightning] / lightning / src / ln / peer_handler.rs
index d86cddb13dbc66162c52af7128f0c3c180688532..623aa969dfd4262da60773428c6d03aff681a4ab 100644 (file)
@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-       fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
-               Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
-       fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+       fn get_next_channel_announcement(&self, _starting_point: u64) ->
+               Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+       fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
        fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
        fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
        fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
@@ -323,6 +323,10 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
 /// tick. Once we have sent this many messages since the last ping, we send a ping right away to
 /// ensures we don't just fill up our send buffer and leave the peer with too many messages to
 /// process before the next ping.
+///
+/// Note that we continue responding to other messages even after we've sent this many messages, so
+/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
+/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
 const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
 struct Peer {
@@ -378,6 +382,29 @@ impl Peer {
                        InitSyncTracker::NodesSyncing(pk) => pk < node_id,
                }
        }
+
+       /// Returns whether we should be reading bytes from this peer, based on whether its outbound
+       /// buffer still has space and we don't need to pause reads to get some writes out.
+       fn should_read(&self) -> bool {
+               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+       }
+
+       /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+       /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+       /// been drained.
+       fn should_buffer_gossip_backfill(&self) -> bool {
+               self.pending_outbound_buffer.is_empty() &&
+                       self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+       }
+
+       /// Returns whether this peer's buffer is full and we should drop gossip messages.
+       fn buffer_full_drop_gossip(&self) -> bool {
+               if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+                       || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+                               return false
+               }
+               true
+       }
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -387,7 +414,7 @@ impl Peer {
 /// issues such as overly long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -397,7 +424,7 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArc
 /// helps with issues such as long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
 
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
 /// socket events into messages which it passes on to its [`MessageHandler`].
@@ -622,8 +649,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// peer using the init message.
        /// The user should pass the remote network address of the host they are connected to.
        ///
-       /// Note that if an Err is returned here you MUST NOT call socket_disconnected for the new
-       /// descriptor but must disconnect the connection immediately.
+       /// If an `Err` is returned here you must disconnect the connection immediately.
        ///
        /// Returns a small number of bytes to send to the remote node (currently always 50).
        ///
@@ -671,9 +697,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// The user should pass the remote network address of the host they are connected to.
        ///
        /// May refuse the connection by returning an Err, but will never write bytes to the remote end
-       /// (outbound connector always speaks first). Note that if an Err is returned here you MUST NOT
-       /// call socket_disconnected for the new descriptor but must disconnect the connection
-       /// immediately.
+       /// (outbound connector always speaks first). If an `Err` is returned here you must disconnect
+       /// the connection immediately.
        ///
        /// Panics if descriptor is duplicative with some other descriptor which has not yet been
        /// [`socket_disconnected()`].
@@ -712,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                while !peer.awaiting_write_event {
-                       if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+                       if peer.should_buffer_gossip_backfill() {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
                                        InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-                                               let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
-                                               for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
-                                                       self.enqueue_message(peer, announce);
-                                                       if let &Some(ref update_a) = update_a_option {
-                                                               self.enqueue_message(peer, update_a);
+                                               if let Some((announce, update_a_option, update_b_option)) =
+                                                       self.message_handler.route_handler.get_next_channel_announcement(c)
+                                               {
+                                                       self.enqueue_message(peer, &announce);
+                                                       if let Some(update_a) = update_a_option {
+                                                               self.enqueue_message(peer, &update_a);
                                                        }
-                                                       if let &Some(ref update_b) = update_b_option {
-                                                               self.enqueue_message(peer, update_b);
+                                                       if let Some(update_b) = update_b_option {
+                                                               self.enqueue_message(peer, &update_b);
                                                        }
                                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
-                                               for msg in all_messages.iter() {
-                                                       self.enqueue_message(peer, msg);
+                                               if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
+                                                       self.enqueue_message(peer, &msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
                                        InitSyncTracker::NodesSyncing(key) => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
-                                               for msg in all_messages.iter() {
-                                                       self.enqueue_message(peer, msg);
+                                               if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
+                                                       self.enqueue_message(peer, &msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                                                }
                                        },
@@ -761,18 +779,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                self.maybe_send_extra_ping(peer);
                        }
 
-                       if {
-                               let next_buff = match peer.pending_outbound_buffer.front() {
-                                       None => return,
-                                       Some(buff) => buff,
-                               };
+                       let next_buff = match peer.pending_outbound_buffer.front() {
+                               None => return,
+                               Some(buff) => buff,
+                       };
 
-                               let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
-                               let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-                               let data_sent = descriptor.send_data(pending, should_be_reading);
-                               peer.pending_outbound_buffer_first_msg_offset += data_sent;
-                               if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
-                       } {
+                       let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
+                       let data_sent = descriptor.send_data(pending, peer.should_read());
+                       peer.pending_outbound_buffer_first_msg_offset += data_sent;
+                       if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
                                peer.pending_outbound_buffer_first_msg_offset = 0;
                                peer.pending_outbound_buffer.pop_front();
                        } else {
@@ -1047,7 +1062,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        }
                                                }
                                        }
-                                       pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
+                                       pause_read = !peer.should_read();
 
                                        if let Some(message) = msg_to_handle {
                                                match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1310,9 +1325,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1336,9 +1349,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1361,9 +1372,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1926,11 +1935,18 @@ mod tests {
                peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                peer_b.process_events();
-               assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_a.read_event(&mut fd_a, &b_data).unwrap(), false);
+
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                (fd_a.clone(), fd_b.clone())
        }
 
@@ -2055,10 +2071,10 @@ mod tests {
 
                // Check that each peer has received the expected number of channel updates and channel
                // announcements.
-               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
-               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
        }
 
        #[test]
@@ -2084,14 +2100,16 @@ mod tests {
 
                assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
                peers[0].process_events();
-               assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
                peers[1].process_events();
 
                // ...but if we get a second timer tick, we should disconnect the peer
                peers[0].timer_tick_occurred();
                assert_eq!(peers[0].peers.read().unwrap().len(), 0);
 
-               assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
        }
 
        #[test]