Redo max tick interval logic; this much better matches reality
diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 1d164e6e684ad153e4229b016a9e5920db153430..48bad2d69e16d59c5dd3d3a1436dc94436254f2a 100644
@@ -24,7 +24,6 @@ use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use util::ser::{VecWriter, Writeable, Writer};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
 use ln::wire;
-use ln::wire::MessageType;
 use util::byte_utils;
 use util::events::{MessageSendEvent, MessageSendEventsProvider};
 use util::logger::Logger;
@@ -33,11 +32,11 @@ use routing::network_graph::NetGraphMsgHandler;
 use prelude::*;
 use io;
 use alloc::collections::LinkedList;
-use alloc::fmt::Debug;
 use sync::{Arc, Mutex};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
+use core::convert::Infallible;
 #[cfg(feature = "std")] use std::error;
 
 use bitcoin::hashes::sha256::Hash as Sha256;
@@ -48,7 +47,7 @@ use bitcoin::hashes::{HashEngine, Hash};
 pub trait CustomMessageHandler: wire::CustomMessageReader {
        /// Called with the message type that was received and the buffer to be read.
        /// Can return a `MessageHandlingError` if the message could not be handled.
-       fn handle_custom_message(&self, msg: Self::CustomMessage) -> Result<(), LightningError>;
+       fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>;
 
        /// Gets the list of pending messages which were generated by the custom message
        /// handler, clearing the list in the process. The first tuple element must
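
For orientation, the trait now hands implementors the sending peer's node id alongside the message. A minimal standalone sketch of the new shape, with stand-in types instead of the crate's real supertraits (everything below is illustrative, not the patch's code):

    // Stand-in types for illustration only; the real trait lives in
    // lightning::ln::peer_handler and takes a secp256k1::PublicKey.
    type PublicKey = [u8; 33];
    struct LightningError;

    trait CustomMessageHandler {
        type CustomMessage;
        // New in this patch: the sender's node id accompanies the message.
        fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey)
            -> Result<(), LightningError>;
    }

    struct EchoHandler;
    impl CustomMessageHandler for EchoHandler {
        type CustomMessage = Vec<u8>;
        fn handle_custom_message(&self, msg: Vec<u8>, sender_node_id: &PublicKey)
            -> Result<(), LightningError> {
            // Handlers can now key per-peer state on `sender_node_id`.
            let _ = (msg, sender_node_id);
            Ok(())
        }
    }
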
@@ -67,7 +66,6 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
        fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-       fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
        fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
                Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
        fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
@@ -82,28 +80,28 @@ impl Deref for IgnoringMessageHandler {
        fn deref(&self) -> &Self { self }
 }
 
-impl wire::Type for () {
-       fn type_id(&self) -> MessageType {
-               // We should never call this for `DummyCustomType`
+// Implement Type for Infallible; note that it cannot be constructed, and thus you can never call a
+// method that takes self on it.
+impl wire::Type for Infallible {
+       fn type_id(&self) -> u16 {
                unreachable!();
        }
 }
-
-impl Writeable for () {
+impl Writeable for Infallible {
        fn write<W: Writer>(&self, _: &mut W) -> Result<(), io::Error> {
                unreachable!();
        }
 }
 
 impl wire::CustomMessageReader for IgnoringMessageHandler {
-       type CustomMessage = ();
+       type CustomMessage = Infallible;
        fn read<R: io::Read>(&self, _message_type: u16, _buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
                Ok(None)
        }
 }
 
 impl CustomMessageHandler for IgnoringMessageHandler {
-       fn handle_custom_message(&self, _msg: Self::CustomMessage) -> Result<(), LightningError> {
+       fn handle_custom_message(&self, _msg: Infallible, _sender_node_id: &PublicKey) -> Result<(), LightningError> {
                // Since we always return `None` from `read`, the handle method should never be called.
                unreachable!();
        }
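
The switch from `()` to `Infallible` makes the "never called" claim a type-level fact: `Infallible` is an uninhabited enum, so no value of it can ever exist and the `unreachable!()` bodies can truly never run. A self-contained illustration:

    use core::convert::Infallible;

    // No value of `Infallible` can be constructed, so any function taking
    // one by value is provably uncallable; matching on it needs no arms.
    fn never_called(msg: Infallible) -> u16 {
        match msg {}
    }
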
@@ -287,6 +285,10 @@ enum InitSyncTracker{
        NodesSyncing(PublicKey),
 }
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
 /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
@@ -294,7 +296,29 @@ enum InitSyncTracker{
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
+
+/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
+/// the socket receive buffer before receiving the pong.
+///
+/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
+/// including any network delays, outbound traffic, or the same for messages from other peers.
+///
+/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
+/// per connected peer to respond to a ping, as long as they send us at least one message during
+/// each tick, ensuring we aren't actually just disconnected.
+/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
+/// peer.
+///
+/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
+/// two connected peers, assuming most LDK-running systems have at least two cores.
+const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
+
+/// This is the minimum number of messages we expect a peer to be able to handle within one timer
+/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
+/// ensure we don't just fill up our send buffer and leave the peer with too many messages to
+/// process before the next ping.
+const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
 struct Peer {
        channel_encryptor: PeerChannelEncryptor,
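
To make the numbers concrete: with the five-second tick interval the doc comment assumes, the allowance works out as below (`max_wait_secs` is a hypothetical helper for illustration, not part of the patch):

    const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: u64 = 6;

    // Worst-case time a slow-but-alive peer is given to answer a ping.
    fn max_wait_secs(peer_count: u64, tick_secs: u64) -> u64 {
        MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER * peer_count * tick_secs
    }

    fn main() {
        // One peer, 5s ticks: 30 seconds, matching the doc comment above.
        assert_eq!(max_wait_secs(1, 5), 30);
        // The allowance scales with the number of connected peers.
        assert_eq!(max_wait_secs(4, 5), 120);
    }
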
@@ -311,7 +335,9 @@ struct Peer {
 
        sync_status: InitSyncTracker,
 
-       awaiting_pong: bool,
+       msgs_sent_since_pong: usize,
+       awaiting_pong_tick_intervals: i8,
+       received_message_since_timer_tick: bool,
 }
 
 impl Peer {
@@ -471,7 +497,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler,
                L::Target: Logger,
-               CMH::Target: CustomMessageHandler + wire::CustomMessageReader {
+               CMH::Target: CustomMessageHandler {
        /// Constructs a new PeerManager with the given message handlers and node_id secret key
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
@@ -553,7 +579,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                        sync_status: InitSyncTracker::NoSyncRequested,
 
-                       awaiting_pong: false,
+                       msgs_sent_since_pong: 0,
+                       awaiting_pong_tick_intervals: 0,
+                       received_message_since_timer_tick: false,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -591,7 +619,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                        sync_status: InitSyncTracker::NoSyncRequested,
 
-                       awaiting_pong: false,
+                       msgs_sent_since_pong: 0,
+                       awaiting_pong_tick_intervals: 0,
+                       received_message_since_timer_tick: false,
                }).is_some() {
                        panic!("PeerManager driver duplicated descriptors!");
                };
@@ -600,7 +630,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                while !peer.awaiting_write_event {
-                       if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
+                       if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
                                        InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
@@ -645,6 +675,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        },
                                }
                        }
+                       if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
+                               self.maybe_send_extra_ping(peer);
+                       }
 
                        if {
                                let next_buff = match peer.pending_outbound_buffer.front() {
@@ -720,14 +753,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                }
        }
 
-       /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
-       fn enqueue_message<M: wire::Type + Writeable + Debug>(&self, peer: &mut Peer, message: &M) {
-               let mut buffer = VecWriter(Vec::new());
-               wire::write(message, &mut buffer).unwrap(); // crash if the write failed
-               let encoded_message = buffer.0;
+       /// Append a message to a peer's pending outbound/write buffer
+       fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+               peer.msgs_sent_since_pong += 1;
+               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+       }
 
+       /// Append a message to a peer's pending outbound/write buffer
+       fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
+               let mut buffer = VecWriter(Vec::with_capacity(2048));
+               wire::write(message, &mut buffer).unwrap(); // crash if the write failed
                log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
-               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+               self.enqueue_encoded_message(peer, &buffer.0);
        }
 
        fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
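
The net effect of the two helpers above is that every enqueued message bumps `msgs_sent_since_pong`, which `do_attempt_write_data` consults before draining more initial-sync messages. A toy model of that accounting (standalone; `ToyPeer` is illustrative):

    use std::collections::VecDeque;

    const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

    struct ToyPeer {
        pending_outbound: VecDeque<Vec<u8>>,
        msgs_sent_since_pong: usize,
    }

    impl ToyPeer {
        // Mirrors `enqueue_encoded_message`: count first, then buffer.
        fn enqueue(&mut self, encoded: Vec<u8>) {
            self.msgs_sent_since_pong += 1;
            self.pending_outbound.push_back(encoded);
        }
        // Mirrors the `do_attempt_write_data` gate: once this trips, no more
        // sync messages are queued until a pong resets the counter.
        fn pause_sync_sends(&self) -> bool {
            self.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK
        }
    }
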
@@ -785,7 +822,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                                                                },
                                                                                        }
                                                                                }
-                                                                       };
+                                                                       }
                                                                }
                                                        }
 
@@ -921,6 +958,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
        ) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
                log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
+               peer.received_message_since_timer_tick = true;
 
                // Need an Init as first message
                if let wire::Message::Init(_) = message {
@@ -984,7 +1022,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                }
                        },
                        wire::Message::Pong(_msg) => {
-                               peer.awaiting_pong = false;
+                               peer.awaiting_pong_tick_intervals = 0;
+                               peer.msgs_sent_since_pong = 0;
                        },
 
                        // Channel messages:
@@ -1079,16 +1118,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        },
 
                        // Unknown messages:
-                       wire::Message::Unknown(msg_type) if msg_type.is_even() => {
-                               log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
+                       wire::Message::Unknown(type_id) if message.is_even() => {
+                               log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
                                // Fail the channel if message is an even, unknown type as per BOLT #1.
                                return Err(PeerHandleError{ no_connection_possible: true }.into());
                        },
-                       wire::Message::Unknown(msg_type) => {
-                               log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
+                       wire::Message::Unknown(type_id) => {
+                               log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id);
                        },
                        wire::Message::Custom(custom) => {
-                               self.custom_message_handler.handle_custom_message(custom)?;
+                               self.custom_message_handler.handle_custom_message(custom, &peer.their_node_id.unwrap())?;
                        },
                };
                Ok(should_forward)
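
The unknown-message arms apply BOLT #1's "it's OK to be odd" rule: an unknown even type means the sender required us to understand it, so we disconnect, while unknown odd types are ignored. The rule in isolation:

    // Sketch of the BOLT #1 rule, detached from the wire::Message enum.
    fn handle_unknown_type(type_id: u16) -> Result<(), &'static str> {
        if type_id % 2 == 0 {
            // Even and unknown: fail the connection per BOLT #1.
            Err("unknown even message type, disconnecting peer")
        } else {
            // Odd and unknown: safe to ignore.
            Ok(())
        }
    }
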
@@ -1105,7 +1144,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+                                       {
                                                log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1116,7 +1157,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       self.enqueue_encoded_message(peer, &encoded_msg);
                                }
                        },
                        wire::Message::NodeAnnouncement(ref msg) => {
@@ -1128,7 +1169,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+                                       {
                                                log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1138,7 +1181,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       self.enqueue_encoded_message(peer, &encoded_msg);
                                }
                        },
                        wire::Message::ChannelUpdate(ref msg) => {
@@ -1150,14 +1193,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
-                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
+                                       if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+                                               || peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+                                       {
                                                log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       self.enqueue_encoded_message(peer, &encoded_msg);
                                }
                        },
                        _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
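
All three gossip-forwarding arms share the same two-part gate: skip the broadcast if the raw outbound buffer is too deep, or if we have already sent more than two ticks' worth of messages without a pong. Pulled out as a standalone predicate (a sketch of the condition, not the patch's structure):

    const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
    const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
    const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize =
        OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
    const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

    // True when a peer is too backed up to receive forwarded gossip.
    fn should_drop_gossip(pending_outbound_len: usize, msgs_sent_since_pong: usize) -> bool {
        pending_outbound_len > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
            || msgs_sent_since_pong
                > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
    }
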
@@ -1171,6 +1216,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// May call [`send_data`] on [`SocketDescriptor`]s. Thus, be very careful with reentrancy
        /// issues!
        ///
+       /// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
+       /// or one of the other clients provided in our language bindings.
+       ///
        /// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
        /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
        /// [`send_data`]: SocketDescriptor::send_data
@@ -1319,9 +1367,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                let peer = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                        },
-                                       MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
-                                               self.message_handler.route_handler.handle_htlc_fail_channel_update(update);
-                                       },
                                        MessageSendEvent::HandleError { ref node_id, ref action } => {
                                                match *action {
                                                        msgs::ErrorAction::DisconnectPeer { ref msg } => {
@@ -1433,6 +1478,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                }
        }
 
+       /// Disconnects all currently-connected peers. This is useful on platforms where there may be
+       /// an indication that TCP sockets have stalled even if we weren't around to time them out
+       /// using regular ping/pongs.
+       pub fn disconnect_all_peers(&self) {
+               let mut peers_lock = self.peers.lock().unwrap();
+               let peers = &mut *peers_lock;
+               for (mut descriptor, peer) in peers.peers.drain() {
+                       if let Some(node_id) = peer.their_node_id {
+                               log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
+                               peers.node_id_to_descriptor.remove(&node_id);
+                               self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+                       }
+                       descriptor.disconnect_socket();
+               }
+               debug_assert!(peers.node_id_to_descriptor.is_empty());
+       }
+
+       /// This is called when we're blocked on sending additional gossip messages until we receive a
+       /// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
+       /// `awaiting_pong_tick_intervals` to a special flag value to indicate this).
+       fn maybe_send_extra_ping(&self, peer: &mut Peer) {
+               if peer.awaiting_pong_tick_intervals == 0 {
+                       peer.awaiting_pong_tick_intervals = -1;
+                       let ping = msgs::Ping {
+                               ponglen: 0,
+                               byteslen: 64,
+                       };
+                       self.enqueue_message(peer, &ping);
+               }
+       }
+
        /// Send pings to each peer and disconnect those which did not respond to the last round of
        /// pings.
        ///
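
`awaiting_pong_tick_intervals` thus encodes three states: `0` (idle), `-1` (an extra ping was sent between timer ticks), and `n > 0` (a ping has been outstanding for `n` ticks). A toy model of how a timer tick advances that state:

    // Toy transition for `awaiting_pong_tick_intervals` at each timer tick,
    // ignoring the disconnect check that runs first.
    fn advance_on_tick(awaiting: i8) -> i8 {
        match awaiting {
            // Sentinel from `maybe_send_extra_ping`: start counting from 1 so
            // the mid-tick ping isn't penalized with extra elapsed intervals.
            -1 => 1,
            // A ping is still outstanding: one more interval has elapsed.
            n if n > 0 => n + 1,
            // Idle: the periodic ping is sent now, so begin waiting.
            _ => 1,
        }
    }
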
@@ -1451,9 +1527,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
                        let peers = &mut peers.peers;
                        let mut descriptors_needing_disconnect = Vec::new();
+                       let peer_count = peers.len();
 
                        peers.retain(|descriptor, peer| {
-                               if peer.awaiting_pong {
+                               if !peer.channel_encryptor.is_ready_for_encryption() {
+                                       // The peer needs to complete its handshake before we can exchange messages
+                                       return true;
+                               }
+
+                               if (peer.awaiting_pong_tick_intervals > 0 && !peer.received_message_since_timer_tick)
+                                       || peer.awaiting_pong_tick_intervals as u64 >
+                                               MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
+                               {
                                        descriptors_needing_disconnect.push(descriptor.clone());
                                        match peer.their_node_id {
                                                Some(node_id) => {
@@ -1470,21 +1555,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        return false;
                                }
 
-                               if !peer.channel_encryptor.is_ready_for_encryption() {
-                                       // The peer needs to complete its handshake before we can exchange messages
+                               peer.received_message_since_timer_tick = false;
+                               if peer.awaiting_pong_tick_intervals == -1 {
+                                       // Magic value set in `maybe_send_extra_ping`.
+                                       peer.awaiting_pong_tick_intervals = 1;
+                                       return true;
+                               }
+
+                               if peer.awaiting_pong_tick_intervals > 0 {
+                                       peer.awaiting_pong_tick_intervals += 1;
                                        return true;
                                }
 
+                               peer.awaiting_pong_tick_intervals = 1;
                                let ping = msgs::Ping {
                                        ponglen: 0,
                                        byteslen: 64,
                                };
                                self.enqueue_message(peer, &ping);
+                               self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
 
-                               let mut descriptor_clone = descriptor.clone();
-                               self.do_attempt_write_data(&mut descriptor_clone, peer);
-
-                               peer.awaiting_pong = true;
                                true
                        });
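
The disconnect condition in the `retain` above reads: a waiting peer that sent us nothing for a whole tick is presumed dead, and even a chatty peer is cut off once its pong has been outstanding longer than the allowance scaled by peer count. As a standalone predicate (sketch):

    const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;

    // True if a handshake-complete peer should be disconnected this tick.
    fn should_disconnect(
        awaiting_pong_tick_intervals: i8,
        received_message_since_timer_tick: bool,
        peer_count: u64,
    ) -> bool {
        (awaiting_pong_tick_intervals > 0 && !received_message_since_timer_tick)
            || awaiting_pong_tick_intervals as u64
                > MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count
    }
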
 
@@ -1643,11 +1733,23 @@ mod tests {
                // than can fit into a peer's buffer).
                let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
 
-               // Make each peer to read the messages that the other peer just wrote to them.
-               peers[0].process_events();
-               peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
-               peers[1].process_events();
-               peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
+               // Make each peer read the messages that the other peer just wrote to them. Note that
+               // due to the max-messages-before-ping limits this may take a few iterations to complete.
+               for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
+                       peers[0].process_events();
+                       let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+                       assert!(!b_read_data.is_empty());
+
+                       peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
+                       peers[1].process_events();
+
+                       let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+                       assert!(!a_read_data.is_empty());
+                       peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
+
+                       peers[1].process_events();
+                       assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
+               }
 
                // Check that each peer has received the expected number of channel updates and channel
                // announcements.
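
For reference, with `BUFFER_DRAIN_MSGS_PER_TICK = 32` the loop above runs `150 / 32 + 1 = 5` times, enough to exchange the test's roughly 150 gossip messages in 32-message-per-ping chunks:

    fn main() {
        const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
        // Integer division: 150 / 32 == 4, so the loop body runs 5 times.
        assert_eq!(150 / BUFFER_DRAIN_MSGS_PER_TICK + 1, 5);
    }
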