]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Reliably deliver gossip messages from our `ChannelMessageHandler`
authorMatt Corallo <git@bluematt.me>
Mon, 24 Jun 2024 20:24:36 +0000 (20:24 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 17 Oct 2024 19:09:15 +0000 (19:09 +0000)
When our `ChannelMessageHandler` creates gossip broadcast
`MessageSendEvent`s, we generally want these to be reliably
delivered to all our peers, even if there's not much buffer space
available.

Here we do this by passing an extra flag to `forward_broadcast_msg`
which indicates where the message came from, then ignoring the
buffer-full criteria when the flag is set.

lightning/src/ln/peer_handler.rs

index a18f8bf8508fa27457a523dc7887a4835e4059cd..a2919aa947aa585266d05510adce4513a5f47bfc 100644 (file)
@@ -1600,7 +1600,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                }
 
                for msg in msgs_to_forward.drain(..) {
-                       self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk));
+                       self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk), false);
                }
 
                Ok(pause_read)
@@ -1946,7 +1946,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                Ok(should_forward)
        }
 
-       fn forward_broadcast_msg(&self, peers: &HashMap<Descriptor, Mutex<Peer>>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
+       /// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and
+       /// excluding `except_node`.
+       ///
+       /// If the message queue for a peer is somewhat full, the message will not be forwarded to them
+       /// unless `allow_large_buffer` is set, in which case the message will be treated as critical
+       /// and delivered no matter the available buffer space.
+       fn forward_broadcast_msg(
+               &self, peers: &HashMap<Descriptor, Mutex<Peer>>,
+               msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>,
+               except_node: Option<&PublicKey>, allow_large_buffer: bool,
+       ) {
                match msg {
                        wire::Message::ChannelAnnouncement(ref msg) => {
                                log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
@@ -1961,7 +1971,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        debug_assert!(peer.their_node_id.is_some());
                                        debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
                                        let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
-                                       if peer.buffer_full_drop_gossip_broadcast() {
+                                       if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
                                                log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1989,7 +1999,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        debug_assert!(peer.their_node_id.is_some());
                                        debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
                                        let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
-                                       if peer.buffer_full_drop_gossip_broadcast() {
+                                       if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
                                                log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -2017,7 +2027,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        debug_assert!(peer.their_node_id.is_some());
                                        debug_assert!(peer.channel_encryptor.is_ready_for_encryption());
                                        let logger = WithContext::from(&self.logger, peer.their_node_id.map(|p| p.0), None, None);
-                                       if peer.buffer_full_drop_gossip_broadcast() {
+                                       if peer.buffer_full_drop_gossip_broadcast() && !allow_large_buffer {
                                                log_gossip!(logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -2099,6 +2109,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                }
                                        }
                                }
+
+                               // Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should
+                               // robustly gossip broadcast events even if a peer's message buffer is full.
                                let mut handle_event = |event, from_chan_handler| {
                                        match event {
                                                MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
@@ -2293,14 +2306,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
                                                        log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                        match self.message_handler.route_handler.handle_channel_announcement(None, &msg) {
-                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
+                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+                                                                       let forward = wire::Message::ChannelAnnouncement(msg);
+                                                                       self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+                                                               },
                                                                _ => {},
                                                        }
                                                        if let Some(msg) = update_msg {
                                                                match self.message_handler.route_handler.handle_channel_update(None, &msg) {
-                                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                                               self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
+                                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+                                                                               let forward = wire::Message::ChannelUpdate(msg);
+                                                                               self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+                                                                       },
                                                                        _ => {},
                                                                }
                                                        }
@@ -2308,16 +2325,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                MessageSendEvent::BroadcastChannelUpdate { msg } => {
                                                        log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}", msg.contents);
                                                        match self.message_handler.route_handler.handle_channel_update(None, &msg) {
-                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
+                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+                                                                       let forward = wire::Message::ChannelUpdate(msg);
+                                                                       self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+                                                               },
                                                                _ => {},
                                                        }
                                                },
                                                MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
                                                        log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
                                                        match self.message_handler.route_handler.handle_node_announcement(None, &msg) {
-                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                                       self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
+                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => {
+                                                                       let forward = wire::Message::NodeAnnouncement(msg);
+                                                                       self.forward_broadcast_msg(peers, &forward, None, from_chan_handler);
+                                                               },
                                                                _ => {},
                                                        }
                                                },
@@ -2689,7 +2710,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                log_debug!(self.logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler.");
                let _ = self.message_handler.route_handler.handle_node_announcement(None, &msg);
-               self.forward_broadcast_msg(&*self.peers.read().unwrap(), &wire::Message::NodeAnnouncement(msg), None);
+               self.forward_broadcast_msg(&*self.peers.read().unwrap(), &wire::Message::NodeAnnouncement(msg), None, true);
        }
 }