Refactor message broadcasting out into a utility method
[rust-lightning] / lightning / src / ln / peer_handler.rs
index 6e05b5498cf284391db623152242a571c57aa346..0497fae158b45006cc697935fe7cab87f3f0cd6a 100644 (file)
@@ -807,8 +807,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                }
                                        }
 
-                                       self.do_attempt_write_data(peer_descriptor, peer);
-
                                        peer.pending_outbound_buffer.len() > 10 // pause_read
                                }
                        };
@@ -1011,6 +1009,64 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                Ok(())
        }
 
+       fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message, except_node: Option<&PublicKey>) {
+               match msg {
+                       wire::Message::ChannelAnnouncement(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
+                                               continue
+                                       }
+                                       if peer.their_node_id.as_ref() == Some(&msg.contents.node_id_1) ||
+                                          peer.their_node_id.as_ref() == Some(&msg.contents.node_id_2) {
+                                               continue;
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       peers.peers_needing_send.insert((*descriptor).clone());
+                               }
+                       },
+                       wire::Message::NodeAnnouncement(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_node_announcement(msg.contents.node_id) {
+                                               continue
+                                       }
+                                       if peer.their_node_id.as_ref() == Some(&msg.contents.node_id) {
+                                               continue;
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       peers.peers_needing_send.insert((*descriptor).clone());
+                               }
+                       },
+                       wire::Message::ChannelUpdate(ref msg) => {
+                               let encoded_msg = encode_msg!(msg);
+
+                               for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
+                                       if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
+                                                       !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
+                                               continue
+                                       }
+                                       if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
+                                               continue;
+                                       }
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
+                                       peers.peers_needing_send.insert((*descriptor).clone());
+                               }
+                       },
+                       _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
+               }
+       }
+
        /// Checks for any events generated by our handlers and processes them. Includes sending most
        /// response messages as well as messages generated by calls to handler functions directly (eg
        /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
@@ -1026,19 +1082,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        let peers = &mut *peers_lock;
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
-                                       ($node_id: expr, $handle_no_such_peer: block) => {
+                                       ($node_id: expr) => {
                                                {
                                                        let descriptor = match peers.node_id_to_descriptor.get($node_id) {
                                                                Some(descriptor) => descriptor.clone(),
                                                                None => {
-                                                                       $handle_no_such_peer;
                                                                        continue;
                                                                },
                                                        };
                                                        match peers.peers.get_mut(&descriptor) {
                                                                Some(peer) => {
                                                                        if peer.their_features.is_none() {
-                                                                               $handle_no_such_peer;
                                                                                continue;
                                                                        }
                                                                        (descriptor, peer)
@@ -1053,9 +1107,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1063,9 +1115,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1074,10 +1124,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id),
                                                                log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               // TODO: If the peer is gone we should generate a DiscardFunding event
+                                               // indicating to the wallet that they should just throw away this funding transaction
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1085,10 +1134,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1096,9 +1142,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1106,10 +1150,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: generate a DiscardFunding event indicating to the wallet that
-                                                               //they should just throw away this funding transaction
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1120,9 +1161,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                update_fulfill_htlcs.len(),
                                                                update_fail_htlcs.len(),
                                                                log_bytes!(commitment_signed.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                for msg in update_add_htlcs {
                                                        peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
@@ -1145,9 +1184,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1155,9 +1192,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1165,9 +1200,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
@@ -1175,65 +1208,27 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                log_trace!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.channel_id));
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                               //TODO: Do whatever we're gonna do for handling dropped messages
-                                                       });
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
-                                       MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
+                                       MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
                                                log_trace!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-                                                       let encoded_update_msg = encode_msg!(update_msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
-                                                                       continue
-                                                               }
-                                                               match peer.their_node_id {
-                                                                       None => continue,
-                                                                       Some(their_node_id) => {
-                                                                               if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
-                                                                                       continue
-                                                                               }
-                                                                       }
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_channel_announcement(&msg).is_ok() && self.message_handler.route_handler.handle_channel_update(&update_msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None);
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None);
                                                }
                                        },
-                                       MessageSendEvent::BroadcastNodeAnnouncement { ref msg } => {
+                                       MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
                                                log_trace!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler");
-                                               if self.message_handler.route_handler.handle_node_announcement(msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_node_announcement(msg.contents.node_id) {
-                                                                       continue
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_node_announcement(&msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None);
                                                }
                                        },
-                                       MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
+                                       MessageSendEvent::BroadcastChannelUpdate { msg } => {
                                                log_trace!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg);
-
-                                                       for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
-                                                               if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
-                                                                               !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
-                                                                       continue
-                                                               }
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
-                                                               self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
-                                                       }
+                                               if self.message_handler.route_handler.handle_channel_update(&msg).is_ok() {
+                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None);
                                                }
                                        },
                                        MessageSendEvent::PaymentFailureNetworkUpdate { ref update } => {
@@ -1266,21 +1261,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                log_pubkey!(node_id),
                                                                                msg.data);
-                                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
-                                                                       //TODO: Do whatever we're gonna do for handling dropped messages
-                                                               });
+                                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                                self.do_attempt_write_data(&mut descriptor, peer);
                                                        },
                                                }
                                        },
                                        MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        }
@@ -1291,7 +1284,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                        msg.first_blocknum,
                                                        msg.number_of_blocks,
                                                        msg.sync_complete);
-                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id);
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        }
@@ -1494,7 +1487,9 @@ mod tests {
                let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
                peer_a.new_inbound_connection(fd_a.clone()).unwrap();
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+               peer_a.process_events();
                assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               peer_b.process_events();
                assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                (fd_a.clone(), fd_b.clone())
        }
@@ -1533,10 +1528,12 @@ mod tests {
 
                // peers[0] awaiting_pong is set to true, but the Peer is still connected
                peers[0].timer_tick_occurred();
+               peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
                // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
                peers[0].timer_tick_occurred();
+               peers[0].process_events();
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
 
@@ -1557,7 +1554,9 @@ mod tests {
                let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
 
                // Make each peer to read the messages that the other peer just wrote to them.
+               peers[0].process_events();
                peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
+               peers[1].process_events();
                peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
 
                // Check that each peer has received the expected number of channel updates and channel