From 7717fa23a8cccdd31afa086f5c2487e9ba90bbfc Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Aug 2022 21:26:16 +0000 Subject: [PATCH] Backfill gossip without buffering directly in LDK Instead of backfilling gossip by buffering (up to) ten messages at a time, only buffer one message at a time, as the peers' outbound socket buffer drains. This moves the outbound backfill messages out of `PeerHandler` and into the operating system buffer, where it arguably belongs. Not buffering causes us to walk the gossip B-Trees somewhat more often, but avoids allocating vecs for the responses. While its probably (without having benchmarked it) a net performance loss, it simplifies buffer tracking and leaves us with more room to play with the buffer sizing constants as we add onion message forwarding which is an important win. Note that because we change how often we check if we're out of messages to send before pinging, we slightly change how many messages are exchanged at once, impacting the `test_do_attempt_write_data` constants. --- lightning-net-tokio/src/lib.rs | 4 +- lightning/src/ln/functional_test_utils.rs | 12 ++-- lightning/src/ln/msgs.rs | 16 ++--- lightning/src/ln/peer_handler.rs | 65 +++++++++----------- lightning/src/routing/gossip.rs | 73 ++++++++++------------- lightning/src/util/test_utils.rs | 23 +++---- 6 files changed, 84 insertions(+), 109 deletions(-) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 645a7434..1f00fcb3 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -562,8 +562,8 @@ mod tests { fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result { Ok(false) } fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result { Ok(false) } fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result { Ok(false) } - fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { Vec::new() } - fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } + fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option, Option)> { None } + fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option { None } fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { } fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 54d199a2..aa2ad217 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -318,20 +318,20 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { ); let mut chan_progress = 0; loop { - let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress, 255); - let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress, 255); + let orig_announcements = self.gossip_sync.get_next_channel_announcement(chan_progress); + let deserialized_announcements = gossip_sync.get_next_channel_announcement(chan_progress); assert!(orig_announcements == deserialized_announcements); - chan_progress = match orig_announcements.last() { + chan_progress = match orig_announcements { Some(announcement) => announcement.0.contents.short_channel_id + 1, None => break, }; } let mut node_progress = None; loop { - let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255); - let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255); + let orig_announcements = self.gossip_sync.get_next_node_announcement(node_progress.as_ref()); + let deserialized_announcements = gossip_sync.get_next_node_announcement(node_progress.as_ref()); assert!(orig_announcements == deserialized_announcements); - node_progress = match orig_announcements.last() { + node_progress = match orig_announcements { Some(announcement) => Some(announcement.contents.node_id), None => break, }; diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 05a336c1..975b40c9 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -915,15 +915,15 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { /// Handle an incoming channel_update message, returning true if it should be forwarded on, /// false or returning an Err otherwise. fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result; - /// Gets a subset of the channel announcements and updates required to dump our routing table - /// to a remote node, starting at the short_channel_id indicated by starting_point and - /// including the batch_amount entries immediately higher in numerical value than starting_point. - fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)>; - /// Gets a subset of the node announcements required to dump our routing table to a remote node, - /// starting at the node *after* the provided publickey and including batch_amount entries - /// immediately higher (as defined by ::cmp) than starting_point. + /// Gets channel announcements and updates required to dump our routing table to a remote node, + /// starting at the short_channel_id indicated by starting_point and including announcements + /// for a single channel. + fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option, Option)>; + /// Gets a node announcement required to dump our routing table to a remote node, starting at + /// the node *after* the provided pubkey and including up to one announcement immediately + /// higher (as defined by ::cmp) than starting_point. /// If None is provided for starting_point, we start at the first node. - fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec; + fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option; /// Called when a connection is established with a peer. This can be used to /// perform routing table synchronization using a strategy defined by the /// implementor. diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 165d607f..277eab58 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result { Ok(false) } fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result { Ok(false) } fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result { Ok(false) } - fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> - Vec<(msgs::ChannelAnnouncement, Option, Option)> { Vec::new() } - fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } + fn get_next_channel_announcement(&self, _starting_point: u64) -> + Option<(msgs::ChannelAnnouncement, Option, Option)> { None } + fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option { None } fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {} fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } @@ -383,19 +383,17 @@ impl Peer { } } - /// Returns the number of gossip messages we can fit in this peer's buffer. - fn gossip_buffer_slots_available(&self) -> usize { - OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len()) - } - /// Returns whether we should be reading bytes from this peer, based on whether its outbound /// buffer still has space and we don't need to pause reads to get some writes out. fn should_read(&self) -> bool { self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE } - fn should_backfill_gossip(&self) -> bool { - self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && + /// Determines if we should push additional gossip messages onto a peer's outbound buffer for + /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have + /// been drained. + fn should_buffer_gossip_backfill(&self) -> bool { + self.pending_outbound_buffer.is_empty() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } @@ -739,46 +737,39 @@ impl P fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { while !peer.awaiting_write_event { - if peer.should_backfill_gossip() { + if peer.should_buffer_gossip_backfill() { match peer.sync_status { InitSyncTracker::NoSyncRequested => {}, InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => { - let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8; - let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps); - for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() { - self.enqueue_message(peer, announce); - if let &Some(ref update_a) = update_a_option { - self.enqueue_message(peer, update_a); + if let Some((announce, update_a_option, update_b_option)) = + self.message_handler.route_handler.get_next_channel_announcement(c) + { + self.enqueue_message(peer, &announce); + if let Some(update_a) = update_a_option { + self.enqueue_message(peer, &update_a); } - if let &Some(ref update_b) = update_b_option { - self.enqueue_message(peer, update_b); + if let Some(update_b) = update_b_option { + self.enqueue_message(peer, &update_b); } peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1); - } - if all_messages.is_empty() || all_messages.len() != steps as usize { + } else { peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff); } }, InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => { - let steps = peer.gossip_buffer_slots_available() as u8; - let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps); - for msg in all_messages.iter() { - self.enqueue_message(peer, msg); + if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) { + self.enqueue_message(peer, &msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); - } - if all_messages.is_empty() || all_messages.len() != steps as usize { + } else { peer.sync_status = InitSyncTracker::NoSyncRequested; } }, InitSyncTracker::ChannelsSyncing(_) => unreachable!(), InitSyncTracker::NodesSyncing(key) => { - let steps = peer.gossip_buffer_slots_available() as u8; - let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps); - for msg in all_messages.iter() { - self.enqueue_message(peer, msg); + if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) { + self.enqueue_message(peer, &msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); - } - if all_messages.is_empty() || all_messages.len() != steps as usize { + } else { peer.sync_status = InitSyncTracker::NoSyncRequested; } }, @@ -2082,10 +2073,10 @@ mod tests { // Check that each peer has received the expected number of channel updates and channel // announcements. - assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100); - assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50); - assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100); - assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50); + assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108); + assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54); + assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108); + assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54); } #[test] diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 1757d137..11a30289 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -318,11 +318,10 @@ where C::Target: chain::Access, L::Target: Logger Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY) } - fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { - let mut result = Vec::with_capacity(batch_amount as usize); + fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option, Option)> { let channels = self.network_graph.channels.read().unwrap(); let mut iter = channels.range(starting_point..); - while result.len() < batch_amount as usize { + loop { if let Some((_, ref chan)) = iter.next() { if chan.announcement_message.is_some() { let chan_announcement = chan.announcement_message.clone().unwrap(); @@ -334,20 +333,18 @@ where C::Target: chain::Access, L::Target: Logger if let Some(two_to_one) = chan.two_to_one.as_ref() { two_to_one_announcement = two_to_one.last_update_message.clone(); } - result.push((chan_announcement, one_to_two_announcement, two_to_one_announcement)); + return Some((chan_announcement, one_to_two_announcement, two_to_one_announcement)); } else { // TODO: We may end up sending un-announced channel_updates if we are sending // initial sync data while receiving announce/updates for this channel. } } else { - return result; + return None; } } - result } - fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec { - let mut result = Vec::with_capacity(batch_amount as usize); + fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option { let nodes = self.network_graph.nodes.read().unwrap(); let mut iter = if let Some(pubkey) = starting_point { let mut iter = nodes.range(NodeId::from_pubkey(pubkey)..); @@ -356,18 +353,17 @@ where C::Target: chain::Access, L::Target: Logger } else { nodes.range::(..) }; - while result.len() < batch_amount as usize { + loop { if let Some((_, ref node)) = iter.next() { if let Some(node_info) = node.announcement_info.as_ref() { - if node_info.announcement_message.is_some() { - result.push(node_info.announcement_message.clone().unwrap()); + if let Some(msg) = node_info.announcement_message.clone() { + return Some(msg); } } } else { - return result; + return None; } } - result } /// Initiates a stateless sync of routing gossip information with a peer @@ -2412,8 +2408,8 @@ mod tests { let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); // Channels were not announced yet. - let channels_with_announcements = gossip_sync.get_next_channel_announcements(0, 1); - assert_eq!(channels_with_announcements.len(), 0); + let channels_with_announcements = gossip_sync.get_next_channel_announcement(0); + assert!(channels_with_announcements.is_none()); let short_channel_id; { @@ -2427,17 +2423,15 @@ mod tests { } // Contains initial channel announcement now. - let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1); - assert_eq!(channels_with_announcements.len(), 1); - if let Some(channel_announcements) = channels_with_announcements.first() { - let &(_, ref update_1, ref update_2) = channel_announcements; + let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id); + if let Some(channel_announcements) = channels_with_announcements { + let (_, ref update_1, ref update_2) = channel_announcements; assert_eq!(update_1, &None); assert_eq!(update_2, &None); } else { panic!(); } - { // Valid channel update let valid_channel_update = get_signed_channel_update(|unsigned_channel_update| { @@ -2450,10 +2444,9 @@ mod tests { } // Now contains an initial announcement and an update. - let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1); - assert_eq!(channels_with_announcements.len(), 1); - if let Some(channel_announcements) = channels_with_announcements.first() { - let &(_, ref update_1, ref update_2) = channel_announcements; + let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id); + if let Some(channel_announcements) = channels_with_announcements { + let (_, ref update_1, ref update_2) = channel_announcements; assert_ne!(update_1, &None); assert_eq!(update_2, &None); } else { @@ -2473,10 +2466,9 @@ mod tests { } // Test that announcements with excess data won't be returned - let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1); - assert_eq!(channels_with_announcements.len(), 1); - if let Some(channel_announcements) = channels_with_announcements.first() { - let &(_, ref update_1, ref update_2) = channel_announcements; + let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id); + if let Some(channel_announcements) = channels_with_announcements { + let (_, ref update_1, ref update_2) = channel_announcements; assert_eq!(update_1, &None); assert_eq!(update_2, &None); } else { @@ -2484,8 +2476,8 @@ mod tests { } // Further starting point have no channels after it - let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000, 1); - assert_eq!(channels_with_announcements.len(), 0); + let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id + 1000); + assert!(channels_with_announcements.is_none()); } #[test] @@ -2497,8 +2489,8 @@ mod tests { let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); // No nodes yet. - let next_announcements = gossip_sync.get_next_node_announcements(None, 10); - assert_eq!(next_announcements.len(), 0); + let next_announcements = gossip_sync.get_next_node_announcement(None); + assert!(next_announcements.is_none()); { // Announce a channel to add 2 nodes @@ -2509,10 +2501,9 @@ mod tests { }; } - // Nodes were never announced - let next_announcements = gossip_sync.get_next_node_announcements(None, 3); - assert_eq!(next_announcements.len(), 0); + let next_announcements = gossip_sync.get_next_node_announcement(None); + assert!(next_announcements.is_none()); { let valid_announcement = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx); @@ -2528,12 +2519,12 @@ mod tests { }; } - let next_announcements = gossip_sync.get_next_node_announcements(None, 3); - assert_eq!(next_announcements.len(), 2); + let next_announcements = gossip_sync.get_next_node_announcement(None); + assert!(next_announcements.is_some()); // Skip the first node. - let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2); - assert_eq!(next_announcements.len(), 1); + let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1)); + assert!(next_announcements.is_some()); { // Later announcement which should not be relayed (excess data) prevent us from sharing a node @@ -2547,8 +2538,8 @@ mod tests { }; } - let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2); - assert_eq!(next_announcements.len(), 0); + let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1)); + assert!(next_announcements.is_none()); } #[test] diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 2f80fb73..fe009cc1 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -45,7 +45,7 @@ use prelude::*; use core::time::Duration; use sync::{Mutex, Arc}; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use core::{cmp, mem}; +use core::mem; use bitcoin::bech32::u5; use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial}; @@ -445,23 +445,16 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel); Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError }) } - fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option, Option)> { - let mut chan_anns = Vec::new(); - const TOTAL_UPDS: u64 = 50; - let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS); - for i in starting_point..end { - let chan_upd_1 = get_dummy_channel_update(i); - let chan_upd_2 = get_dummy_channel_update(i); - let chan_ann = get_dummy_channel_announcement(i); + fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option, Option)> { + let chan_upd_1 = get_dummy_channel_update(starting_point); + let chan_upd_2 = get_dummy_channel_update(starting_point); + let chan_ann = get_dummy_channel_announcement(starting_point); - chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2))); - } - - chan_anns + Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2))) } - fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { - Vec::new() + fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option { + None } fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) { -- 2.30.2