}
/// An address which can be used to connect to a remote peer
-#[derive(Clone, PartialEq)]
+#[derive(Clone)]
pub enum NetAddress {
/// An IPv4 address/port on which the peer is listenting.
IPv4 {
}
}
}
-#[derive(Clone, PartialEq)]
+
+#[derive(Clone)]
// Only exposed as broadcast of node_announcement should be filtered by node_id
/// The unsigned part of a node_announcement
pub struct UnsignedNodeAnnouncement {
pub(crate) excess_address_data: Vec<u8>,
pub(crate) excess_data: Vec<u8>,
}
-#[derive(Clone, PartialEq)]
+#[derive(Clone)]
/// A node_announcement message to be sent or received from a peer
pub struct NodeAnnouncement {
pub(crate) signature: Signature,
/// Handle an incoming error message from the given peer.
fn handle_error(&self, their_node_id: &PublicKey, msg: &ErrorMessage);
}
-#[derive(PartialEq)]
-///Enum used to keep track of syncing information/state of peer and if a sync is required
-pub enum InitSyncTracker{
- ///This indicates if a sync is required or not, false is no sync required, true is sync required but not started
- Sync(bool),
- ///This is the last synced node's public key
- ///During this state it is syncing nodes
- NodeCounter(PublicKey),
- ///This is the last synced channel _id
- ///During this state it is syncing nodes
- ChannelCounter(u64),
-}
/// A trait to describe an object which can receive routing messages.
pub trait RoutingMessageHandler : Send + Sync {
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, HandleError>;
/// Handle some updates to the route graph that we learned due to an outbound failed payment.
fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate);
- ///Gets a subset of the channel announcements and updates required to dump our routing table to a remote node, starting at the short_channel_id indicated by starting_point.channelcounter and including batch_amount entries
- /// This function will start iterating at 0 if the starting_point is < 0.
- fn get_next_channel_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<(ChannelAnnouncement, ChannelUpdate,ChannelUpdate)>);
- ///Gets a subset of the node announcements required to dump our routing table to a remote node, starting at the PublicKey indicated by starting_point.nodecounter and including batch_amount entries
- /// This function will start iterating at 0 if the starting_point is < 0.
- fn get_next_node_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<NodeAnnouncement>);
+ /// Gets a subset of the channel announcements and updates required to dump our routing table
+ /// to a remote node, starting at the short_channel_id indicated by starting_point and
+ /// including batch_amount entries.
+ fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, ChannelUpdate, ChannelUpdate)>;
+ /// Gets a subset of the node announcements required to dump our routing table to a remote node,
+ /// starting at the node *after* the provided publickey and including batch_amount entries.
+ /// If None is provided for starting_point, we start at the first node.
+ fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
}
pub(crate) struct OnionRealm0HopData {
}
}
+enum InitSyncTracker{
+ NoSyncRequested,
+ ChannelsSyncing(u64),
+ NodesSyncing(PublicKey),
+}
+
struct Peer {
channel_encryptor: PeerChannelEncryptor,
outbound: bool,
pending_read_buffer: Vec<u8>,
pending_read_buffer_pos: usize,
pending_read_is_header: bool,
- sync_status : msgs::InitSyncTracker,
+
+ sync_status: InitSyncTracker,
}
impl Peer {
- pub fn require_sync(&self)->bool{
- if let msgs::InitSyncTracker::Sync(i) = self.sync_status {i} else {false}
- }
-
- /// this function checks if the the channel announcements and updates are allowed to be forwarded to a specific peer.
- /// If the peer is in syncing state and the channel_id has not been synced then the function returns false as this info will forward at a later stage and
- /// we dont want to send duplicate messages. If the channel was already synced then we can forward those messages and the function will then return true.
- pub fn is_channel_allowed_to_forward(&self, channel_id : u64)->bool{
+ /// Returns true if the the channel announcements/updates for the given channel should be
+ /// forwarded to this peer.
+ /// If we are sending our routing table to this peer and we have not yet sent channel
+ /// announcements/updates for the given channel_id then we will send it when we get to that
+ /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
+ /// sent the old versions, we should send the update, and so return true here.
+ fn should_forward_channel(&self, channel_id: u64)->bool{
match self.sync_status {
- msgs::InitSyncTracker::Sync(i) => !i,
- msgs::InitSyncTracker::NodeCounter(_i) => false,
- msgs::InitSyncTracker::ChannelCounter(i) => (i < channel_id),
+ InitSyncTracker::NoSyncRequested => true,
+ InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
+ InitSyncTracker::NodesSyncing(_) => true,
}
}
}
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
- sync_status : msgs::InitSyncTracker::Sync(false),
+
+ sync_status: InitSyncTracker::NoSyncRequested,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
pending_read_buffer: pending_read_buffer,
pending_read_buffer_pos: 0,
pending_read_is_header: false,
- sync_status : msgs::InitSyncTracker::Sync(false),
+
+ sync_status: InitSyncTracker::NoSyncRequested,
}).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
macro_rules! encode_and_send_msg {
($msg: expr, $msg_code: expr) => {
{
- log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
+ log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
}
}
}
+ const MSG_BUFF_SIZE: usize = 10;
while !peer.awaiting_write_event {
- if {
- let should_be_reading = peer.pending_outbound_buffer.len() < 10;
- if (peer.require_sync()) &&(should_be_reading){
- match peer.sync_status{
- msgs::InitSyncTracker::ChannelCounter(_c) => {
- let all_messages_tuple = self.message_handler.route_handler.get_next_channel_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8);
- for tuple in all_messages_tuple.iter(){
- encode_and_send_msg!(tuple.0, 256);
- encode_and_send_msg!(tuple.1, 258);
- encode_and_send_msg!(tuple.2, 258);
+ if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
+ match peer.sync_status {
+ InitSyncTracker::NoSyncRequested => {},
+ InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
+ let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
+ let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps);
+ for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
+ encode_and_send_msg!(announce, 256);
+ encode_and_send_msg!(update_a, 258);
+ encode_and_send_msg!(update_b, 258);
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
}
- },
- _=>{let all_messages = self.message_handler.route_handler.get_next_node_announcements(&mut peer.sync_status,(10-peer.pending_outbound_buffer.len()) as u8);
- for message in all_messages.iter(){
- encode_and_send_msg!(message, 256);
- }},
- };
+ if all_messages.is_empty() || all_messages.len() != steps as usize {
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
+ }
+ },
+ InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
+ let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+ let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
+ for msg in all_messages.iter() {
+ encode_and_send_msg!(msg, 256);
+ peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
+ }
+ if all_messages.is_empty() || all_messages.len() != steps as usize {
+ peer.sync_status = InitSyncTracker::NoSyncRequested;
+ }
+ },
+ InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
+ InitSyncTracker::NodesSyncing(key) => {
+ let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+ let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
+ for msg in all_messages.iter() {
+ encode_and_send_msg!(msg, 256);
+ peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
+ }
+ if all_messages.is_empty() || all_messages.len() != steps as usize {
+ peer.sync_status = InitSyncTracker::NoSyncRequested;
+ }
+ },
}
+ }
+
+ if {
let next_buff = match peer.pending_outbound_buffer.front() {
None => return,
Some(buff) => buff,
};
- let data_sent = descriptor.send_data(&next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
+ let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
+ let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
peer.pending_outbound_buffer_first_msg_offset += data_sent;
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
} {
if msg.global_features.supports_unknown_bits() { "present" } else { "none" });
if msg.local_features.initial_routing_sync() {
- peer.sync_status = msgs::InitSyncTracker::Sync(true);
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
peers.peers_needing_send.insert(peer_descriptor.clone());
}
peer.their_global_features = Some(msg.global_features);
let encoded_update_msg = encode_msg!(update_msg, 258);
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
- if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||!peer.is_channel_allowed_to_forward(msg.contents.short_channel_id) {
+ if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
+ !peer.should_forward_channel(msg.contents.short_channel_id) {
continue
}
match peer.their_node_id {
let encoded_msg = encode_msg!(msg, 258);
for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
- if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() || !peer.is_channel_allowed_to_forward(msg.contents.short_channel_id) {
+ if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
+ !peer.should_forward_channel(msg.contents.short_channel_id) {
continue
}
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
use chain::chaininterface::{ChainError, ChainWatchInterface};
use ln::channelmanager;
-use ln::msgs::{DecodeError,ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures, InitSyncTracker};
+use ln::msgs::{DecodeError,ErrorAction,HandleError,RoutingMessageHandler,NetAddress,GlobalFeatures};
use ln::msgs;
use util::ser::{Writeable, Readable};
use util::logger::Logger;
use std::cmp;
use std::sync::{RwLock,Arc};
-use std::collections::{HashMap,BinaryHeap, BTreeMap};
+use std::collections::{HashMap,BinaryHeap,BTreeMap};
use std::collections::btree_map::Entry as BtreeEntry;
use std;
htlc_minimum_msat: u64,
fee_base_msat: u32,
fee_proportional_millionths: u32,
- last_update_message : Option<msgs::ChannelUpdate>,
+ last_update_message: Option<msgs::ChannelUpdate>,
}
impl std::fmt::Display for DirectionalChannelInfo {
two_to_one: DirectionalChannelInfo,
//this is cached here so we can send out it later if required by route_init_sync
//keep an eye on this to see if the extra memory is a problem
- announcement_message : Option<msgs::ChannelAnnouncement>,
+ announcement_message: Option<msgs::ChannelAnnouncement>,
}
impl std::fmt::Display for ChannelInfo {
}
}
-#[derive(PartialEq)]
struct NodeInfo {
#[cfg(feature = "non_bitcoin_chain_hash_routing")]
channels: Vec<(u64, Sha256dHash)>,
addresses: Vec<NetAddress>,
//this is cached here so we can send out it later if required by route_init_sync
//keep an eye on this to see if the extra memory is a problem
- announcement_message : Option<msgs::NodeAnnouncement>,
+ announcement_message: Option<msgs::NodeAnnouncement>,
}
impl std::fmt::Display for NodeInfo {
node.rgb = msg.contents.rgb;
node.alias = msg.contents.alias;
node.addresses = msg.contents.addresses.clone();
- node.announcement_message = if msg.contents.excess_data.is_empty(){
- Some(msg.clone())
- }
- else{
- None
- };
- Ok(msg.contents.excess_data.is_empty() && msg.contents.excess_address_data.is_empty() && !msg.contents.features.supports_unknown_bits())
+ let should_relay = msg.contents.excess_data.is_empty() && msg.contents.excess_address_data.is_empty() && !msg.contents.features.supports_unknown_bits();
+ node.announcement_message = if should_relay { Some(msg.clone()) } else { None };
+ Ok(should_relay)
}
}
}
let mut network_lock = self.network_map.write().unwrap();
let network = network_lock.borrow_parts();
+ let should_relay = msg.contents.excess_data.is_empty() && !msg.contents.features.supports_unknown_bits();
+
let chan_info = ChannelInfo {
features: msg.contents.features.clone(),
one_to_two: DirectionalChannelInfo {
htlc_minimum_msat: u64::max_value(),
fee_base_msat: u32::max_value(),
fee_proportional_millionths: u32::max_value(),
- last_update_message : None,
+ last_update_message: None,
},
two_to_one: DirectionalChannelInfo {
src_node_id: msg.contents.node_id_2.clone(),
htlc_minimum_msat: u64::max_value(),
fee_base_msat: u32::max_value(),
fee_proportional_millionths: u32::max_value(),
- last_update_message : None,
- },
- announcement_message : if msg.contents.excess_data.is_empty(){
- Some(msg.clone())
- }
- else{
- None
+ last_update_message: None,
},
+ announcement_message: if should_relay { Some(msg.clone()) } else { None },
};
match network.channels.entry(NetworkMap::get_key(msg.contents.short_channel_id, msg.contents.chain_hash)) {
add_channel_to_node!(msg.contents.node_id_1);
add_channel_to_node!(msg.contents.node_id_2);
- Ok(msg.contents.excess_data.is_empty() && !msg.contents.features.supports_unknown_bits())
+ Ok(should_relay)
}
fn handle_htlc_fail_channel_update(&self, update: &msgs::HTLCFailChannelUpdate) {
$target.htlc_minimum_msat = msg.contents.htlc_minimum_msat;
$target.fee_base_msat = msg.contents.fee_base_msat;
$target.fee_proportional_millionths = msg.contents.fee_proportional_millionths;
- $target.last_update_message = if msg.contents.excess_data.is_empty(){
+ $target.last_update_message = if msg.contents.excess_data.is_empty() {
Some(msg.clone())
- }
- else{
- None
+ } else {
+ None
};
}
}
}
- fn get_next_channel_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)>){
- let mut result = Vec::new();
+ fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)> {
+ let mut result = Vec::with_capacity(batch_amount as usize);
let network = self.network_map.read().unwrap();
- let mut starting_for_next = if let InitSyncTracker::ChannelCounter(i) = *starting_point {i} else {0 as u64};
- let mut iter = network.channels.range((starting_for_next)..);
- for _x in 0..batch_amount{
- if let Some(ref value) = iter.next(){
- if value.1.announcement_message.is_some() && value.1.one_to_two.last_update_message.is_some() && value.1.two_to_one.last_update_message.is_some(){
- let channel_announcement = value.1.announcement_message.clone().unwrap();
- let channel_update1 = value.1.one_to_two.last_update_message.clone().unwrap();
- let channel_update2 = value.1.two_to_one.last_update_message.clone().unwrap();
- result.push((channel_announcement,channel_update1, channel_update2));
+ let mut iter = network.channels.range(starting_point..);
+ while result.len() < batch_amount as usize {
+ if let Some((_, ref chan)) = iter.next() {
+ if chan.announcement_message.is_some() &&
+ chan.one_to_two.last_update_message.is_some() &&
+ chan.two_to_one.last_update_message.is_some() {
+ result.push((chan.announcement_message.clone().unwrap(),
+ chan.one_to_two.last_update_message.clone().unwrap(),
+ chan.two_to_one.last_update_message.clone().unwrap()));
+ } else {
+ // TODO: We may end up sending un-announced channel_updates if we are sending
+ // initial sync data while receiving announce/updates for this channel.
}
- starting_for_next = *(value.0) ;//adjusting start value so we can pass the last used back
+ } else {
+ return result;
}
}
- *starting_point = if result.len() == 0{
- InitSyncTracker::Sync(false) //sync done so disable sync required
- } else {
- InitSyncTracker::ChannelCounter(starting_for_next)
- };
- (result)
+ result
}
- fn get_next_node_announcements(&self, starting_point: &mut InitSyncTracker, batch_amount: u8)->(Vec<msgs::NodeAnnouncement>){
- let mut result = Vec::new();
+ fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
+ let mut result = Vec::with_capacity(batch_amount as usize);
let network = self.network_map.read().unwrap();
- let mut iter = if let InitSyncTracker::NodeCounter(i) = *starting_point {network.nodes.range((i)..)} else {
- let first_item = network.nodes.iter().next();
- if let None = first_item {*starting_point = InitSyncTracker::Sync(false); return result} //for some reason we know of no nodes
- network.nodes.range(*(first_item.unwrap().0)..)};
- for _x in 0..batch_amount{
- if let Some(ref value) = iter.next(){
- if value.1.announcement_message.is_some(){
- let node_announcement = value.1.announcement_message.clone().unwrap();
- result.push(node_announcement);
+ let mut iter = if let Some(pubkey) = starting_point {
+ let iter = network.nodes.range(pubkey..);
+ iter.next();
+ iter
+ } else {
+ network.nodes.range(..)
+ };
+ while result.len() < batch_amount as usize {
+ if let Some((_, ref node)) = iter.next() {
+ if node.announcement_message.is_some() {
+ result.push(node.announcement_message.clone().unwrap());
}
- *starting_point = InitSyncTracker::NodeCounter(*(value.0)) ;//adjusting start value so we can pass the last used back
+ } else {
+ return result;
}
}
- if result.len() == 0{
- *starting_point = InitSyncTracker::ChannelCounter(0) //node syncing done, move on towards channels
- };
result
}
}
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(1, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.nodes.insert(node2.clone(), NodeInfo {
channels: vec!(NetworkMap::get_key(2, zero_hash.clone()), NetworkMap::get_key(4, zero_hash.clone())),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(2, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.nodes.insert(node8.clone(), NodeInfo {
channels: vec!(NetworkMap::get_key(12, zero_hash.clone()), NetworkMap::get_key(13, zero_hash.clone())),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(12, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.nodes.insert(node3.clone(), NodeInfo {
channels: vec!(
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(3, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(4, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(13, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.nodes.insert(node4.clone(), NodeInfo {
channels: vec!(NetworkMap::get_key(5, zero_hash.clone()), NetworkMap::get_key(11, zero_hash.clone())),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(5, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.nodes.insert(node5.clone(), NodeInfo {
channels: vec!(NetworkMap::get_key(6, zero_hash.clone()), NetworkMap::get_key(11, zero_hash.clone())),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(6, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(11, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
network.nodes.insert(node6.clone(), NodeInfo {
channels: vec!(NetworkMap::get_key(7, zero_hash.clone())),
rgb: [0; 3],
alias: [0; 32],
addresses: Vec::new(),
- announcement_message : None,
+ announcement_message: None,
});
network.channels.insert(NetworkMap::get_key(7, zero_hash.clone()), ChannelInfo {
features: GlobalFeatures::new(),
fee_proportional_millionths: 0,
last_update_message: None,
},
- announcement_message : None,
+ announcement_message: None,
});
}