}
for msg in msgs_to_forward.drain(..) {
- self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk));
+ self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk), false);
}
Ok(pause_read)
Ok(should_forward)
}
- fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
+ /// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and
+ /// excluding `except_node`.
+ ///
+ /// If the message queue for a peer is somewhat full, the message will not be forwarded to them
+ /// unless `allow_large_buffer` is set, in which case the message will be treated as critical
+ /// and delivered no matter the available buffer space.
+ fn forward_broadcast_msg(
+ &self, peers: &HashMap<Descriptor, Mutex<Peer>>,
+ msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
+ except_node: Option<&PublicKey>, allow_large_buffer: bool,
+ ) {
match msg {
wire::Message::ChannelAnnouncement(ref msg) => {
log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
- if peer.buffer_full_drop_gossip_broadcast() {
+ if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
- if peer.buffer_full_drop_gossip_broadcast() {
+ if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
debug_assert!(peer.their_node_id.is_some());
debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
- if peer.buffer_full_drop_gossip_broadcast() {
+ if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
}
}
}
+
+ // Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should
+ // robustly gossip broadcast events even if a peer's message buffer is full.
let mut handle_event = |event, from_chan_handler| {
match event {
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
match self.message_handler.route_handler.handle_channel_announcement(None, &msg) {
- Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
- self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+ let forward = wire::Message::ChannelAnnouncement(msg);
+ self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+ },
_ => {},
}
if let Some(msg) = update_msg {
match self.message_handler.route_handler.handle_channel_update(None, &msg) {
- Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
- self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+ let forward = wire::Message::ChannelUpdate(msg);
+ self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+ },
_ => {},
}
}
MessageSendEvent::BroadcastChannelUpdate { msg } => {
log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}", msg.contents);
match self.message_handler.route_handler.handle_channel_update(None, &msg) {
- Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
- self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+ let forward = wire::Message::ChannelUpdate(msg);
+ self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+ },
_ => {},
}
},
MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
match self.message_handler.route_handler.handle_node_announcement(None, &msg) {
- Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
- self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
+ Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+ let forward = wire::Message::NodeAnnouncement(msg);
+ self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+ },
_ => {},
}
},
log_debug!(self.logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler.");
let _ = self.message_handler.route_handler.handle_node_announcement(None, &msg);
- self.forward_broadcast_msg(&*self.peers.read().unwrap(), &wire::Message::NodeAnnouncement(msg), None);
+ self.forward_broadcast_msg(&*self.peers.read().unwrap(), &wire::Message::NodeAnnouncement(msg), None, true);
}
}