Backfill gossip without buffering directly in LDK
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 68a7ef952702b31436ea18ff27bb83f3b67b3fe3..277eab5894365525e8b483833dc09ae46ef28aec 100644 (file)
@@ -12,8 +12,8 @@
 //! Instead of actually servicing sockets ourselves we require that you implement the
 //! SocketDescriptor interface and use that to receive actions which you should perform on the
 //! socket, and call into PeerManager with bytes read from the socket. The PeerManager will then
-//! call into the provided message handlers (probably a ChannelManager and NetGraphmsgHandler) with messages
-//! they should handle, and encoding/sending response messages.
+//! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with
+//! messages they should handle, and encoding/sending response messages.
 
 use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
@@ -25,10 +25,10 @@ use util::ser::{VecWriter, Writeable, Writer};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
 use ln::wire;
 use ln::wire::Encode;
+use routing::gossip::{NetworkGraph, P2PGossipSync};
 use util::atomic_counter::AtomicCounter;
 use util::events::{MessageSendEvent, MessageSendEventsProvider};
 use util::logger::Logger;
-use routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
 
 use prelude::*;
 use io;
@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-       fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
-               Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
-       fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+       fn get_next_channel_announcement(&self, _starting_point: u64) ->
+               Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+       fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
        fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
        fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
        fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
@@ -208,10 +208,9 @@ pub struct MessageHandler<CM: Deref, RM: Deref> where
        /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
        pub chan_handler: CM,
        /// A message handler which handles messages updating our knowledge of the network channel
-       /// graph. Usually this is just a [`NetGraphMsgHandler`] object or an
-       /// [`IgnoringMessageHandler`].
+       /// graph. Usually this is just a [`P2PGossipSync`] object or an [`IgnoringMessageHandler`].
        ///
-       /// [`NetGraphMsgHandler`]: crate::routing::network_graph::NetGraphMsgHandler
+       /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync
        pub route_handler: RM,
 }
 
@@ -324,6 +323,10 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
 /// tick. Once we have sent this many messages since the last ping, we send a ping right away to
 /// ensures we don't just fill up our send buffer and leave the peer with too many messages to
 /// process before the next ping.
+///
+/// Note that we continue responding to other messages even after we've sent this many messages, so
+/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
+/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
 const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
 struct Peer {
@@ -379,6 +382,29 @@ impl Peer {
                        InitSyncTracker::NodesSyncing(pk) => pk < node_id,
                }
        }
+
+       /// Returns whether we should be reading bytes from this peer, based on whether its outbound
+       /// buffer still has space and we don't need to pause reads to get some writes out.
+       fn should_read(&self) -> bool {
+               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+       }
+
+       /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+       /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+       /// been drained.
+       fn should_buffer_gossip_backfill(&self) -> bool {
+               self.pending_outbound_buffer.is_empty() &&
+                       self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+       }
+
+       /// Returns whether this peer's buffer is full and we should drop gossip messages.
+       fn buffer_full_drop_gossip(&self) -> bool {
+               if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+                       || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+                               return false
+               }
+               true
+       }
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -388,7 +414,7 @@ impl Peer {
 /// issues such as overly long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<NetGraphMsgHandler<Arc<NetworkGraph>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -398,7 +424,7 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArc
 /// helps with issues such as long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g NetworkGraph, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
 
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
 /// socket events into messages which it passes on to its [`MessageHandler`].
@@ -623,8 +649,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// peer using the init message.
        /// The user should pass the remote network address of the host they are connected to.
        ///
-       /// Note that if an Err is returned here you MUST NOT call socket_disconnected for the new
-       /// descriptor but must disconnect the connection immediately.
+       /// If an `Err` is returned here you must disconnect the connection immediately.
        ///
        /// Returns a small number of bytes to send to the remote node (currently always 50).
        ///
@@ -672,9 +697,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// The user should pass the remote network address of the host they are connected to.
        ///
        /// May refuse the connection by returning an Err, but will never write bytes to the remote end
-       /// (outbound connector always speaks first). Note that if an Err is returned here you MUST NOT
-       /// call socket_disconnected for the new descriptor but must disconnect the connection
-       /// immediately.
+       /// (outbound connector always speaks first). If an `Err` is returned here you must disconnect
+       /// the connection immediately.
        ///
        /// Panics if descriptor is duplicative with some other descriptor which has not yet been
        /// [`socket_disconnected()`].
@@ -713,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                while !peer.awaiting_write_event {
-                       if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+                       if peer.should_buffer_gossip_backfill() {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
                                        InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-                                               let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
-                                               for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
-                                                       self.enqueue_message(peer, announce);
-                                                       if let &Some(ref update_a) = update_a_option {
-                                                               self.enqueue_message(peer, update_a);
+                                               if let Some((announce, update_a_option, update_b_option)) =
+                                                       self.message_handler.route_handler.get_next_channel_announcement(c)
+                                               {
+                                                       self.enqueue_message(peer, &announce);
+                                                       if let Some(update_a) = update_a_option {
+                                                               self.enqueue_message(peer, &update_a);
                                                        }
-                                                       if let &Some(ref update_b) = update_b_option {
-                                                               self.enqueue_message(peer, update_b);
+                                                       if let Some(update_b) = update_b_option {
+                                                               self.enqueue_message(peer, &update_b);
                                                        }
                                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
-                                               for msg in all_messages.iter() {
-                                                       self.enqueue_message(peer, msg);
+                                               if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
+                                                       self.enqueue_message(peer, &msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
                                        InitSyncTracker::NodesSyncing(key) => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
-                                               for msg in all_messages.iter() {
-                                                       self.enqueue_message(peer, msg);
+                                               if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
+                                                       self.enqueue_message(peer, &msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                                                }
                                        },
@@ -768,11 +785,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        Some(buff) => buff,
                                };
 
-                               let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
                                let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-                               let data_sent = descriptor.send_data(pending, should_be_reading);
+                               let data_sent = descriptor.send_data(pending, peer.should_read());
                                peer.pending_outbound_buffer_first_msg_offset += data_sent;
-                               if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
+                               peer.pending_outbound_buffer_first_msg_offset == next_buff.len()
                        } {
                                peer.pending_outbound_buffer_first_msg_offset = 0;
                                peer.pending_outbound_buffer.pop_front();
@@ -1048,7 +1064,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        }
                                                }
                                        }
-                                       pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
+                                       pause_read = !peer.should_read();
 
                                        if let Some(message) = msg_to_handle {
                                                match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1311,9 +1327,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1337,9 +1351,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1362,9 +1374,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1927,11 +1937,18 @@ mod tests {
                peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                peer_b.process_events();
-               assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_a.read_event(&mut fd_a, &b_data).unwrap(), false);
+
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                (fd_a.clone(), fd_b.clone())
        }
 
@@ -2056,10 +2073,10 @@ mod tests {
 
                // Check that each peer has received the expected number of channel updates and channel
                // announcements.
-               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
-               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
        }
 
        #[test]
@@ -2085,14 +2102,16 @@ mod tests {
 
                assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
                peers[0].process_events();
-               assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
                peers[1].process_events();
 
                // ...but if we get a second timer tick, we should disconnect the peer
                peers[0].timer_tick_occurred();
                assert_eq!(peers[0].peers.read().unwrap().len(), 0);
 
-               assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
        }
 
        #[test]