From e5606c6ca2430031bdebaf9d3e6b2b4e6476083a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 10 Jun 2021 18:48:59 +0000 Subject: [PATCH] Refactor message broadcasting out into a utility method This will allow us to broadcast messages received in the next commit. --- lightning/src/ln/peer_handler.rs | 114 ++++++++++++++++++------------- 1 file changed, 68 insertions(+), 46 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 8d2d85d42..f03cfbea5 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1010,6 +1010,64 @@ impl PeerManager, msg: &wire::Message, except_node: Option<&PublicKey>) { + match msg { + wire::Message::ChannelAnnouncement(ref msg) => { + let encoded_msg = encode_msg!(msg); + + for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || + !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { + continue + } + if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) || + peer.their_node_id.as_ref() == Some(&msg.contents.node_id_2) { + continue; + } + if except_node.is_some() && peer.their_node_id.as_ref() == except_node { + continue; + } + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); + peers.peers_needing_send.insert((*descriptor).clone()); + } + }, + wire::Message::NodeAnnouncement(ref msg) => { + let encoded_msg = encode_msg!(msg); + + for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || + !peer.should_forward_node_announcement(msg.contents.node_id) { + continue + } + if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) { + continue; + } + if except_node.is_some() && peer.their_node_id.as_ref() == except_node { + continue; + } + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); + peers.peers_needing_send.insert((*descriptor).clone()); + } + }, + wire::Message::ChannelUpdate(ref msg) => { + let encoded_msg = encode_msg!(msg); + + for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { + if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || + !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { + continue + } + if except_node.is_some() && peer.their_node_id.as_ref() == except_node { + continue; + } + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); + peers.peers_needing_send.insert((*descriptor).clone()); + } + }, + _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"), + } + } + /// Checks for any events generated by our handlers and processes them. Includes sending most /// response messages as well as messages generated by calls to handler functions directly (eg /// functions like ChannelManager::process_pending_htlc_forward or send_payment). @@ -1157,59 +1215,23 @@ impl PeerManager { + MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); - if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { - let encoded_msg = encode_msg!(msg); - let encoded_update_msg = encode_msg!(update_msg); - - for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || - !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { - continue - } - match peer.their_node_id { - None => continue, - Some(their_node_id) => { - if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 { - continue - } - } - } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..])); - self.do_attempt_write_data(&mut (*descriptor).clone(), peer); - } + if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() { + self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None); + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None); } }, - MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => { + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler"); - if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() { - let encoded_msg = encode_msg!(msg); - - for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || - !peer.should_forward_node_announcement(msg.contents.node_id) { - continue - } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); - self.do_attempt_write_data(&mut (*descriptor).clone(), peer); - } + if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() { + self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None); } }, - MessageSendEvent::BroadcastChannelUpdate { ref msg } => { + MessageSendEvent::BroadcastChannelUpdate { msg } => { log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); - if self.message_handler.route_handler.handle_channel_update(msg).is_ok() { - let encoded_msg = encode_msg!(msg); - - for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { - if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || - !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { - continue - } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..])); - self.do_attempt_write_data(&mut (*descriptor).clone(), peer); - } + if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() { + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None); } }, MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => { -- 2.39.5