}
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
-/// forwarding messages to peers altogether.
-const FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT: usize = 2;
+/// forwarding gossip messages to peers altogether.
+const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_SIZE_LIMIT;
+const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
struct Peer {
channel_encryptor: PeerChannelEncryptor,
}
}
- /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
+ /// Append a message to a peer's pending outbound/write buffer
+ fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+ }
+
+ /// Append a message to a peer's pending outbound/write buffer
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
let mut buffer = VecWriter(Vec::with_capacity(2048));
wire::write(message, &mut buffer).unwrap(); // crash if the write failed
- let encoded_message = buffer.0;
-
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
+ self.enqueue_encoded_message(peer, &buffer.0);
}
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
wire::Message::NodeAnnouncement(ref msg) => {
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
wire::Message::ChannelUpdate(ref msg) => {
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+ self.enqueue_encoded_message(peer, &encoded_msg);
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),