awaiting_pong_timer_tick_intervals: i8,
received_message_since_timer_tick: bool,
sent_gossip_timestamp_filter: bool,
+
+ /// Indicates we've received a `channel_announcement` since the last time we had
+ /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a
+ /// `channel_announcement` at all - we set this unconditionally but unset it every time we
+ /// check if we're gossip-processing-backlogged).
+ received_channel_announce_since_backlogged: bool,
}
impl Peer {
/// Returns whether we should be reading bytes from this peer, based on whether its outbound
/// buffer still has space and we don't need to pause reads to get some writes out.
- fn should_read(&self) -> bool {
- self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+ fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
+ if !gossip_processing_backlogged {
+ self.received_channel_announce_since_backlogged = false;
+ }
+ self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
+ (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
}
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,
+
+ received_channel_announce_since_backlogged: false,
})).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
awaiting_pong_timer_tick_intervals: 0,
received_message_since_timer_tick: false,
sent_gossip_timestamp_filter: false,
+
+ received_channel_announce_since_backlogged: false,
})).is_some() {
panic!("PeerManager driver duplicated descriptors!");
};
Ok(())
}
- fn peer_should_read(&self, peer: &Peer) -> bool {
- !self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read()
+ fn peer_should_read(&self, peer: &mut Peer) -> bool {
+ peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
}
fn update_gossip_backlogged(&self) {
self.maybe_send_extra_ping(peer);
}
+ let should_read = self.peer_should_read(peer);
let next_buff = match peer.pending_outbound_buffer.front() {
None => {
if force_one_write && !have_written {
- let should_read = self.peer_should_read(&peer);
if should_read {
let data_sent = descriptor.send_data(&[], should_read);
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
};
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
- let data_sent = descriptor.send_data(pending, self.peer_should_read(&peer));
+ let data_sent = descriptor.send_data(pending, should_read);
have_written = true;
peer.pending_outbound_buffer_first_msg_offset += data_sent;
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
}
}
}
- pause_read = !self.peer_should_read(&peer);
+ pause_read = !self.peer_should_read(peer);
if let Some(message) = msg_to_handle {
match self.handle_message(&peer_mutex, peer_lock, message) {
return Ok(None);
}
+ if let wire::Message::ChannelAnnouncement(ref _msg) = message {
+ peer_lock.received_channel_announce_since_backlogged = true;
+ }
+
mem::drop(peer_lock);
if is_gossip_msg(message.type_id()) {
}
for (descriptor, peer_mutex) in peers.iter() {
- self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap(), flush_read_disabled);
+ let mut peer = peer_mutex.lock().unwrap();
+ if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
+ self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
}
}
if !peers_to_disconnect.is_empty() {
for (descriptor, peer_mutex) in peers_lock.iter() {
let mut peer = peer_mutex.lock().unwrap();
+ if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
+
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
// The peer needs to complete its handshake before we can exchange messages. We
// give peers one timer tick to complete handshake, reusing