Merge pull request #1660 from TheBlueMatt/2022-08-cleanup-ratelimits
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Tue, 16 Aug 2022 04:43:02 +0000 (04:43 +0000)
committerGitHub <noreply@github.com>
Tue, 16 Aug 2022 04:43:02 +0000 (04:43 +0000)
Backfill gossip without buffering directly in LDK

lightning-net-tokio/src/lib.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/routing/gossip.rs
lightning/src/util/test_utils.rs

index f980addefcfd56f2574bc54e969626491cdcbae7..ac9d4bb3bd5899a08e0dfaadebd811fb779e2c04 100644 (file)
@@ -564,8 +564,8 @@ mod tests {
                fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
                fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
                fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-               fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
-               fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
+               fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { None }
+               fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> { None }
                fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
                fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
                fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
index 4c91d4f7b1cd9f81208afb5f9367156c18d47b06..ecd1d0ae22850e6e292b4f9b6fa75da388b68448 100644 (file)
@@ -319,20 +319,20 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
                                );
                                let mut chan_progress = 0;
                                loop {
-                                       let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress, 255);
-                                       let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress, 255);
+                                       let orig_announcements = self.gossip_sync.get_next_channel_announcement(chan_progress);
+                                       let deserialized_announcements = gossip_sync.get_next_channel_announcement(chan_progress);
                                        assert!(orig_announcements == deserialized_announcements);
-                                       chan_progress = match orig_announcements.last() {
+                                       chan_progress = match orig_announcements {
                                                Some(announcement) => announcement.0.contents.short_channel_id + 1,
                                                None => break,
                                        };
                                }
                                let mut node_progress = None;
                                loop {
-                                       let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
-                                       let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
+                                       let orig_announcements = self.gossip_sync.get_next_node_announcement(node_progress.as_ref());
+                                       let deserialized_announcements = gossip_sync.get_next_node_announcement(node_progress.as_ref());
                                        assert!(orig_announcements == deserialized_announcements);
-                                       node_progress = match orig_announcements.last() {
+                                       node_progress = match orig_announcements {
                                                Some(announcement) => Some(announcement.contents.node_id),
                                                None => break,
                                        };
index 95c7e61b67426a5420c8d5b20056b5f9d7477f3b..ffc595ccedab5bbf547f0f33d1426e0214e4e6bc 100644 (file)
@@ -915,15 +915,15 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
        /// Handle an incoming channel_update message, returning true if it should be forwarded on,
        /// false or returning an Err otherwise.
        fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError>;
-       /// Gets a subset of the channel announcements and updates required to dump our routing table
-       /// to a remote node, starting at the short_channel_id indicated by starting_point and
-       /// including the batch_amount entries immediately higher in numerical value than starting_point.
-       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
-       /// Gets a subset of the node announcements required to dump our routing table to a remote node,
-       /// starting at the node *after* the provided publickey and including batch_amount entries
-       /// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
+       /// Gets channel announcements and updates required to dump our routing table to a remote node,
+       /// starting at the short_channel_id indicated by starting_point and including announcements
+       /// for a single channel.
+       fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
+       /// Gets a node announcement required to dump our routing table to a remote node, starting at
+       /// the node *after* the provided pubkey and including up to one announcement immediately
+       /// higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
        /// If None is provided for starting_point, we start at the first node.
-       fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
+       fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement>;
        /// Called when a connection is established with a peer. This can be used to
        /// perform routing table synchronization using a strategy defined by the
        /// implementor.
index d34fdfeb52a1ddcb887bfd42c4cfcddac8e7d0e9..623aa969dfd4262da60773428c6d03aff681a4ab 100644 (file)
@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-       fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
-               Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
-       fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+       fn get_next_channel_announcement(&self, _starting_point: u64) ->
+               Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+       fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
        fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
        fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
        fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
@@ -323,6 +323,10 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
 /// tick. Once we have sent this many messages since the last ping, we send a ping right away to
 /// ensures we don't just fill up our send buffer and leave the peer with too many messages to
 /// process before the next ping.
+///
+/// Note that we continue responding to other messages even after we've sent this many messages, so
+/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
+/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
 const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
 struct Peer {
@@ -378,6 +382,29 @@ impl Peer {
                        InitSyncTracker::NodesSyncing(pk) => pk < node_id,
                }
        }
+
+       /// Returns whether we should be reading bytes from this peer, based on whether its outbound
+       /// buffer still has space and we don't need to pause reads to get some writes out.
+       fn should_read(&self) -> bool {
+               self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+       }
+
+       /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+       /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+       /// been drained.
+       fn should_buffer_gossip_backfill(&self) -> bool {
+               self.pending_outbound_buffer.is_empty() &&
+                       self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+       }
+
+       /// Returns whether this peer's buffer is full and we should drop gossip messages.
+       fn buffer_full_drop_gossip(&self) -> bool {
+               if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+                       || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+                               return false
+               }
+               true
+       }
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -710,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                while !peer.awaiting_write_event {
-                       if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+                       if peer.should_buffer_gossip_backfill() {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
                                        InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-                                               let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
-                                               for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
-                                                       self.enqueue_message(peer, announce);
-                                                       if let &Some(ref update_a) = update_a_option {
-                                                               self.enqueue_message(peer, update_a);
+                                               if let Some((announce, update_a_option, update_b_option)) =
+                                                       self.message_handler.route_handler.get_next_channel_announcement(c)
+                                               {
+                                                       self.enqueue_message(peer, &announce);
+                                                       if let Some(update_a) = update_a_option {
+                                                               self.enqueue_message(peer, &update_a);
                                                        }
-                                                       if let &Some(ref update_b) = update_b_option {
-                                                               self.enqueue_message(peer, update_b);
+                                                       if let Some(update_b) = update_b_option {
+                                                               self.enqueue_message(peer, &update_b);
                                                        }
                                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
-                                               for msg in all_messages.iter() {
-                                                       self.enqueue_message(peer, msg);
+                                               if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
+                                                       self.enqueue_message(peer, &msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                                                }
                                        },
                                        InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
                                        InitSyncTracker::NodesSyncing(key) => {
-                                               let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
-                                               for msg in all_messages.iter() {
-                                                       self.enqueue_message(peer, msg);
+                                               if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
+                                                       self.enqueue_message(peer, &msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-                                               }
-                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                               } else {
                                                        peer.sync_status = InitSyncTracker::NoSyncRequested;
                                                }
                                        },
@@ -759,18 +779,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                self.maybe_send_extra_ping(peer);
                        }
 
-                       if {
-                               let next_buff = match peer.pending_outbound_buffer.front() {
-                                       None => return,
-                                       Some(buff) => buff,
-                               };
+                       let next_buff = match peer.pending_outbound_buffer.front() {
+                               None => return,
+                               Some(buff) => buff,
+                       };
 
-                               let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
-                               let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-                               let data_sent = descriptor.send_data(pending, should_be_reading);
-                               peer.pending_outbound_buffer_first_msg_offset += data_sent;
-                               if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
-                       } {
+                       let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
+                       let data_sent = descriptor.send_data(pending, peer.should_read());
+                       peer.pending_outbound_buffer_first_msg_offset += data_sent;
+                       if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
                                peer.pending_outbound_buffer_first_msg_offset = 0;
                                peer.pending_outbound_buffer.pop_front();
                        } else {
@@ -1045,7 +1062,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        }
                                                }
                                        }
-                                       pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
+                                       pause_read = !peer.should_read();
 
                                        if let Some(message) = msg_to_handle {
                                                match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1308,9 +1325,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1334,9 +1349,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1359,9 +1372,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-                                       {
+                                       if peer.buffer_full_drop_gossip() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -2060,10 +2071,10 @@ mod tests {
 
                // Check that each peer has received the expected number of channel updates and channel
                // announcements.
-               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
-               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
        }
 
        #[test]
index 1757d13730045f9d4246c18a0c150fbd668de412..925cd4cd4d32e6c0597217ff334413bc782d1a0e 100644 (file)
@@ -41,7 +41,7 @@ use core::{cmp, fmt};
 use sync::{RwLock, RwLockReadGuard};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use sync::Mutex;
-use core::ops::Deref;
+use core::ops::{Bound, Deref};
 use bitcoin::hashes::hex::ToHex;
 
 #[cfg(feature = "std")]
@@ -318,56 +318,43 @@ where C::Target: chain::Access, L::Target: Logger
                Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
        }
 
-       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
-               let mut result = Vec::with_capacity(batch_amount as usize);
+       fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
                let channels = self.network_graph.channels.read().unwrap();
-               let mut iter = channels.range(starting_point..);
-               while result.len() < batch_amount as usize {
-                       if let Some((_, ref chan)) = iter.next() {
-                               if chan.announcement_message.is_some() {
-                                       let chan_announcement = chan.announcement_message.clone().unwrap();
-                                       let mut one_to_two_announcement: Option<msgs::ChannelUpdate> = None;
-                                       let mut two_to_one_announcement: Option<msgs::ChannelUpdate> = None;
-                                       if let Some(one_to_two) = chan.one_to_two.as_ref() {
-                                               one_to_two_announcement = one_to_two.last_update_message.clone();
-                                       }
-                                       if let Some(two_to_one) = chan.two_to_one.as_ref() {
-                                               two_to_one_announcement = two_to_one.last_update_message.clone();
-                                       }
-                                       result.push((chan_announcement, one_to_two_announcement, two_to_one_announcement));
-                               } else {
-                                       // TODO: We may end up sending un-announced channel_updates if we are sending
-                                       // initial sync data while receiving announce/updates for this channel.
+               for (_, ref chan) in channels.range(starting_point..) {
+                       if chan.announcement_message.is_some() {
+                               let chan_announcement = chan.announcement_message.clone().unwrap();
+                               let mut one_to_two_announcement: Option<msgs::ChannelUpdate> = None;
+                               let mut two_to_one_announcement: Option<msgs::ChannelUpdate> = None;
+                               if let Some(one_to_two) = chan.one_to_two.as_ref() {
+                                       one_to_two_announcement = one_to_two.last_update_message.clone();
                                }
+                               if let Some(two_to_one) = chan.two_to_one.as_ref() {
+                                       two_to_one_announcement = two_to_one.last_update_message.clone();
+                               }
+                               return Some((chan_announcement, one_to_two_announcement, two_to_one_announcement));
                        } else {
-                               return result;
+                               // TODO: We may end up sending un-announced channel_updates if we are sending
+                               // initial sync data while receiving announce/updates for this channel.
                        }
                }
-               result
+               None
        }
 
-       fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
-               let mut result = Vec::with_capacity(batch_amount as usize);
+       fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
                let nodes = self.network_graph.nodes.read().unwrap();
-               let mut iter = if let Some(pubkey) = starting_point {
-                               let mut iter = nodes.range(NodeId::from_pubkey(pubkey)..);
-                               iter.next();
-                               iter
+               let iter = if let Some(pubkey) = starting_point {
+                               nodes.range((Bound::Excluded(NodeId::from_pubkey(pubkey)), Bound::Unbounded))
                        } else {
-                               nodes.range::<NodeId, _>(..)
+                               nodes.range(..)
                        };
-               while result.len() < batch_amount as usize {
-                       if let Some((_, ref node)) = iter.next() {
-                               if let Some(node_info) = node.announcement_info.as_ref() {
-                                       if node_info.announcement_message.is_some() {
-                                               result.push(node_info.announcement_message.clone().unwrap());
-                                       }
+               for (_, ref node) in iter {
+                       if let Some(node_info) = node.announcement_info.as_ref() {
+                               if let Some(msg) = node_info.announcement_message.clone() {
+                                       return Some(msg);
                                }
-                       } else {
-                               return result;
                        }
                }
-               result
+               None
        }
 
        /// Initiates a stateless sync of routing gossip information with a peer
@@ -2412,8 +2399,8 @@ mod tests {
                let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
 
                // Channels were not announced yet.
-               let channels_with_announcements = gossip_sync.get_next_channel_announcements(0, 1);
-               assert_eq!(channels_with_announcements.len(), 0);
+               let channels_with_announcements = gossip_sync.get_next_channel_announcement(0);
+               assert!(channels_with_announcements.is_none());
 
                let short_channel_id;
                {
@@ -2427,17 +2414,15 @@ mod tests {
                }
 
                // Contains initial channel announcement now.
-               let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-               assert_eq!(channels_with_announcements.len(), 1);
-               if let Some(channel_announcements) = channels_with_announcements.first() {
-                       let &(_, ref update_1, ref update_2) = channel_announcements;
+               let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+               if let Some(channel_announcements) = channels_with_announcements {
+                       let (_, ref update_1, ref update_2) = channel_announcements;
                        assert_eq!(update_1, &None);
                        assert_eq!(update_2, &None);
                } else {
                        panic!();
                }
 
-
                {
                        // Valid channel update
                        let valid_channel_update = get_signed_channel_update(|unsigned_channel_update| {
@@ -2450,10 +2435,9 @@ mod tests {
                }
 
                // Now contains an initial announcement and an update.
-               let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-               assert_eq!(channels_with_announcements.len(), 1);
-               if let Some(channel_announcements) = channels_with_announcements.first() {
-                       let &(_, ref update_1, ref update_2) = channel_announcements;
+               let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+               if let Some(channel_announcements) = channels_with_announcements {
+                       let (_, ref update_1, ref update_2) = channel_announcements;
                        assert_ne!(update_1, &None);
                        assert_eq!(update_2, &None);
                } else {
@@ -2473,10 +2457,9 @@ mod tests {
                }
 
                // Test that announcements with excess data won't be returned
-               let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-               assert_eq!(channels_with_announcements.len(), 1);
-               if let Some(channel_announcements) = channels_with_announcements.first() {
-                       let &(_, ref update_1, ref update_2) = channel_announcements;
+               let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+               if let Some(channel_announcements) = channels_with_announcements {
+                       let (_, ref update_1, ref update_2) = channel_announcements;
                        assert_eq!(update_1, &None);
                        assert_eq!(update_2, &None);
                } else {
@@ -2484,8 +2467,8 @@ mod tests {
                }
 
                // Further starting point have no channels after it
-               let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000, 1);
-               assert_eq!(channels_with_announcements.len(), 0);
+               let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id + 1000);
+               assert!(channels_with_announcements.is_none());
        }
 
        #[test]
@@ -2497,8 +2480,8 @@ mod tests {
                let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
 
                // No nodes yet.
-               let next_announcements = gossip_sync.get_next_node_announcements(None, 10);
-               assert_eq!(next_announcements.len(), 0);
+               let next_announcements = gossip_sync.get_next_node_announcement(None);
+               assert!(next_announcements.is_none());
 
                {
                        // Announce a channel to add 2 nodes
@@ -2509,10 +2492,9 @@ mod tests {
                        };
                }
 
-
                // Nodes were never announced
-               let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
-               assert_eq!(next_announcements.len(), 0);
+               let next_announcements = gossip_sync.get_next_node_announcement(None);
+               assert!(next_announcements.is_none());
 
                {
                        let valid_announcement = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
@@ -2528,12 +2510,12 @@ mod tests {
                        };
                }
 
-               let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
-               assert_eq!(next_announcements.len(), 2);
+               let next_announcements = gossip_sync.get_next_node_announcement(None);
+               assert!(next_announcements.is_some());
 
                // Skip the first node.
-               let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
-               assert_eq!(next_announcements.len(), 1);
+               let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
+               assert!(next_announcements.is_some());
 
                {
                        // Later announcement which should not be relayed (excess data) prevent us from sharing a node
@@ -2547,8 +2529,8 @@ mod tests {
                        };
                }
 
-               let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
-               assert_eq!(next_announcements.len(), 0);
+               let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
+               assert!(next_announcements.is_none());
        }
 
        #[test]
index b2bf947e67c5c91f9baa8bfcf6fd8af512f8f6c7..b4d26904706240de647eec14ef97ef5b175ff8cf 100644 (file)
@@ -45,7 +45,7 @@ use prelude::*;
 use core::time::Duration;
 use sync::{Mutex, Arc};
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use core::{cmp, mem};
+use core::mem;
 use bitcoin::bech32::u5;
 use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
 
@@ -447,23 +447,16 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
                self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
                Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
        }
-       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
-               let mut chan_anns = Vec::new();
-               const TOTAL_UPDS: u64 = 50;
-               let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS);
-               for i in starting_point..end {
-                       let chan_upd_1 = get_dummy_channel_update(i);
-                       let chan_upd_2 = get_dummy_channel_update(i);
-                       let chan_ann = get_dummy_channel_announcement(i);
+       fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
+               let chan_upd_1 = get_dummy_channel_update(starting_point);
+               let chan_upd_2 = get_dummy_channel_update(starting_point);
+               let chan_ann = get_dummy_channel_announcement(starting_point);
 
-                       chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
-               }
-
-               chan_anns
+               Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
        }
 
-       fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
-               Vec::new()
+       fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> {
+               None
        }
 
        fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {