fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
- fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
- fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
+ fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { None }
+ fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> { None }
fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
);
let mut chan_progress = 0;
loop {
- let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress, 255);
- let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress, 255);
+ let orig_announcements = self.gossip_sync.get_next_channel_announcement(chan_progress);
+ let deserialized_announcements = gossip_sync.get_next_channel_announcement(chan_progress);
assert!(orig_announcements == deserialized_announcements);
- chan_progress = match orig_announcements.last() {
+ chan_progress = match orig_announcements {
Some(announcement) => announcement.0.contents.short_channel_id + 1,
None => break,
};
}
let mut node_progress = None;
loop {
- let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
- let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
+ let orig_announcements = self.gossip_sync.get_next_node_announcement(node_progress.as_ref());
+ let deserialized_announcements = gossip_sync.get_next_node_announcement(node_progress.as_ref());
assert!(orig_announcements == deserialized_announcements);
- node_progress = match orig_announcements.last() {
+ node_progress = match orig_announcements {
Some(announcement) => Some(announcement.contents.node_id),
None => break,
};
/// Handle an incoming channel_update message, returning true if it should be forwarded on
/// and false or an Err otherwise.
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError>;
- /// Gets a subset of the channel announcements and updates required to dump our routing table
- /// to a remote node, starting at the short_channel_id indicated by starting_point and
- /// including the batch_amount entries immediately higher in numerical value than starting_point.
- fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
- /// Gets a subset of the node announcements required to dump our routing table to a remote node,
- /// starting at the node *after* the provided publickey and including batch_amount entries
- /// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
+ /// Gets channel announcements and updates required to dump our routing table to a remote node,
+ /// starting at the short_channel_id indicated by starting_point and including announcements
+ /// for a single channel.
+ fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
+ /// Gets a node announcement required to dump our routing table to a remote node, starting at
+ /// the node *after* the provided pubkey and including up to one announcement immediately
+ /// higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
/// If None is provided for starting_point, we start at the first node.
- fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
+ fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement>;
/// Called when a connection is established with a peer. This can be used to
/// perform routing table synchronization using a strategy defined by the
/// implementor.
fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
- fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
- Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
- fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+ fn get_next_channel_announcement(&self, _starting_point: u64) ->
+ Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+ fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
}
}
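The new accessors are meant to be polled one item at a time, with the caller feeding the last returned short_channel_id or node_id back in as the next starting point. A minimal sketch of a caller draining a full routing-table dump this way (the free function, its generic bound and the import paths are illustrative assumptions, not part of the patch):

use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::RoutingMessageHandler;

// Walks every channel announcement, then every node announcement, one item per call.
fn dump_routing_table<R: RoutingMessageHandler>(handler: &R) {
    let mut scid = 0u64;
    while let Some((chan_ann, _update_a, _update_b)) = handler.get_next_channel_announcement(scid) {
        // Enqueue chan_ann plus any accompanying channel updates for the peer here.
        scid = chan_ann.contents.short_channel_id + 1;
    }
    let mut last_node: Option<PublicKey> = None;
    while let Some(node_ann) = handler.get_next_node_announcement(last_node.as_ref()) {
        // Enqueue node_ann for the peer here.
        last_node = Some(node_ann.contents.node_id);
    }
}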
- /// Returns the number of gossip messages we can fit in this peer's buffer.
- fn gossip_buffer_slots_available(&self) -> usize {
- OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len())
- }
-
/// Returns whether we should be reading bytes from this peer, based on whether its outbound
/// buffer still has space and we don't need to pause reads to get some writes out.
fn should_read(&self) -> bool {
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
}
- fn should_backfill_gossip(&self) -> bool {
- self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
+ /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+ /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+ /// been drained.
+ fn should_buffer_gossip_backfill(&self) -> bool {
+ self.pending_outbound_buffer.is_empty() &&
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
}
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
- if peer.should_backfill_gossip() {
+ if peer.should_buffer_gossip_backfill() {
match peer.sync_status {
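// Backfill only runs once the outbound buffer has fully drained (see
// should_buffer_gossip_backfill above), so forwarded gossip and other queued messages
// always take priority. Each pass through this loop enqueues at most one announcement
// (plus any accompanying channel updates): channels first, in ascending
// short_channel_id order, then node announcements ordered by node_id.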
InitSyncTracker::NoSyncRequested => {},
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
- let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8;
- let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
- for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
- self.enqueue_message(peer, announce);
- if let &Some(ref update_a) = update_a_option {
- self.enqueue_message(peer, update_a);
+ if let Some((announce, update_a_option, update_b_option)) =
+ self.message_handler.route_handler.get_next_channel_announcement(c)
+ {
+ self.enqueue_message(peer, &announce);
+ if let Some(update_a) = update_a_option {
+ self.enqueue_message(peer, &update_a);
}
- if let &Some(ref update_b) = update_b_option {
- self.enqueue_message(peer, update_b);
+ if let Some(update_b) = update_b_option {
+ self.enqueue_message(peer, &update_b);
}
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
- }
- if all_messages.is_empty() || all_messages.len() != steps as usize {
+ } else {
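// No channel with a higher short_channel_id remains; mark channel sync as complete
// (the u64::MAX sentinel) so the next pass moves on to node announcements.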
peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
}
},
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
- let steps = peer.gossip_buffer_slots_available() as u8;
- let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
- for msg in all_messages.iter() {
- self.enqueue_message(peer, msg);
+ if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
+ self.enqueue_message(peer, &msg);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
- }
- if all_messages.is_empty() || all_messages.len() != steps as usize {
+ } else {
peer.sync_status = InitSyncTracker::NoSyncRequested;
}
},
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
InitSyncTracker::NodesSyncing(key) => {
- let steps = peer.gossip_buffer_slots_available() as u8;
- let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
- for msg in all_messages.iter() {
- self.enqueue_message(peer, msg);
+ if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
+ self.enqueue_message(peer, &msg);
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
- }
- if all_messages.is_empty() || all_messages.len() != steps as usize {
+ } else {
peer.sync_status = InitSyncTracker::NoSyncRequested;
}
},
// Check that each peer has received the expected number of channel updates and channel
// announcements.
- assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
- assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
- assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
- assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+ assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+ assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+ assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+ assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
}
#[test]
Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
}
- fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
- let mut result = Vec::with_capacity(batch_amount as usize);
+ fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
let channels = self.network_graph.channels.read().unwrap();
let mut iter = channels.range(starting_point..);
- while result.len() < batch_amount as usize {
+ loop {
if let Some((_, ref chan)) = iter.next() {
if chan.announcement_message.is_some() {
let chan_announcement = chan.announcement_message.clone().unwrap();
let mut one_to_two_announcement: Option<ChannelUpdate> = None;
let mut two_to_one_announcement: Option<ChannelUpdate> = None;
if let Some(one_to_two) = chan.one_to_two.as_ref() {
one_to_two_announcement = one_to_two.last_update_message.clone();
}
if let Some(two_to_one) = chan.two_to_one.as_ref() {
two_to_one_announcement = two_to_one.last_update_message.clone();
}
- result.push((chan_announcement, one_to_two_announcement, two_to_one_announcement));
+ return Some((chan_announcement, one_to_two_announcement, two_to_one_announcement));
} else {
// TODO: We may end up sending un-announced channel_updates if we are sending
// initial sync data while receiving announce/updates for this channel.
}
} else {
- return result;
+ return None;
}
}
- result
}
- fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
- let mut result = Vec::with_capacity(batch_amount as usize);
+ fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
let nodes = self.network_graph.nodes.read().unwrap();
let mut iter = if let Some(pubkey) = starting_point {
let mut iter = nodes.range(NodeId::from_pubkey(pubkey)..);
// Skip the starting node itself so we return announcements strictly after it.
iter.next();
iter
} else {
nodes.range::<NodeId, _>(..)
};
- while result.len() < batch_amount as usize {
+ loop {
if let Some((_, ref node)) = iter.next() {
if let Some(node_info) = node.announcement_info.as_ref() {
- if node_info.announcement_message.is_some() {
- result.push(node_info.announcement_message.clone().unwrap());
+ if let Some(msg) = node_info.announcement_message.clone() {
+ return Some(msg);
}
}
} else {
- return result;
+ return None;
}
}
- result
}
/// Initiates a stateless sync of routing gossip information with a peer
let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
// Channels were not announced yet.
- let channels_with_announcements = gossip_sync.get_next_channel_announcements(0, 1);
- assert_eq!(channels_with_announcements.len(), 0);
+ let channels_with_announcements = gossip_sync.get_next_channel_announcement(0);
+ assert!(channels_with_announcements.is_none());
let short_channel_id;
{
}
// Contains initial channel announcement now.
- let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
- assert_eq!(channels_with_announcements.len(), 1);
- if let Some(channel_announcements) = channels_with_announcements.first() {
- let &(_, ref update_1, ref update_2) = channel_announcements;
+ let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+ if let Some(channel_announcements) = channels_with_announcements {
+ let (_, ref update_1, ref update_2) = channel_announcements;
assert_eq!(update_1, &None);
assert_eq!(update_2, &None);
} else {
panic!();
}
-
{
// Valid channel update
let valid_channel_update = get_signed_channel_update(|unsigned_channel_update| {
}
// Now contains an initial announcement and an update.
- let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
- assert_eq!(channels_with_announcements.len(), 1);
- if let Some(channel_announcements) = channels_with_announcements.first() {
- let &(_, ref update_1, ref update_2) = channel_announcements;
+ let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+ if let Some(channel_announcements) = channels_with_announcements {
+ let (_, ref update_1, ref update_2) = channel_announcements;
assert_ne!(update_1, &None);
assert_eq!(update_2, &None);
} else {
}
// Test that announcements with excess data won't be returned
- let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
- assert_eq!(channels_with_announcements.len(), 1);
- if let Some(channel_announcements) = channels_with_announcements.first() {
- let &(_, ref update_1, ref update_2) = channel_announcements;
+ let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+ if let Some(channel_announcements) = channels_with_announcements {
+ let (_, ref update_1, ref update_2) = channel_announcements;
assert_eq!(update_1, &None);
assert_eq!(update_2, &None);
} else {
}
// A further starting point has no channels after it
- let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000, 1);
- assert_eq!(channels_with_announcements.len(), 0);
+ let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id + 1000);
+ assert!(channels_with_announcements.is_none());
}
#[test]
let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
// No nodes yet.
- let next_announcements = gossip_sync.get_next_node_announcements(None, 10);
- assert_eq!(next_announcements.len(), 0);
+ let next_announcements = gossip_sync.get_next_node_announcement(None);
+ assert!(next_announcements.is_none());
{
// Announce a channel to add 2 nodes
};
}
-
// Nodes were never announced
- let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
- assert_eq!(next_announcements.len(), 0);
+ let next_announcements = gossip_sync.get_next_node_announcement(None);
+ assert!(next_announcements.is_none());
{
let valid_announcement = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
};
}
- let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
- assert_eq!(next_announcements.len(), 2);
+ let next_announcements = gossip_sync.get_next_node_announcement(None);
+ assert!(next_announcements.is_some());
// Skip the first node.
- let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
- assert_eq!(next_announcements.len(), 1);
+ let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
+ assert!(next_announcements.is_some());
{
// A later announcement which should not be relayed (excess data) prevents us from sharing a node
};
}
- let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
- assert_eq!(next_announcements.len(), 0);
+ let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
+ assert!(next_announcements.is_none());
}
#[test]
use core::time::Duration;
use sync::{Mutex, Arc};
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use core::{cmp, mem};
+use core::mem;
use bitcoin::bech32::u5;
use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
}
- fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
- let mut chan_anns = Vec::new();
- const TOTAL_UPDS: u64 = 50;
- let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS);
- for i in starting_point..end {
- let chan_upd_1 = get_dummy_channel_update(i);
- let chan_upd_2 = get_dummy_channel_update(i);
- let chan_ann = get_dummy_channel_announcement(i);
+ fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
+ let chan_upd_1 = get_dummy_channel_update(starting_point);
+ let chan_upd_2 = get_dummy_channel_update(starting_point);
+ let chan_ann = get_dummy_channel_announcement(starting_point);
- chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
- }
-
- chan_anns
+ Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
}
- fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
- Vec::new()
+ fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> {
+ None
}
fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {