Handle-initial_routing_sync-requests-from-peers-in-their-Init-messages 2018-11-202-redux
authorSchalk van Heerden <swvheerden@gmail.com>
Tue, 2 Oct 2018 14:02:17 +0000 (16:02 +0200)
committerMatt Corallo <git@bluematt.me>
Fri, 9 Nov 2018 03:23:13 +0000 (13:53 +1030)
src/ln/msgs.rs
src/ln/peer_handler.rs
src/ln/router.rs
src/util/test_utils.rs

index 5c5d32f39d51483a1a8c0776d40527565ccfbb78..354376649c7b69ed79304ff51e872fb6d68d88df 100644 (file)
@@ -382,6 +382,7 @@ impl NetAddress {
        }
 }
 
+#[derive(Clone)]
 // Only exposed as broadcast of node_announcement should be filtered by node_id
 /// The unsigned part of a node_announcement
 pub struct UnsignedNodeAnnouncement {
@@ -398,6 +399,7 @@ pub struct UnsignedNodeAnnouncement {
        pub(crate) excess_address_data: Vec<u8>,
        pub(crate) excess_data: Vec<u8>,
 }
+#[derive(Clone)]
 /// A node_announcement message to be sent or received from a peer
 pub struct NodeAnnouncement {
        pub(crate) signature: Signature,
@@ -588,6 +590,14 @@ pub trait RoutingMessageHandler : Send + Sync {
        fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, HandleError>;
        /// Handle some updates to the route graph that we learned due to an outbound failed payment.
        fn handle_htlc_fail_channel_update(&self, update: &HTLCFailChannelUpdate);
+       /// Gets a subset of the channel announcements and updates required to dump our routing table
+       /// to a remote node, starting at the short_channel_id indicated by starting_point and
+       /// including batch_amount entries.
+       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, ChannelUpdate, ChannelUpdate)>;
+       /// Gets a subset of the node announcements required to dump our routing table to a remote node,
+       /// starting at the node *after* the provided publickey and including batch_amount entries.
+       /// If None is provided for starting_point, we start at the first node.
+       fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
 }
 
 pub(crate) struct OnionRealm0HopData {
index 56649c462dbcf61ba14c4fe63dc5bc7949fb9746..87a2e6d1c242b64f6e9b15fb0e9de9ef7cb8f74e 100644 (file)
@@ -88,6 +88,12 @@ impl error::Error for PeerHandleError {
        }
 }
 
+enum InitSyncTracker{
+       NoSyncRequested,
+       ChannelsSyncing(u64),
+       NodesSyncing(PublicKey),
+}
+
 struct Peer {
        channel_encryptor: PeerChannelEncryptor,
        outbound: bool,
@@ -102,6 +108,24 @@ struct Peer {
        pending_read_buffer: Vec<u8>,
        pending_read_buffer_pos: usize,
        pending_read_is_header: bool,
+
+       sync_status: InitSyncTracker,
+}
+
+impl Peer {
+       /// Returns true if the the channel announcements/updates for the given channel should be
+       /// forwarded to this peer.
+       /// If we are sending our routing table to this peer and we have not yet sent channel
+       /// announcements/updates for the given channel_id then we will send it when we get to that
+       /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
+       /// sent the old versions, we should send the update, and so return true here.
+       fn should_forward_channel(&self, channel_id: u64)->bool{
+               match self.sync_status {
+                       InitSyncTracker::NoSyncRequested => true,
+                       InitSyncTracker::ChannelsSyncing(i) => i < channel_id,
+                       InitSyncTracker::NodesSyncing(_) => true,
+               }
+       }
 }
 
 struct PeerHolder<Descriptor: SocketDescriptor> {
@@ -221,6 +245,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        pending_read_buffer: pending_read_buffer,
                        pending_read_buffer_pos: 0,
                        pending_read_is_header: false,
+
+                       sync_status: InitSyncTracker::NoSyncRequested,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -255,21 +281,74 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        pending_read_buffer: pending_read_buffer,
                        pending_read_buffer_pos: 0,
                        pending_read_is_header: false,
+
+                       sync_status: InitSyncTracker::NoSyncRequested,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
                Ok(())
        }
 
-       fn do_attempt_write_data(descriptor: &mut Descriptor, peer: &mut Peer) {
+       fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
+               macro_rules! encode_and_send_msg {
+                       ($msg: expr, $msg_code: expr) => {
+                               {
+                                       log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+                               }
+                       }
+               }
+               const MSG_BUFF_SIZE: usize = 10;
                while !peer.awaiting_write_event {
+                       if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
+                               match peer.sync_status {
+                                       InitSyncTracker::NoSyncRequested => {},
+                                       InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
+                                               let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
+                                               let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps);
+                                               for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
+                                                       encode_and_send_msg!(announce, 256);
+                                                       encode_and_send_msg!(update_a, 258);
+                                                       encode_and_send_msg!(update_b, 258);
+                                                       peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
+                                               }
+                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                                       peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
+                                               }
+                                       },
+                                       InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
+                                               let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
+                                               for msg in all_messages.iter() {
+                                                       encode_and_send_msg!(msg, 256);
+                                                       peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
+                                               }
+                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                                       peer.sync_status = InitSyncTracker::NoSyncRequested;
+                                               }
+                                       },
+                                       InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
+                                       InitSyncTracker::NodesSyncing(key) => {
+                                               let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
+                                               let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
+                                               for msg in all_messages.iter() {
+                                                       encode_and_send_msg!(msg, 256);
+                                                       peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
+                                               }
+                                               if all_messages.is_empty() || all_messages.len() != steps as usize {
+                                                       peer.sync_status = InitSyncTracker::NoSyncRequested;
+                                               }
+                                       },
+                               }
+                       }
+
                        if {
                                let next_buff = match peer.pending_outbound_buffer.front() {
                                        None => return,
                                        Some(buff) => buff,
                                };
-                               let should_be_reading = peer.pending_outbound_buffer.len() < 10;
 
+                               let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
                                let data_sent = descriptor.send_data(next_buff, peer.pending_outbound_buffer_first_msg_offset, should_be_reading);
                                peer.pending_outbound_buffer_first_msg_offset += data_sent;
                                if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
@@ -297,7 +376,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        None => panic!("Descriptor for write_event is not already known to PeerManager"),
                        Some(peer) => {
                                peer.awaiting_write_event = false;
-                               Self::do_attempt_write_data(descriptor, peer);
+                               self.do_attempt_write_data(descriptor, peer);
                        }
                };
                Ok(())
@@ -522,6 +601,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                        if msg.local_features.supports_unknown_bits() { "present" } else { "none" },
                                                                                                        if msg.global_features.supports_unknown_bits() { "present" } else { "none" });
 
+                                                                                               if msg.local_features.initial_routing_sync() {
+                                                                                                       peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
+                                                                                                       peers.peers_needing_send.insert(peer_descriptor.clone());
+                                                                                               }
                                                                                                peer.their_global_features = Some(msg.global_features);
                                                                                                peer.their_local_features = Some(msg.local_features);
 
@@ -531,6 +614,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                                self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
                                                                                                                local_features.set_initial_routing_sync();
                                                                                                        }
+
                                                                                                        encode_and_send_msg!(msgs::Init {
                                                                                                                global_features: msgs::GlobalFeatures::new(),
                                                                                                                local_features,
@@ -678,7 +762,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                }
                                        }
 
-                                       Self::do_attempt_write_data(peer_descriptor, peer);
+                                       self.do_attempt_write_data(peer_descriptor, peer);
 
                                        peer.pending_outbound_buffer.len() > 10 // pause_read
                                }
@@ -735,7 +819,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
@@ -745,7 +829,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
@@ -757,7 +841,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //they should just throw away this funding transaction
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
@@ -768,7 +852,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //they should just throw away this funding transaction
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
@@ -778,7 +862,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
@@ -789,7 +873,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //they should just throw away this funding transaction
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                log_trace!(self, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -817,7 +901,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134)));
                                                }
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
@@ -827,7 +911,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
@@ -837,7 +921,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
@@ -847,7 +931,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
@@ -857,7 +941,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
-                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                                log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
@@ -866,7 +950,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        let encoded_update_msg = encode_msg!(update_msg, 258);
 
                                                        for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
+                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
+                                                                               !peer.should_forward_channel(msg.contents.short_channel_id) {
                                                                        continue
                                                                }
                                                                match peer.their_node_id {
@@ -879,7 +964,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                }
                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
-                                                               Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
+                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
                                        },
@@ -889,11 +974,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        let encoded_msg = encode_msg!(msg, 258);
 
                                                        for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() {
+                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_global_features.is_none() ||
+                                                                               !peer.should_forward_channel(msg.contents.short_channel_id)  {
                                                                        continue
                                                                }
                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               Self::do_attempt_write_data(&mut (*descriptor).clone(), peer);
+                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
                                                        }
                                                }
                                        },
@@ -914,7 +1000,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
                                                                                                // This isn't guaranteed to work, but if there is enough free
                                                                                                // room in the send buffer, put the error message there...
-                                                                                               Self::do_attempt_write_data(&mut descriptor, &mut peer);
+                                                                                               self.do_attempt_write_data(&mut descriptor, &mut peer);
                                                                                        } else {
                                                                                                log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
                                                                                        }
@@ -932,7 +1018,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                                        });
                                                                        peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
-                                                                       Self::do_attempt_write_data(&mut descriptor, peer);
+                                                                       self.do_attempt_write_data(&mut descriptor, peer);
                                                                },
                                                        }
                                                } else {
@@ -944,7 +1030,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                        for mut descriptor in peers.peers_needing_send.drain() {
                                match peers.peers.get_mut(&descriptor) {
-                                       Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+                                       Some(peer) => self.do_attempt_write_data(&mut descriptor, peer),
                                        None => panic!("Inconsistent peers set state!"),
                                }
                        }
index 9fe2cfb50012b3619d1b10ef0edcae65f0fddee9..3de73ccc059ae8159b09f856ca4508fbedf7a278 100644 (file)
@@ -20,8 +20,8 @@ use util::logger::Logger;
 
 use std::cmp;
 use std::sync::{RwLock,Arc};
-use std::collections::{HashMap,BinaryHeap};
-use std::collections::hash_map::Entry;
+use std::collections::{HashMap,BinaryHeap,BTreeMap};
+use std::collections::btree_map::Entry as BtreeEntry;
 use std;
 
 /// A hop in a route
@@ -86,6 +86,7 @@ struct DirectionalChannelInfo {
        htlc_minimum_msat: u64,
        fee_base_msat: u32,
        fee_proportional_millionths: u32,
+       last_update_message: Option<msgs::ChannelUpdate>,
 }
 
 impl std::fmt::Display for DirectionalChannelInfo {
@@ -99,6 +100,9 @@ struct ChannelInfo {
        features: GlobalFeatures,
        one_to_two: DirectionalChannelInfo,
        two_to_one: DirectionalChannelInfo,
+       //this is cached here so we can send out it later if required by route_init_sync
+       //keep an eye on this to see if the extra memory is a problem
+       announcement_message: Option<msgs::ChannelAnnouncement>,
 }
 
 impl std::fmt::Display for ChannelInfo {
@@ -122,6 +126,9 @@ struct NodeInfo {
        rgb: [u8; 3],
        alias: [u8; 32],
        addresses: Vec<NetAddress>,
+       //this is cached here so we can send out it later if required by route_init_sync
+       //keep an eye on this to see if the extra memory is a problem
+       announcement_message: Option<msgs::NodeAnnouncement>,
 }
 
 impl std::fmt::Display for NodeInfo {
@@ -133,19 +140,19 @@ impl std::fmt::Display for NodeInfo {
 
 struct NetworkMap {
        #[cfg(feature = "non_bitcoin_chain_hash_routing")]
-       channels: HashMap<(u64, Sha256dHash), ChannelInfo>,
+       channels: BTreeMap<(u64, Sha256dHash), ChannelInfo>,
        #[cfg(not(feature = "non_bitcoin_chain_hash_routing"))]
-       channels: HashMap<u64, ChannelInfo>,
+       channels: BTreeMap<u64, ChannelInfo>,
 
        our_node_id: PublicKey,
-       nodes: HashMap<PublicKey, NodeInfo>,
+       nodes: BTreeMap<PublicKey, NodeInfo>,
 }
 struct MutNetworkMap<'a> {
        #[cfg(feature = "non_bitcoin_chain_hash_routing")]
-       channels: &'a mut HashMap<(u64, Sha256dHash), ChannelInfo>,
+       channels: &'a mut BTreeMap<(u64, Sha256dHash), ChannelInfo>,
        #[cfg(not(feature = "non_bitcoin_chain_hash_routing"))]
-       channels: &'a mut HashMap<u64, ChannelInfo>,
-       nodes: &'a mut HashMap<PublicKey, NodeInfo>,
+       channels: &'a mut BTreeMap<u64, ChannelInfo>,
+       nodes: &'a mut BTreeMap<PublicKey, NodeInfo>,
 }
 impl NetworkMap {
        fn borrow_parts(&mut self) -> MutNetworkMap {
@@ -252,7 +259,10 @@ impl RoutingMessageHandler for Router {
                                node.rgb = msg.contents.rgb;
                                node.alias = msg.contents.alias;
                                node.addresses = msg.contents.addresses.clone();
-                               Ok(msg.contents.excess_data.is_empty() && msg.contents.excess_address_data.is_empty() && !msg.contents.features.supports_unknown_bits())
+
+                               let should_relay = msg.contents.excess_data.is_empty() && msg.contents.excess_address_data.is_empty() && !msg.contents.features.supports_unknown_bits();
+                               node.announcement_message = if should_relay { Some(msg.clone()) } else { None };
+                               Ok(should_relay)
                        }
                }
        }
@@ -300,6 +310,8 @@ impl RoutingMessageHandler for Router {
                let mut network_lock = self.network_map.write().unwrap();
                let network = network_lock.borrow_parts();
 
+               let should_relay = msg.contents.excess_data.is_empty() && !msg.contents.features.supports_unknown_bits();
+
                let chan_info = ChannelInfo {
                                features: msg.contents.features.clone(),
                                one_to_two: DirectionalChannelInfo {
@@ -310,6 +322,7 @@ impl RoutingMessageHandler for Router {
                                        htlc_minimum_msat: u64::max_value(),
                                        fee_base_msat: u32::max_value(),
                                        fee_proportional_millionths: u32::max_value(),
+                                       last_update_message: None,
                                },
                                two_to_one: DirectionalChannelInfo {
                                        src_node_id: msg.contents.node_id_2.clone(),
@@ -319,11 +332,13 @@ impl RoutingMessageHandler for Router {
                                        htlc_minimum_msat: u64::max_value(),
                                        fee_base_msat: u32::max_value(),
                                        fee_proportional_millionths: u32::max_value(),
-                               }
+                                       last_update_message: None,
+                               },
+                               announcement_message: if should_relay { Some(msg.clone()) } else { None },
                        };
 
                match network.channels.entry(NetworkMap::get_key(msg.contents.short_channel_id, msg.contents.chain_hash)) {
-                       Entry::Occupied(mut entry) => {
+                       BtreeEntry::Occupied(mut entry) => {
                                //TODO: because asking the blockchain if short_channel_id is valid is only optional
                                //in the blockchain API, we need to handle it smartly here, though its unclear
                                //exactly how...
@@ -342,7 +357,7 @@ impl RoutingMessageHandler for Router {
                                        return Err(HandleError{err: "Already have knowledge of channel", action: Some(ErrorAction::IgnoreError)})
                                }
                        },
-                       Entry::Vacant(entry) => {
+                       BtreeEntry::Vacant(entry) => {
                                entry.insert(chan_info);
                        }
                };
@@ -350,10 +365,10 @@ impl RoutingMessageHandler for Router {
                macro_rules! add_channel_to_node {
                        ( $node_id: expr ) => {
                                match network.nodes.entry($node_id) {
-                                       Entry::Occupied(node_entry) => {
+                                       BtreeEntry::Occupied(node_entry) => {
                                                node_entry.into_mut().channels.push(NetworkMap::get_key(msg.contents.short_channel_id, msg.contents.chain_hash));
                                        },
-                                       Entry::Vacant(node_entry) => {
+                                       BtreeEntry::Vacant(node_entry) => {
                                                node_entry.insert(NodeInfo {
                                                        channels: vec!(NetworkMap::get_key(msg.contents.short_channel_id, msg.contents.chain_hash)),
                                                        lowest_inbound_channel_fee_base_msat: u32::max_value(),
@@ -363,6 +378,7 @@ impl RoutingMessageHandler for Router {
                                                        rgb: [0; 3],
                                                        alias: [0; 32],
                                                        addresses: Vec::new(),
+                                                       announcement_message: None,
                                                });
                                        }
                                }
@@ -372,7 +388,7 @@ impl RoutingMessageHandler for Router {
                add_channel_to_node!(msg.contents.node_id_1);
                add_channel_to_node!(msg.contents.node_id_2);
 
-               Ok(msg.contents.excess_data.is_empty() && !msg.contents.features.supports_unknown_bits())
+               Ok(should_relay)
        }
 
        fn handle_htlc_fail_channel_update(&self, update: &msgs::HTLCFailChannelUpdate) {
@@ -424,9 +440,13 @@ impl RoutingMessageHandler for Router {
                                                $target.htlc_minimum_msat = msg.contents.htlc_minimum_msat;
                                                $target.fee_base_msat = msg.contents.fee_base_msat;
                                                $target.fee_proportional_millionths = msg.contents.fee_proportional_millionths;
+                                               $target.last_update_message = if msg.contents.excess_data.is_empty() {
+                                                       Some(msg.clone())
+                                               } else {
+                                                       None
+                                               };
                                        }
                                }
-
                                let msg_hash = Message::from_slice(&Sha256dHash::from_data(&msg.contents.encode()[..])[..]).unwrap();
                                if msg.contents.flags & 1 == 1 {
                                        dest_node_id = channel.one_to_two.src_node_id.clone();
@@ -471,6 +491,52 @@ impl RoutingMessageHandler for Router {
 
                Ok(msg.contents.excess_data.is_empty())
        }
+
+
+       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)> {
+               let mut result = Vec::with_capacity(batch_amount as usize);
+               let network = self.network_map.read().unwrap();
+               let mut iter = network.channels.range(starting_point..);
+               while result.len() < batch_amount as usize {
+                       if let Some((_, ref chan)) = iter.next() {
+                               if chan.announcement_message.is_some() &&
+                                               chan.one_to_two.last_update_message.is_some() &&
+                                               chan.two_to_one.last_update_message.is_some() {
+                                       result.push((chan.announcement_message.clone().unwrap(),
+                                               chan.one_to_two.last_update_message.clone().unwrap(),
+                                               chan.two_to_one.last_update_message.clone().unwrap()));
+                               } else {
+                                       // TODO: We may end up sending un-announced channel_updates if we are sending
+                                       // initial sync data while receiving announce/updates for this channel.
+                               }
+                       } else {
+                               return result;
+                       }
+               }
+               result
+       }
+
+       fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
+               let mut result = Vec::with_capacity(batch_amount as usize);
+               let network = self.network_map.read().unwrap();
+               let mut iter = if let Some(pubkey) = starting_point {
+                               let mut iter = network.nodes.range((*pubkey)..);
+                               iter.next();
+                               iter
+                       } else {
+                               network.nodes.range(..)
+                       };
+               while result.len() < batch_amount as usize {
+                       if let Some((_, ref node)) = iter.next() {
+                               if node.announcement_message.is_some() {
+                                       result.push(node.announcement_message.clone().unwrap());
+                               }
+                       } else {
+                               return result;
+                       }
+               }
+               result
+       }
 }
 
 #[derive(Eq, PartialEq)]
@@ -504,7 +570,7 @@ struct DummyDirectionalChannelInfo {
 impl Router {
        /// Creates a new router with the given node_id to be used as the source for get_route()
        pub fn new(our_pubkey: PublicKey, chain_monitor: Arc<ChainWatchInterface>, logger: Arc<Logger>) -> Router {
-               let mut nodes = HashMap::new();
+               let mut nodes = BTreeMap::new();
                nodes.insert(our_pubkey.clone(), NodeInfo {
                        channels: Vec::new(),
                        lowest_inbound_channel_fee_base_msat: u32::max_value(),
@@ -514,11 +580,12 @@ impl Router {
                        rgb: [0; 3],
                        alias: [0; 32],
                        addresses: Vec::new(),
+                       announcement_message: None,
                });
                Router {
                        secp_ctx: Secp256k1::verification_only(),
                        network_map: RwLock::new(NetworkMap {
-                               channels: HashMap::new(),
+                               channels: BTreeMap::new(),
                                our_node_id: our_pubkey,
                                nodes: nodes,
                        }),
@@ -549,10 +616,10 @@ impl Router {
                unimplemented!();
        }
 
-       fn remove_channel_in_nodes(nodes: &mut HashMap<PublicKey, NodeInfo>, chan: &ChannelInfo, short_channel_id: u64) {
+       fn remove_channel_in_nodes(nodes: &mut BTreeMap<PublicKey, NodeInfo>, chan: &ChannelInfo, short_channel_id: u64) {
                macro_rules! remove_from_node {
                        ($node_id: expr) => {
-                               if let Entry::Occupied(mut entry) = nodes.entry($node_id) {
+                               if let BtreeEntry::Occupied(mut entry) = nodes.entry($node_id) {
                                        entry.get_mut().channels.retain(|chan_id| {
                                                short_channel_id != *NetworkMap::get_short_id(chan_id)
                                        });
@@ -877,6 +944,7 @@ mod tests {
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(1, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -888,6 +956,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: u32::max_value(), // This value should be ignored
                                        fee_proportional_millionths: u32::max_value(), // This value should be ignored
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node1.clone(),
                                        last_update: 0,
@@ -896,7 +965,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.nodes.insert(node2.clone(), NodeInfo {
                                channels: vec!(NetworkMap::get_key(2, zero_hash.clone()), NetworkMap::get_key(4, zero_hash.clone())),
@@ -907,6 +978,7 @@ mod tests {
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(2, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -918,6 +990,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: u32::max_value(), // This value should be ignored
                                        fee_proportional_millionths: u32::max_value(), // This value should be ignored
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node2.clone(),
                                        last_update: 0,
@@ -926,7 +999,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.nodes.insert(node8.clone(), NodeInfo {
                                channels: vec!(NetworkMap::get_key(12, zero_hash.clone()), NetworkMap::get_key(13, zero_hash.clone())),
@@ -937,6 +1012,7 @@ mod tests {
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(12, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -948,6 +1024,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: u32::max_value(), // This value should be ignored
                                        fee_proportional_millionths: u32::max_value(), // This value should be ignored
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node8.clone(),
                                        last_update: 0,
@@ -956,7 +1033,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.nodes.insert(node3.clone(), NodeInfo {
                                channels: vec!(
@@ -973,6 +1052,7 @@ mod tests {
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(3, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -984,6 +1064,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node3.clone(),
                                        last_update: 0,
@@ -992,7 +1073,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 100,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(4, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -1004,6 +1087,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 1000000,
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node3.clone(),
                                        last_update: 0,
@@ -1012,7 +1096,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(13, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -1024,6 +1110,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 2000000,
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node3.clone(),
                                        last_update: 0,
@@ -1032,7 +1119,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.nodes.insert(node4.clone(), NodeInfo {
                                channels: vec!(NetworkMap::get_key(5, zero_hash.clone()), NetworkMap::get_key(11, zero_hash.clone())),
@@ -1043,6 +1132,7 @@ mod tests {
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(5, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -1054,6 +1144,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 100,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node4.clone(),
                                        last_update: 0,
@@ -1062,7 +1153,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.nodes.insert(node5.clone(), NodeInfo {
                                channels: vec!(NetworkMap::get_key(6, zero_hash.clone()), NetworkMap::get_key(11, zero_hash.clone())),
@@ -1073,6 +1166,7 @@ mod tests {
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(6, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -1084,6 +1178,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node5.clone(),
                                        last_update: 0,
@@ -1092,7 +1187,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(11, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -1104,6 +1201,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node4.clone(),
                                        last_update: 0,
@@ -1112,7 +1210,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                        network.nodes.insert(node6.clone(), NodeInfo {
                                channels: vec!(NetworkMap::get_key(7, zero_hash.clone())),
@@ -1123,6 +1223,7 @@ mod tests {
                                rgb: [0; 3],
                                alias: [0; 32],
                                addresses: Vec::new(),
+                               announcement_message: None,
                        });
                        network.channels.insert(NetworkMap::get_key(7, zero_hash.clone()), ChannelInfo {
                                features: GlobalFeatures::new(),
@@ -1134,6 +1235,7 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 1000000,
+                                       last_update_message: None,
                                }, two_to_one: DirectionalChannelInfo {
                                        src_node_id: node6.clone(),
                                        last_update: 0,
@@ -1142,7 +1244,9 @@ mod tests {
                                        htlc_minimum_msat: 0,
                                        fee_base_msat: 0,
                                        fee_proportional_millionths: 0,
+                                       last_update_message: None,
                                },
+                               announcement_message: None,
                        });
                }
 
index 5da647efebd96eda15341074b022240b17c181e2..33fb63b5c3d2d061fd1d20f0478fd2e030419c07 100644 (file)
@@ -157,7 +157,6 @@ impl TestRoutingMessageHandler {
                TestRoutingMessageHandler {}
        }
 }
-
 impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
        fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, HandleError> {
                Err(HandleError { err: "", action: None })
@@ -169,6 +168,12 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
                Err(HandleError { err: "", action: None })
        }
        fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
+       fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)> {
+               Vec::new()
+       }
+       fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
+               Vec::new()
+       }
 }
 
 pub struct TestLogger {