X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=277eab5894365525e8b483833dc09ae46ef28aec;hb=7717fa23a8cccdd31afa086f5c2487e9ba90bbfc;hp=d34fdfeb52a1ddcb887bfd42c4cfcddac8e7d0e9;hpb=4905df889498ec520932ea6684b0ffabbd226641;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index d34fdfeb..277eab58 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result { Ok(false) } fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result { Ok(false) } fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result { Ok(false) } - fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> - Vec<(msgs::ChannelAnnouncement, Option, Option)> { Vec::new() } - fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } + fn get_next_channel_announcement(&self, _starting_point: u64) -> + Option<(msgs::ChannelAnnouncement, Option, Option)> { None } + fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option { None } fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {} fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } @@ -323,6 +323,10 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4; /// tick. Once we have sent this many messages since the last ping, we send a ping right away to /// ensures we don't just fill up our send buffer and leave the peer with too many messages to /// process before the next ping. +/// +/// Note that we continue responding to other messages even after we've sent this many messages, so +/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times +/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit. const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; struct Peer { @@ -378,6 +382,29 @@ impl Peer { InitSyncTracker::NodesSyncing(pk) => pk < node_id, } } + + /// Returns whether we should be reading bytes from this peer, based on whether its outbound + /// buffer still has space and we don't need to pause reads to get some writes out. + fn should_read(&self) -> bool { + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + } + + /// Determines if we should push additional gossip messages onto a peer's outbound buffer for + /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have + /// been drained. + fn should_buffer_gossip_backfill(&self) -> bool { + self.pending_outbound_buffer.is_empty() && + self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + } + + /// Returns whether this peer's buffer is full and we should drop gossip messages. + fn buffer_full_drop_gossip(&self) -> bool { + if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO { + return false + } + true + } } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -710,46 +737,39 @@ impl P fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { while !peer.awaiting_write_event { - if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK { + if peer.should_buffer_gossip_backfill() { match peer.sync_status { InitSyncTracker::NoSyncRequested => {}, InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => { - let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8; - let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps); - for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() { - self.enqueue_message(peer, announce); - if let &Some(ref update_a) = update_a_option { - self.enqueue_message(peer, update_a); + if let Some((announce, update_a_option, update_b_option)) = + self.message_handler.route_handler.get_next_channel_announcement(c) + { + self.enqueue_message(peer, &announce); + if let Some(update_a) = update_a_option { + self.enqueue_message(peer, &update_a); } - if let &Some(ref update_b) = update_b_option { - self.enqueue_message(peer, update_b); + if let Some(update_b) = update_b_option { + self.enqueue_message(peer, &update_b); } peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1); - } - if all_messages.is_empty() || all_messages.len() != steps as usize { + } else { peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff); } }, InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => { - let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8; - let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps); - for msg in all_messages.iter() { - self.enqueue_message(peer, msg); + if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) { + self.enqueue_message(peer, &msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); - } - if all_messages.is_empty() || all_messages.len() != steps as usize { + } else { peer.sync_status = InitSyncTracker::NoSyncRequested; } }, InitSyncTracker::ChannelsSyncing(_) => unreachable!(), InitSyncTracker::NodesSyncing(key) => { - let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8; - let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps); - for msg in all_messages.iter() { - self.enqueue_message(peer, msg); + if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) { + self.enqueue_message(peer, &msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); - } - if all_messages.is_empty() || all_messages.len() != steps as usize { + } else { peer.sync_status = InitSyncTracker::NoSyncRequested; } }, @@ -765,11 +785,10 @@ impl P Some(buff) => buff, }; - let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE; let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; - let data_sent = descriptor.send_data(pending, should_be_reading); + let data_sent = descriptor.send_data(pending, peer.should_read()); peer.pending_outbound_buffer_first_msg_offset += data_sent; - if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false } + peer.pending_outbound_buffer_first_msg_offset == next_buff.len() } { peer.pending_outbound_buffer_first_msg_offset = 0; peer.pending_outbound_buffer.pop_front(); @@ -1045,7 +1064,7 @@ impl P } } } - pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE; + pause_read = !peer.should_read(); if let Some(message) = msg_to_handle { match self.handle_message(&peer_mutex, peer_lock, message) { @@ -1308,9 +1327,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO - { + if peer.buffer_full_drop_gossip() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1334,9 +1351,7 @@ impl P !peer.should_forward_node_announcement(msg.contents.node_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO - { + if peer.buffer_full_drop_gossip() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1359,9 +1374,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO - { + if peer.buffer_full_drop_gossip() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -2060,10 +2073,10 @@ mod tests { // Check that each peer has received the expected number of channel updates and channel // announcements. - assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100); - assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50); - assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100); - assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50); + assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108); + assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54); + assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108); + assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54); } #[test]