Refactor message broadcasting out into a utility method
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 6e997e53b156770d237bfbe615eff0096b77c17a..0497fae158b45006cc697935fe7cab87f3f0cd6a 100644 (file)
@@ -30,25 +30,140 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider};
 use util::logger::Logger;
 use routing::network_graph::NetGraphMsgHandler;
 
-use std::collections::{HashMap,hash_map,HashSet,LinkedList};
+use prelude::*;
+use alloc::collections::LinkedList;
 use std::sync::{Arc, Mutex};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::{cmp,error,hash,fmt};
-use std::ops::Deref;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::{cmp, hash, fmt, mem};
+use core::ops::Deref;
+use std::error;
 
 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
 use bitcoin::hashes::{HashEngine, Hash};
 
+/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
+/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
+pub struct IgnoringMessageHandler{}
+impl MessageSendEventsProvider for IgnoringMessageHandler {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
+}
+impl RoutingMessageHandler for IgnoringMessageHandler {
+       fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+       fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
+       fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
+       fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
+       fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
+               Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
+       fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+       fn sync_routing_table(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
+       fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
+       fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
+       fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
+       fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
+}
+impl Deref for IgnoringMessageHandler {
+       type Target = IgnoringMessageHandler;
+       fn deref(&self) -> &Self { self }
+}
+
+/// A dummy struct which implements `ChannelMessageHandler` without having any channels.
+/// You can provide one of these as the route_handler in a MessageHandler.
+pub struct ErroringMessageHandler {
+       message_queue: Mutex<Vec<MessageSendEvent>>
+}
+impl ErroringMessageHandler {
+       /// Constructs a new ErroringMessageHandler
+       pub fn new() -> Self {
+               Self { message_queue: Mutex::new(Vec::new()) }
+       }
+       fn push_error(&self, node_id: &PublicKey, channel_id: [u8; 32]) {
+               self.message_queue.lock().unwrap().push(MessageSendEvent::HandleError {
+                       action: msgs::ErrorAction::SendErrorMessage {
+                               msg: msgs::ErrorMessage { channel_id, data: "We do not support channel messages, sorry.".to_owned() },
+                       },
+                       node_id: node_id.clone(),
+               });
+       }
+}
+impl MessageSendEventsProvider for ErroringMessageHandler {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+               let mut res = Vec::new();
+               mem::swap(&mut res, &mut self.message_queue.lock().unwrap());
+               res
+       }
+}
+impl ChannelMessageHandler for ErroringMessageHandler {
+       // Any messages which are related to a specific channel generate an error message to let the
+       // peer know we don't care about channels.
+       fn handle_open_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+       }
+       fn handle_accept_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+       }
+       fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+       }
+       fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_shutdown(&self, their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+       // msgs::ChannelUpdate does not contain the channel_id field, so we just drop them.
+       fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {}
+       fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
+       fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {}
+       fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {}
+}
+impl Deref for ErroringMessageHandler {
+       type Target = ErroringMessageHandler;
+       fn deref(&self) -> &Self { self }
+}
+
 /// Provides references to trait impls which handle different types of messages.
 pub struct MessageHandler<CM: Deref, RM: Deref> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler {
        /// A message handler which handles messages specific to channels. Usually this is just a
-       /// ChannelManager object.
+       /// ChannelManager object or a ErroringMessageHandler.
        pub chan_handler: CM,
        /// A message handler which handles messages updating our knowledge of the network channel
-       /// graph. Usually this is just a NetGraphMsgHandlerMonitor object.
+       /// graph. Usually this is just a NetGraphMsgHandlerMonitor object or an IgnoringMessageHandler.
        pub route_handler: RM,
 }
 
@@ -88,10 +203,9 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
 }
 
 /// Error for PeerManager errors. If you get one of these, you must disconnect the socket and
-/// generate no further read_event/write_buffer_space_avail calls for the descriptor, only
-/// triggering a single socket_disconnected call (unless it was provided in response to a
-/// new_*_connection event, in which case no such socket_disconnected() must be called and the
-/// socket silently disconencted).
+/// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the
+/// descriptor.
+#[derive(Clone)]
 pub struct PeerHandleError {
        /// Used to indicate that we probably can't make any future connections to this peer, implying
        /// we should go ahead and force-close any channels we have with it.
@@ -121,7 +235,6 @@ enum InitSyncTracker{
 
 struct Peer {
        channel_encryptor: PeerChannelEncryptor,
-       outbound: bool,
        their_node_id: Option<PublicKey>,
        their_features: Option<InitFeatures>,
 
@@ -183,7 +296,7 @@ fn _check_usize_is_32_or_64() {
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
 /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
 /// issues such as overly long function definitions.
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F, L>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -243,6 +356,44 @@ macro_rules! encode_msg {
        }}
 }
 
+impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, L> where
+               CM::Target: ChannelMessageHandler,
+               L::Target: Logger {
+       /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message
+       /// handler is used and network graph messages are ignored.
+       ///
+       /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
+       /// cryptographically secure random bytes.
+       ///
+       /// (C-not exported) as we can't export a PeerManager with a dummy route handler
+       pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
+               Self::new(MessageHandler {
+                       chan_handler: channel_message_handler,
+                       route_handler: IgnoringMessageHandler{},
+               }, our_node_secret, ephemeral_random_data, logger)
+       }
+}
+
+impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, L> where
+               RM::Target: RoutingMessageHandler,
+               L::Target: Logger {
+       /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message
+       /// handler is used and messages related to channels will be ignored (or generate error
+       /// messages). Note that some other lightning implementations time-out connections after some
+       /// time if no channel is built with the peer.
+       ///
+       /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
+       /// cryptographically secure random bytes.
+       ///
+       /// (C-not exported) as we can't export a PeerManager with a dummy channel handler
+       pub fn new_routing_only(routing_message_handler: RM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
+               Self::new(MessageHandler {
+                       chan_handler: ErroringMessageHandler::new(),
+                       route_handler: routing_message_handler,
+               }, our_node_secret, ephemeral_random_data, logger)
+       }
+}
+
 /// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
 /// PeerIds may repeat, but only after socket_disconnected() has been called.
 impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<Descriptor, CM, RM, L> where
@@ -315,7 +466,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                let mut peers = self.peers.lock().unwrap();
                if peers.peers.insert(descriptor, Peer {
                        channel_encryptor: peer_encryptor,
-                       outbound: true,
                        their_node_id: None,
                        their_features: None,
 
@@ -352,7 +502,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                let mut peers = self.peers.lock().unwrap();
                if peers.peers.insert(descriptor, Peer {
                        channel_encryptor: peer_encryptor,
-                       outbound: false,
                        their_node_id: None,
                        their_features: None,
 
@@ -584,11 +733,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
 
                                                                        peer.their_node_id = Some(their_node_id);
                                                                        insert_node_id!();
-                                                                       let mut features = InitFeatures::known();
-                                                                       if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
-                                                                               features.clear_initial_routing_sync();
-                                                                       }
-
+                                                                       let features = InitFeatures::known();
                                                                        let resp = msgs::Init { features };
                                                                        self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
                                                                },
@@ -598,6 +743,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                        peer.pending_read_is_header = true;
                                                                        peer.their_node_id = Some(their_node_id);
                                                                        insert_node_id!();
+                                                                       let features = InitFeatures::known();
+                                                                       let resp = msgs::Init { features };
+                                                                       self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
                                                                },
                                                                NextNoiseStep::NoiseComplete => {
                                                                        if peer.pending_read_is_header {
@@ -637,6 +785,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                                                        }
                                                                                                        msgs::DecodeError::BadLengthDescriptor => return Err(PeerHandleError { no_connection_possible: false }),
                                                                                                        msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }),
+                                                                                                       msgs::DecodeError::UnsupportedCompression => {
+                                                                                                               log_debug!(self.logger, "We don't support zlib-compressed message fields, ignoring message");
+                                                                                                               continue;
+                                                                                                       }
                                                                                                }
                                                                                        }
                                                                                };
@@ -655,8 +807,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                }
                                        }
 
-                                       self.do_attempt_write_data(peer_descriptor, peer);
-
                                        peer.pending_outbound_buffer.len() > 10 // pause_read
                                }
                        };
@@ -682,11 +832,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        // Setup and Control messages:
                        wire::Message::Init(msg) => {
                                if msg.features.requires_unknown_bits() {
-                                       log_info!(self.logger, "Peer global features required unknown version bits");
-                                       return Err(PeerHandleError{ no_connection_possible: true }.into());
-                               }
-                               if msg.features.requires_unknown_bits() {
-                                       log_info!(self.logger, "Peer local features required unknown version bits");
+                                       log_info!(self.logger, "Peer features required unknown version bits");
                                        return Err(PeerHandleError{ no_connection_possible: true }.into());
                                }
                                if peer.their_features.is_some() {
@@ -694,10 +840,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                }
 
                                log_info!(
-                                       self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
+                                       self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
                                        if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
                                        if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
                                        if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+                                       if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },
                                        if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
                                        if msg.features.supports_unknown_bits() { "present" } else { "none" }
                                );
@@ -711,15 +858,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                        return Err(PeerHandleError{ no_connection_possible: true }.into());
                                }
 
-                               if !peer.outbound {
-                                       let mut features = InitFeatures::known();
-                                       if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
-                                               features.clear_initial_routing_sync();
-                                       }
-
-                                       let resp = msgs::Init { features };
-                                       self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
-                               }
+                               self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg);
 
                                self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
                                peer.their_features = Some(msg.features);
@@ -773,7 +912,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        },
 
                        wire::Message::Shutdown(msg) => {
-                               self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
+                               self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), peer.their_features.as_ref().unwrap(), &msg);
                        },
                        wire::Message::ClosingSigned(msg) => {
                                self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
@@ -831,6 +970,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                }
                        },
                        wire::Message::ChannelUpdate(msg) => {
+                               self.message_handler.chan_handler.handle_channel_update(&peer.their_node_id.unwrap(), &msg);
                                let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) {
                                        Ok(v) => v,
                                        Err(e) => { return Err(e.into()); },
@@ -840,6 +980,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                        // TODO: forward msg along to all our other peers!
                                }
                        },
+                       wire::Message::QueryShortChannelIds(msg) => {
+                               self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::ReplyShortChannelIdsEnd(msg) => {
+                               self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::QueryChannelRange(msg) => {
+                               self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::ReplyChannelRange(msg) => {
+                               self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
+                       },
+                       wire::Message::GossipTimestampFilter(_msg) => {
+                               // TODO: handle message
+                       },
 
                        // Unknown messages:
                        wire::Message::Unknown(msg_type) if msg_type.is_even() => {
@@ -854,6 +1009,64 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                Ok(())
        }
 
+       fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message, except_node: Option<&PublicKey>) {
+               match msg {
+                       wire::Message::ChannelAnnouncement(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
+                                               continue
+                                       }
+                                       if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) ||
+                                          peer.their_node_id.as_ref() == Some(&msg.contents.node_id_2) {
+                                               continue;
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       peers.peers_needing_send.insert((*descriptor).clone());
+                               }
+                       },
+                       wire::Message::NodeAnnouncement(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_node_announcement(msg.contents.node_id) {
+                                               continue
+                                       }
+                                       if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) {
+                                               continue;
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       peers.peers_needing_send.insert((*descriptor).clone());
+                               }
+                       },
+                       wire::Message::ChannelUpdate(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
+                                               continue
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       peers.peers_needing_send.insert((*descriptor).clone());
+                               }
+                       },
+                       _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
+               }
+       }
+
        /// Checks for any events generated by our handlers and processes them. Includes sending most
        /// response messages as well as messages generated by calls to handler functions directly (eg
        /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
@@ -863,24 +1076,23 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        // buffer by doing things like announcing channels on another node. We should be willing to
                        // drop optional-ish messages when send buffers get full!
 
-                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
                        let mut peers_lock = self.peers.lock().unwrap();
+                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+                       events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
                        let peers = &mut *peers_lock;
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
-                                       ($node_id: expr, $handle_no_such_peer: block) => {
+                                       ($node_id: expr) => {
                                                {
                                                        let descriptor = match peers.node_id_to_descriptor.get($node_id) {
                                                                Some(descriptor) => descriptor.clone(),
                                                                None => {
-                                                                       $handle_no_such_peer;
                                                                        continue;
                                                                },
                                                        };
                                                        match peers.peers.get_mut(&descriptor) {
                                                                Some(peer) => {
                                                                        if peer.their_features.is_none() {
-                                                                               $handle_no_such_peer;
                                                                                continue;
                                                                        }
                                                                        (descriptor, peer)
@@ -895,9 +1107,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -905,9 +1115,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -916,10 +1124,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id),
                                                                log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               // TODO: If the peer is gone we should generate a DiscardFunding event
+                                               // indicating to the wallet that they should just throw away this funding transaction
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -927,10 +1134,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -938,9 +1142,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -948,10 +1150,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -962,9 +1161,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                update_fulfill_htlcs.len(),
                                                                update_fail_htlcs.len(),
                                                                log_bytes!(commitment_signed.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                for msg in update_add_htlcs {
                                                        peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
@@ -987,9 +1184,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -997,9 +1192,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1007,9 +1200,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1017,65 +1208,27 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
-                                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                                       MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
                                                log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-                                                       let encoded_update_msg = encode_msg!(update_msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
-                                                                       continue
-                                                               }
-                                                               match peer.their_node_id {
-                                                                       None => continue,
-                                                                       Some(their_node_id) => {
-                                                                               if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
-                                                                                       continue
-                                                                               }
-                                                                       }
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None);
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None);
                                                }
                                        },
-                                       MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+                                       MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
                                                log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler");
-                                               if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_node_announcement(msg.contents.node_id) {
-                                                                       continue
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None);
                                                }
                                        },
-                                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
+                                       MessageSendEvent::BroadcastChannelUpdate { msg } => {
                                                log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
-                                                                       continue
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None);
                                                }
                                        },
                                        MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
@@ -1108,13 +1261,32 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                log_pubkey!(node_id),
                                                                                msg.data);
-                                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                                       //TODO: Do whatever we're gonna do for handling dropped messages
-                                                               });
+                                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                                self.do_attempt_write_data(&mut descriptor, peer);
                                                        },
                                                }
+                                       },
+                                       MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+                                               self.do_attempt_write_data(&mut descriptor, peer);
+                                       },
+                                       MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+                                               self.do_attempt_write_data(&mut descriptor, peer);
+                                       }
+                                       MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
+                                               log_trace!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
+                                                       log_pubkey!(node_id),
+                                                       msg.short_channel_ids.len(),
+                                                       msg.first_blocknum,
+                                                       msg.number_of_blocks,
+                                                       msg.sync_complete);
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+                                               self.do_attempt_write_data(&mut descriptor, peer);
                                        }
                                }
                        }
@@ -1158,11 +1330,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                };
        }
 
+       /// Disconnect a peer given its node id.
+       ///
+       /// Set no_connection_possible to true to prevent any further connection with this peer,
+       /// force-closing any channels we have with it.
+       ///
+       /// If a peer is connected, this will call `disconnect_socket` on the descriptor for the peer,
+       /// so be careful about reentrancy issues.
+       pub fn disconnect_by_node_id(&self, node_id: PublicKey, no_connection_possible: bool) {
+               let mut peers_lock = self.peers.lock().unwrap();
+               if let Some(mut descriptor) = peers_lock.node_id_to_descriptor.remove(&node_id) {
+                       log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
+                       peers_lock.peers.remove(&descriptor);
+                       peers_lock.peers_needing_send.remove(&descriptor);
+                       self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
+                       descriptor.disconnect_socket();
+               }
+       }
+
        /// This function should be called roughly once every 30 seconds.
        /// It will send pings to each peer and disconnect those which did not respond to the last round of pings.
 
        /// Will most likely call send_data on all of the registered descriptors, thus, be very careful with reentrancy issues!
-       pub fn timer_tick_occured(&self) {
+       pub fn timer_tick_occurred(&self) {
                let mut peers_lock = self.peers.lock().unwrap();
                {
                        let peers = &mut *peers_lock;
@@ -1225,9 +1415,9 @@ mod tests {
        use bitcoin::secp256k1::Secp256k1;
        use bitcoin::secp256k1::key::{SecretKey, PublicKey};
 
-       use std;
+       use prelude::*;
        use std::sync::{Arc, Mutex};
-       use std::sync::atomic::Ordering;
+       use core::sync::atomic::Ordering;
 
        #[derive(Clone)]
        struct FileDescriptor {
@@ -1240,8 +1430,8 @@ mod tests {
                }
        }
        impl Eq for FileDescriptor { }
-       impl std::hash::Hash for FileDescriptor {
-               fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) {
+       impl core::hash::Hash for FileDescriptor {
+               fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
                        self.fd.hash(hasher)
                }
        }
@@ -1297,14 +1487,9 @@ mod tests {
                let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
                peer_a.new_inbound_connection(fd_a.clone()).unwrap();
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+               peer_a.process_events();
                assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
-               assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
-               (fd_a.clone(), fd_b.clone())
-       }
-
-       fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
-               let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               peer_b.process_events();
                assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                (fd_a.clone(), fd_b.clone())
        }
@@ -1342,11 +1527,13 @@ mod tests {
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
                // peers[0] awaiting_pong is set to true, but the Peer is still connected
-               peers[0].timer_tick_occured();
+               peers[0].timer_tick_occurred();
+               peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
-               // Since timer_tick_occured() is called again when awaiting_pong is true, all Peers are disconnected
-               peers[0].timer_tick_occured();
+               // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
+               peers[0].timer_tick_occurred();
+               peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
 
@@ -1367,7 +1554,9 @@ mod tests {
                let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
 
                // Make each peer to read the messages that the other peer just wrote to them.
+               peers[0].process_events();
                peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
+               peers[1].process_events();
                peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
 
                // Check that each peer has received the expected number of channel updates and channel
@@ -1377,41 +1566,4 @@ mod tests {
                assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
                assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
        }
-
-       #[test]
-       fn limit_initial_routing_sync_requests() {
-               // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
-               {
-                       let cfgs = create_peermgr_cfgs(2);
-                       cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
-                       let peers = create_network(2, &cfgs);
-                       let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
-                       let peer_0 = peers[0].peers.lock().unwrap();
-                       let peer_1 = peers[1].peers.lock().unwrap();
-
-                       let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
-                       let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
-                       assert!(peer_0_features.unwrap().initial_routing_sync());
-                       assert!(!peer_1_features.unwrap().initial_routing_sync());
-               }
-
-               // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
-               {
-                       let cfgs = create_peermgr_cfgs(2);
-                       cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
-                       let peers = create_network(2, &cfgs);
-                       let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
-                       let peer_0 = peers[0].peers.lock().unwrap();
-                       let peer_1 = peers[1].peers.lock().unwrap();
-
-                       let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
-                       let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
-                       assert!(!peer_0_features.unwrap().initial_routing_sync());
-                       assert!(peer_1_features.unwrap().initial_routing_sync());
-               }
-       }
 }