From fda10964219013be151b2c9861f9771e321176d6 Mon Sep 17 00:00:00 2001 From: Schalk van Heerden Date: Tue, 2 Oct 2018 16:02:17 +0200 Subject: [PATCH] Handle-initial_routing_sync-requests-from-peers-in-their-Init-messages --- src/ln/msgs.rs | 23 ++++++- src/ln/peer_handler.rs | 95 +++++++++++++++++++------- src/ln/router.rs | 150 ++++++++++++++++++++++++++++++++++++----- src/util/test_utils.rs | 9 ++- 4 files changed, 232 insertions(+), 45 deletions(-) diff --git a/src/ln/msgs.rs b/src/ln/msgs.rs index 5c5d32f39..c25ce8805 100644 --- a/src/ln/msgs.rs +++ b/src/ln/msgs.rs @@ -334,7 +334,7 @@ pub struct AnnouncementSignatures { } /// An address which can be used to connect to a remote peer -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum NetAddress { /// An IPv4 address/port on which the peer is listenting. IPv4 { @@ -381,7 +381,7 @@ impl NetAddress { } } } - +#[derive(Clone, PartialEq)] // Only exposed as broadcast of node_announcement should be filtered by node_id /// The unsigned part of a node_announcement pub struct UnsignedNodeAnnouncement { @@ -398,6 +398,7 @@ pub struct UnsignedNodeAnnouncement { pub(crate) excess_address_data: Vec, pub(crate) excess_data: Vec, } +#[derive(Clone, PartialEq)] /// A node_announcement message to be sent or received from a peer pub struct NodeAnnouncement { pub(crate) signature: Signature, @@ -574,6 +575,18 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn /// Handle an incoming error message from the given peer. fn handle_error(&self, their_node_id: &PublicKey, msg: &ErrorMessage); } +#[derive(PartialEq)] +///Enum used to keep track of syncing information/state of peer and if a sync is required +pub enum InitSyncTracker{ + ///This indicates if a sync is required or not, false is no sync required, true is sync required but not started + Sync(bool), + ///This is the last synced node's public key + ///During this state it is syncing nodes + NodeCounter(PublicKey), + ///This is the last synced channel _id + ///During this state it is syncing nodes + ChannelCounter(u64), +} /// A trait to describe an object which can receive routing messages. pub trait RoutingMessageHandler : Send + Sync { @@ -588,6 +601,12 @@ pub trait RoutingMessageHandler : Send + Sync { fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result; /// Handle some updates to the route graph that we learned due to an outbound failed payment. fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate); + ///Gets a subset of the channel announcements and updates required to dump our routing table to a remote node, starting at the short_channel_id indicated by starting_point.channelcounter and including batch_amount entries + /// This function will start iterating at 0 if the starting_point is < 0. + fn get_next_channel_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<(ChannelAnnouncement, ChannelUpdate,ChannelUpdate)>); + ///Gets a subset of the node announcements required to dump our routing table to a remote node, starting at the PublicKey indicated by starting_point.nodecounter and including batch_amount entries + /// This function will start iterating at 0 if the starting_point is < 0. + fn get_next_node_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec); } pub(crate) struct OnionRealm0HopData { diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 56649c462..90e2f9193 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -102,6 +102,24 @@ struct Peer { pending_read_buffer: Vec, pending_read_buffer_pos: usize, pending_read_is_header: bool, + sync_status : msgs::InitSyncTracker, +} + +impl Peer { + pub fn require_sync(&self)->bool{ + if let msgs::InitSyncTracker::Sync(i) = self.sync_status {i} else {false} + } + + /// this function checks if the the channel announcements and updates are allowed to be forwarded to a specific peer. + /// If the peer is in syncing state and the channel_id has not been synced then the function returns false as this info will forward at a later stage and + /// we dont want to send duplicate messages. If the channel was already synced then we can forward those messages and the function will then return true. + pub fn is_channel_allowed_to_forward(&self, channel_id : u64)->bool{ + match self.sync_status { + msgs::InitSyncTracker::Sync(i) => !i, + msgs::InitSyncTracker::NodeCounter(_i) => false, + msgs::InitSyncTracker::ChannelCounter(i) => (i < channel_id), + } + } } struct PeerHolder { @@ -221,6 +239,7 @@ impl PeerManager { pending_read_buffer: pending_read_buffer, pending_read_buffer_pos: 0, pending_read_is_header: false, + sync_status : msgs::InitSyncTracker::Sync(false), }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; @@ -255,22 +274,47 @@ impl PeerManager { pending_read_buffer: pending_read_buffer, pending_read_buffer_pos: 0, pending_read_is_header: false, + sync_status : msgs::InitSyncTracker::Sync(false), }).is_some() { panic!("PeerManager driver duplicated descriptors!"); }; Ok(()) } - fn do_attempt_write_data(descriptor: &mut Descriptor, peer: &mut Peer) { + fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { + macro_rules! encode_and_send_msg { + ($msg: expr, $msg_code: expr) => { + { + log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + } + } + } while !peer.awaiting_write_event { if { + let should_be_reading = peer.pending_outbound_buffer.len() < 10; + if (peer.require_sync()) &&(should_be_reading){ + match peer.sync_status{ + msgs::InitSyncTracker::ChannelCounter(_c) => { + let all_messages_tuple = self.message_handler.route_handler.get_next_channel_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8); + for tuple in all_messages_tuple.iter(){ + encode_and_send_msg!(tuple.0, 256); + encode_and_send_msg!(tuple.1, 258); + encode_and_send_msg!(tuple.2, 258); + } + }, + _=>{let all_messages = self.message_handler.route_handler.get_next_node_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8); + for message in all_messages.iter(){ + encode_and_send_msg!(message, 256); + }}, + }; + } let next_buff = match peer.pending_outbound_buffer.front() { None => return, Some(buff) => buff, }; - let should_be_reading = peer.pending_outbound_buffer.len() < 10; - let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading); + let data_sent = descriptor.send_data(&next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading); peer.pending_outbound_buffer_first_msg_offset += data_sent; if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false } } { @@ -297,7 +341,7 @@ impl PeerManager { None => panic!("Descriptor for write_event is not already known to PeerManager"), Some(peer) => { peer.awaiting_write_event = false; - Self::do_attempt_write_data(descriptor, peer); + self.do_attempt_write_data(descriptor, peer); } }; Ok(()) @@ -522,6 +566,10 @@ impl PeerManager { if msg.local_features.supports_unknown_bits() { "present" } else { "none" }, if msg.global_features.supports_unknown_bits() { "present" } else { "none" }); + if msg.local_features.initial_routing_sync() { + peer.sync_status = msgs::InitSyncTracker::Sync(true); + peers.peers_needing_send.insert(peer_descriptor.clone()); + } peer.their_global_features = Some(msg.global_features); peer.their_local_features = Some(msg.local_features); @@ -531,6 +579,7 @@ impl PeerManager { self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel); local_features.set_initial_routing_sync(); } + encode_and_send_msg!(msgs::Init { global_features: msgs::GlobalFeatures::new(), local_features, @@ -678,7 +727,7 @@ impl PeerManager { } } - Self::do_attempt_write_data(peer_descriptor, peer); + self.do_attempt_write_data(peer_descriptor, peer); peer.pending_outbound_buffer.len() > 10 // pause_read } @@ -735,7 +784,7 @@ impl PeerManager { //TODO: Drop the pending channel? (or just let it timeout, but that sucks) }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}", @@ -745,7 +794,7 @@ impl PeerManager { //TODO: Drop the pending channel? (or just let it timeout, but that sucks) }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", @@ -757,7 +806,7 @@ impl PeerManager { //they should just throw away this funding transaction }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}", @@ -768,7 +817,7 @@ impl PeerManager { //they should just throw away this funding transaction }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}", @@ -778,7 +827,7 @@ impl PeerManager { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", @@ -789,7 +838,7 @@ impl PeerManager { //they should just throw away this funding transaction }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", @@ -817,7 +866,7 @@ impl PeerManager { peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134))); } peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", @@ -827,7 +876,7 @@ impl PeerManager { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", @@ -837,7 +886,7 @@ impl PeerManager { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}", @@ -847,7 +896,7 @@ impl PeerManager { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", @@ -857,7 +906,7 @@ impl PeerManager { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); @@ -866,7 +915,7 @@ impl PeerManager { let encoded_update_msg = encode_msg!(update_msg, 258); for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||!peer.is_channel_allowed_to_forward(msg.contents.short_channel_id) { continue } match peer.their_node_id { @@ -879,7 +928,7 @@ impl PeerManager { } peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..])); - Self::do_attempt_write_data(&mut (*descriptor).clone(), peer); + self.do_attempt_write_data(&mut (*descriptor).clone(), peer); } } }, @@ -889,11 +938,11 @@ impl PeerManager { let encoded_msg = encode_msg!(msg, 258); for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() || !peer.is_channel_allowed_to_forward(msg.contents.short_channel_id) { continue } peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); - Self::do_attempt_write_data(&mut (*descriptor).clone(), peer); + self.do_attempt_write_data(&mut (*descriptor).clone(), peer); } } }, @@ -914,7 +963,7 @@ impl PeerManager { peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17))); // This isn't guaranteed to work, but if there is enough free // room in the send buffer, put the error message there... - Self::do_attempt_write_data(&mut descriptor, &mut peer); + self.do_attempt_write_data(&mut descriptor, &mut peer); } else { log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id)); } @@ -932,7 +981,7 @@ impl PeerManager { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17))); - Self::do_attempt_write_data(&mut descriptor, peer); + self.do_attempt_write_data(&mut descriptor, peer); }, } } else { @@ -944,7 +993,7 @@ impl PeerManager { for mut descriptor in peers.peers_needing_send.drain() { match peers.peers.get_mut(&descriptor) { - Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer), + Some(peer) => self.do_attempt_write_data(&mut descriptor, peer), None => panic!("Inconsistent peers set state!"), } } diff --git a/src/ln/router.rs b/src/ln/router.rs index 9fe2cfb50..ba04011a2 100644 --- a/src/ln/router.rs +++ b/src/ln/router.rs @@ -13,15 +13,15 @@ use bitcoin::blockdata::opcodes; use chain::chaininterface::{ChainError, ChainWatchInterface}; use ln::channelmanager; -use ln::msgs::{DecodeError,ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures}; +use ln::msgs::{DecodeError,ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures, InitSyncTracker}; use ln::msgs; use util::ser::{Writeable, Readable}; use util::logger::Logger; use std::cmp; use std::sync::{RwLock,Arc}; -use std::collections::{HashMap,BinaryHeap}; -use std::collections::hash_map::Entry; +use std::collections::{HashMap,BinaryHeap, BTreeMap}; +use std::collections::btree_map::Entry as BtreeEntry; use std; /// A hop in a route @@ -86,6 +86,7 @@ struct DirectionalChannelInfo { htlc_minimum_msat: u64, fee_base_msat: u32, fee_proportional_millionths: u32, + last_update_message : Option, } impl std::fmt::Display for DirectionalChannelInfo { @@ -99,6 +100,9 @@ struct ChannelInfo { features: GlobalFeatures, one_to_two: DirectionalChannelInfo, two_to_one: DirectionalChannelInfo, + //this is cached here so we can send out it later if required by route_init_sync + //keep an eye on this to see if the extra memory is a problem + announcement_message : Option, } impl std::fmt::Display for ChannelInfo { @@ -108,6 +112,7 @@ impl std::fmt::Display for ChannelInfo { } } +#[derive(PartialEq)] struct NodeInfo { #[cfg(feature = "non_bitcoin_chain_hash_routing")] channels: Vec<(u64, Sha256dHash)>, @@ -122,6 +127,9 @@ struct NodeInfo { rgb: [u8; 3], alias: [u8; 32], addresses: Vec, + //this is cached here so we can send out it later if required by route_init_sync + //keep an eye on this to see if the extra memory is a problem + announcement_message : Option, } impl std::fmt::Display for NodeInfo { @@ -133,19 +141,19 @@ impl std::fmt::Display for NodeInfo { struct NetworkMap { #[cfg(feature = "non_bitcoin_chain_hash_routing")] - channels: HashMap<(u64, Sha256dHash), ChannelInfo>, + channels: BTreeMap<(u64, Sha256dHash), ChannelInfo>, #[cfg(not(feature = "non_bitcoin_chain_hash_routing"))] - channels: HashMap, + channels: BTreeMap, our_node_id: PublicKey, - nodes: HashMap, + nodes: BTreeMap, } struct MutNetworkMap<'a> { #[cfg(feature = "non_bitcoin_chain_hash_routing")] - channels: &'a mut HashMap<(u64, Sha256dHash), ChannelInfo>, + channels: &'a mut BTreeMap<(u64, Sha256dHash), ChannelInfo>, #[cfg(not(feature = "non_bitcoin_chain_hash_routing"))] - channels: &'a mut HashMap, - nodes: &'a mut HashMap, + channels: &'a mut BTreeMap, + nodes: &'a mut BTreeMap, } impl NetworkMap { fn borrow_parts(&mut self) -> MutNetworkMap { @@ -252,6 +260,13 @@ impl RoutingMessageHandler for Router { node.rgb = msg.contents.rgb; node.alias = msg.contents.alias; node.addresses = msg.contents.addresses.clone(); + node.announcement_message = if msg.contents.excess_data.is_empty(){ + Some(msg.clone()) + } + else{ + None + }; + Ok(msg.contents.excess_data.is_empty() && msg.contents.excess_address_data.is_empty() && !msg.contents.features.supports_unknown_bits()) } } @@ -310,6 +325,7 @@ impl RoutingMessageHandler for Router { htlc_minimum_msat: u64::max_value(), fee_base_msat: u32::max_value(), fee_proportional_millionths: u32::max_value(), + last_update_message : None, }, two_to_one: DirectionalChannelInfo { src_node_id: msg.contents.node_id_2.clone(), @@ -319,11 +335,18 @@ impl RoutingMessageHandler for Router { htlc_minimum_msat: u64::max_value(), fee_base_msat: u32::max_value(), fee_proportional_millionths: u32::max_value(), + last_update_message : None, + }, + announcement_message : if msg.contents.excess_data.is_empty(){ + Some(msg.clone()) } + else{ + None + }, }; match network.channels.entry(NetworkMap::get_key(msg.contents.short_channel_id, msg.contents.chain_hash)) { - Entry::Occupied(mut entry) => { + BtreeEntry::Occupied(mut entry) => { //TODO: because asking the blockchain if short_channel_id is valid is only optional //in the blockchain API, we need to handle it smartly here, though its unclear //exactly how... @@ -342,7 +365,7 @@ impl RoutingMessageHandler for Router { return Err(HandleError{err: "Already have knowledge of channel", action: Some(ErrorAction::IgnoreError)}) } }, - Entry::Vacant(entry) => { + BtreeEntry::Vacant(entry) => { entry.insert(chan_info); } }; @@ -350,10 +373,10 @@ impl RoutingMessageHandler for Router { macro_rules! add_channel_to_node { ( $node_id: expr ) => { match network.nodes.entry($node_id) { - Entry::Occupied(node_entry) => { + BtreeEntry::Occupied(node_entry) => { node_entry.into_mut().channels.push(NetworkMap::get_key(msg.contents.short_channel_id, msg.contents.chain_hash)); }, - Entry::Vacant(node_entry) => { + BtreeEntry::Vacant(node_entry) => { node_entry.insert(NodeInfo { channels: vec!(NetworkMap::get_key(msg.contents.short_channel_id, msg.contents.chain_hash)), lowest_inbound_channel_fee_base_msat: u32::max_value(), @@ -363,6 +386,7 @@ impl RoutingMessageHandler for Router { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message: None, }); } } @@ -424,9 +448,14 @@ impl RoutingMessageHandler for Router { $target.htlc_minimum_msat = msg.contents.htlc_minimum_msat; $target.fee_base_msat = msg.contents.fee_base_msat; $target.fee_proportional_millionths = msg.contents.fee_proportional_millionths; + $target.last_update_message = if msg.contents.excess_data.is_empty(){ + Some(msg.clone()) + } + else{ + None + }; } } - let msg_hash = Message::from_slice(&Sha256dHash::from_data(&msg.contents.encode()[..])[..]).unwrap(); if msg.contents.flags & 1 == 1 { dest_node_id = channel.one_to_two.src_node_id.clone(); @@ -471,6 +500,53 @@ impl RoutingMessageHandler for Router { Ok(msg.contents.excess_data.is_empty()) } + + + fn get_next_channel_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)>){ + let mut result = Vec::new(); + let network = self.network_map.read().unwrap(); + let mut starting_for_next = if let InitSyncTracker::ChannelCounter(i) = *starting_point {i} else {0 as u64}; + let mut iter = network.channels.range((starting_for_next)..); + for _x in 0..batch_amount{ + if let Some(ref value) = iter.next(){ + if value.1.announcement_message.is_some() && value.1.one_to_two.last_update_message.is_some() && value.1.two_to_one.last_update_message.is_some(){ + let channel_announcement = value.1.announcement_message.clone().unwrap(); + let channel_update1 = value.1.one_to_two.last_update_message.clone().unwrap(); + let channel_update2 = value.1.two_to_one.last_update_message.clone().unwrap(); + result.push((channel_announcement,channel_update1, channel_update2)); + } + starting_for_next = *(value.0) ;//adjusting start value so we can pass the last used back + } + } + *starting_point = if result.len() == 0{ + InitSyncTracker::Sync(false) //sync done so disable sync required + } else { + InitSyncTracker::ChannelCounter(starting_for_next) + }; + (result) + } + + fn get_next_node_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec){ + let mut result = Vec::new(); + let network = self.network_map.read().unwrap(); + let mut iter = if let InitSyncTracker::NodeCounter(i) = *starting_point {network.nodes.range((i)..)} else { + let first_item = network.nodes.iter().next(); + if let None = first_item {*starting_point = InitSyncTracker::Sync(false); return result} //for some reason we know of no nodes + network.nodes.range(*(first_item.unwrap().0)..)}; + for _x in 0..batch_amount{ + if let Some(ref value) = iter.next(){ + if value.1.announcement_message.is_some(){ + let node_announcement = value.1.announcement_message.clone().unwrap(); + result.push(node_announcement); + } + *starting_point = InitSyncTracker::NodeCounter(*(value.0)) ;//adjusting start value so we can pass the last used back + } + } + if result.len() == 0{ + *starting_point = InitSyncTracker::ChannelCounter(0) //node syncing done, move on towards channels + }; + result + } } #[derive(Eq, PartialEq)] @@ -504,7 +580,7 @@ struct DummyDirectionalChannelInfo { impl Router { /// Creates a new router with the given node_id to be used as the source for get_route() pub fn new(our_pubkey: PublicKey, chain_monitor: Arc, logger: Arc) -> Router { - let mut nodes = HashMap::new(); + let mut nodes = BTreeMap::new(); nodes.insert(our_pubkey.clone(), NodeInfo { channels: Vec::new(), lowest_inbound_channel_fee_base_msat: u32::max_value(), @@ -514,11 +590,12 @@ impl Router { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message: None, }); Router { secp_ctx: Secp256k1::verification_only(), network_map: RwLock::new(NetworkMap { - channels: HashMap::new(), + channels: BTreeMap::new(), our_node_id: our_pubkey, nodes: nodes, }), @@ -549,10 +626,10 @@ impl Router { unimplemented!(); } - fn remove_channel_in_nodes(nodes: &mut HashMap, chan: &ChannelInfo, short_channel_id: u64) { + fn remove_channel_in_nodes(nodes: &mut BTreeMap, chan: &ChannelInfo, short_channel_id: u64) { macro_rules! remove_from_node { ($node_id: expr) => { - if let Entry::Occupied(mut entry) = nodes.entry($node_id) { + if let BtreeEntry::Occupied(mut entry) = nodes.entry($node_id) { entry.get_mut().channels.retain(|chan_id| { short_channel_id != *NetworkMap::get_short_id(chan_id) }); @@ -877,6 +954,7 @@ mod tests { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(1, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -888,6 +966,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: u32::max_value(), // This value should be ignored fee_proportional_millionths: u32::max_value(), // This value should be ignored + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node1.clone(), last_update: 0, @@ -896,7 +975,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.nodes.insert(node2.clone(), NodeInfo { channels: vec!(NetworkMap::get_key(2, zero_hash.clone()), NetworkMap::get_key(4, zero_hash.clone())), @@ -907,6 +988,7 @@ mod tests { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(2, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -918,6 +1000,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: u32::max_value(), // This value should be ignored fee_proportional_millionths: u32::max_value(), // This value should be ignored + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node2.clone(), last_update: 0, @@ -926,7 +1009,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.nodes.insert(node8.clone(), NodeInfo { channels: vec!(NetworkMap::get_key(12, zero_hash.clone()), NetworkMap::get_key(13, zero_hash.clone())), @@ -937,6 +1022,7 @@ mod tests { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(12, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -948,6 +1034,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: u32::max_value(), // This value should be ignored fee_proportional_millionths: u32::max_value(), // This value should be ignored + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node8.clone(), last_update: 0, @@ -956,7 +1043,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.nodes.insert(node3.clone(), NodeInfo { channels: vec!( @@ -973,6 +1062,7 @@ mod tests { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(3, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -984,6 +1074,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node3.clone(), last_update: 0, @@ -992,7 +1083,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 100, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(4, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -1004,6 +1097,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 1000000, + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node3.clone(), last_update: 0, @@ -1012,7 +1106,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(13, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -1024,6 +1120,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 2000000, + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node3.clone(), last_update: 0, @@ -1032,7 +1129,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.nodes.insert(node4.clone(), NodeInfo { channels: vec!(NetworkMap::get_key(5, zero_hash.clone()), NetworkMap::get_key(11, zero_hash.clone())), @@ -1043,6 +1142,7 @@ mod tests { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(5, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -1054,6 +1154,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 100, fee_proportional_millionths: 0, + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node4.clone(), last_update: 0, @@ -1062,7 +1163,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.nodes.insert(node5.clone(), NodeInfo { channels: vec!(NetworkMap::get_key(6, zero_hash.clone()), NetworkMap::get_key(11, zero_hash.clone())), @@ -1073,6 +1176,7 @@ mod tests { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(6, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -1084,6 +1188,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node5.clone(), last_update: 0, @@ -1092,7 +1197,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(11, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -1104,6 +1211,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node4.clone(), last_update: 0, @@ -1112,7 +1220,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); network.nodes.insert(node6.clone(), NodeInfo { channels: vec!(NetworkMap::get_key(7, zero_hash.clone())), @@ -1123,6 +1233,7 @@ mod tests { rgb: [0; 3], alias: [0; 32], addresses: Vec::new(), + announcement_message : None, }); network.channels.insert(NetworkMap::get_key(7, zero_hash.clone()), ChannelInfo { features: GlobalFeatures::new(), @@ -1134,6 +1245,7 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 1000000, + last_update_message: None, }, two_to_one: DirectionalChannelInfo { src_node_id: node6.clone(), last_update: 0, @@ -1142,7 +1254,9 @@ mod tests { htlc_minimum_msat: 0, fee_base_msat: 0, fee_proportional_millionths: 0, + last_update_message: None, }, + announcement_message : None, }); } diff --git a/src/util/test_utils.rs b/src/util/test_utils.rs index 5da647efe..0ebe8e9a8 100644 --- a/src/util/test_utils.rs +++ b/src/util/test_utils.rs @@ -3,7 +3,7 @@ use chain::chaininterface::ConfirmationTarget; use chain::transaction::OutPoint; use ln::channelmonitor; use ln::msgs; -use ln::msgs::{HandleError}; +use ln::msgs::{HandleError, InitSyncTracker}; use util::events; use util::logger::{Logger, Level, Record}; use util::ser::{ReadableArgs, Writer}; @@ -157,7 +157,6 @@ impl TestRoutingMessageHandler { TestRoutingMessageHandler {} } } - impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result { Err(HandleError { err: "", action: None }) @@ -169,6 +168,12 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { Err(HandleError { err: "", action: None }) } fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {} + fn get_next_channel_announcements(&self, _starting_point: &mut InitSyncTracker, _batch_amount: u8)->(Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)>){ + Vec::new() + } + fn get_next_node_announcements(&self, _starting_point: &mut InitSyncTracker, _batch_amount: u8)->(Vec){ + Vec::new() + } } pub struct TestLogger { -- 2.39.5