Add logging for (dis)connect in peer_handler/channelmanager
[rust-lightning] / src / ln / peer_handler.rs
index 9a2cee9dd4931ec01cd579250edb917ff2c4ee8d..5d13df95c208853bea8202c4dbff34217e374357 100644 (file)
@@ -15,7 +15,7 @@ use util::byte_utils;
 use util::events::{MessageSendEvent};
 use util::logger::Logger;
 
-use std::collections::{HashMap,hash_map,LinkedList};
+use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::{cmp,error,hash,fmt};
@@ -106,17 +106,22 @@ struct Peer {
 
 struct PeerHolder<Descriptor: SocketDescriptor> {
        peers: HashMap<Descriptor, Peer>,
+       /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
+       /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
+       peers_needing_send: HashSet<Descriptor>,
        /// Only add to this set when noise completes:
        node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
 }
 struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
        peers: &'a mut HashMap<Descriptor, Peer>,
+       peers_needing_send: &'a mut HashSet<Descriptor>,
        node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
 }
 impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
        fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
                MutPeerHolder {
                        peers: &mut self.peers,
+                       peers_needing_send: &mut self.peers_needing_send,
                        node_id_to_descriptor: &mut self.node_id_to_descriptor,
                }
        }
@@ -162,7 +167,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
                PeerManager {
                        message_handler: message_handler,
-                       peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
+                       peers: Mutex::new(PeerHolder {
+                               peers: HashMap::new(),
+                               peers_needing_send: HashSet::new(),
+                               node_id_to_descriptor: HashMap::new()
+                       }),
                        our_node_secret: our_node_secret,
                        initial_syncs_sent: AtomicUsize::new(0),
                        logger,
@@ -188,7 +197,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        /// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
        /// descriptor but must disconnect the connection immediately.
        ///
-       /// Returns some bytes to send to the remote node.
+       /// Returns a small number of bytes to send to the remote node (currently always 50).
        ///
        /// Panics if descriptor is duplicative with some other descriptor which has not yet has a
        /// disconnect_event.
@@ -298,16 +307,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        ///
        /// May return an Err to indicate that the connection should be closed.
        ///
-       /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
-       /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
-       /// invariants around calling write_event in case a write did not fully complete must still
-       /// hold. Note that this function will often call send_data on many peers before returning, not
-       /// just this peer!
+       /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
+       /// Thus, however, you almost certainly want to call process_events() after any read_event to
+       /// generate send_data calls to handle responses.
        ///
        /// If Ok(true) is returned, further read_events should not be triggered until a write_event on
-       /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
-       /// that this must be true even if a send_data call with resume_read=true was made during the
-       /// course of this function!
+       /// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
        ///
        /// Panics if the descriptor was not previously registered in a new_*_connection event.
        pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
@@ -347,6 +352,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                        {
                                                                                log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
                                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+                                                                               peers.peers_needing_send.insert(peer_descriptor.clone());
                                                                        }
                                                                }
                                                        }
@@ -411,10 +417,14 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                () => {
                                                                        match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
                                                                                hash_map::Entry::Occupied(_) => {
+                                                                                       log_trace!(self, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
                                                                                        peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
                                                                                        return Err(PeerHandleError{ no_connection_possible: false })
                                                                                },
-                                                                               hash_map::Entry::Vacant(entry) => entry.insert(peer_descriptor.clone()),
+                                                                               hash_map::Entry::Vacant(entry) => {
+                                                                                       log_trace!(self, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
+                                                                                       entry.insert(peer_descriptor.clone())
+                                                                               },
                                                                        };
                                                                }
                                                        }
@@ -471,6 +481,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap()));
                                                                                if msg_type != 16 && peer.their_global_features.is_none() {
                                                                                        // Need an init message as first message
+                                                                                       log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
                                                                                        return Err(PeerHandleError{ no_connection_possible: false });
                                                                                }
                                                                                let mut reader = ::std::io::Cursor::new(&msg_data[2..]);
@@ -520,9 +531,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                                        }, 16);
                                                                                                }
 
-                                                                                               for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
-                                                                                                       encode_and_send_msg!(msg, 136);
-                                                                                               }
+                                                                                               self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
                                                                                        },
                                                                                        17 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -581,20 +590,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        38 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader));
-                                                                                               let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_options.0 {
-                                                                                                       encode_and_send_msg!(resp, 38);
-                                                                                               }
-                                                                                               if let Some(resp) = resp_options.1 {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        39 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(resp) = resp_option {
-                                                                                                       encode_and_send_msg!(resp, 39);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        128 => {
@@ -616,33 +616,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
                                                                                        132 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader));
-                                                                                               let resps = try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg));
-                                                                                               encode_and_send_msg!(resps.0, 133);
-                                                                                               if let Some(resp) = resps.1 {
-                                                                                                       encode_and_send_msg!(resp, 132);
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        133 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader));
-                                                                                               let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg));
-                                                                                               match resp_option {
-                                                                                                       Some(resps) => {
-                                                                                                               for resp in resps.update_add_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 128);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fulfill_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 130);
-                                                                                                               }
-                                                                                                               for resp in resps.update_fail_htlcs {
-                                                                                                                       encode_and_send_msg!(resp, 131);
-                                                                                                               }
-                                                                                                               if let Some(resp) = resps.update_fee {
-                                                                                                                       encode_and_send_msg!(resp, 134);
-                                                                                                               }
-                                                                                                               encode_and_send_msg!(resps.commitment_signed, 132);
-                                                                                                       },
-                                                                                                       None => {},
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
                                                                                        134 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader));
@@ -650,45 +628,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                                        },
                                                                                        136 => {
                                                                                                let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader));
-                                                                                               let (funding_locked, revoke_and_ack, commitment_update, order) = try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg));
-                                                                                               if let Some(lock_msg) = funding_locked {
-                                                                                                       encode_and_send_msg!(lock_msg, 36);
-                                                                                               }
-                                                                                               macro_rules! handle_raa { () => {
-                                                                                                       if let Some(revoke_msg) = revoke_and_ack {
-                                                                                                               encode_and_send_msg!(revoke_msg, 133);
-                                                                                                       }
-                                                                                               } }
-                                                                                               macro_rules! handle_cu { () => {
-                                                                                                       match commitment_update {
-                                                                                                               Some(resps) => {
-                                                                                                                       for resp in resps.update_add_htlcs {
-                                                                                                                               encode_and_send_msg!(resp, 128);
-                                                                                                                       }
-                                                                                                                       for resp in resps.update_fulfill_htlcs {
-                                                                                                                               encode_and_send_msg!(resp, 130);
-                                                                                                                       }
-                                                                                                                       for resp in resps.update_fail_htlcs {
-                                                                                                                               encode_and_send_msg!(resp, 131);
-                                                                                                                       }
-                                                                                                                       if let Some(resp) = resps.update_fee {
-                                                                                                                               encode_and_send_msg!(resp, 134);
-                                                                                                                       }
-                                                                                                                       encode_and_send_msg!(resps.commitment_signed, 132);
-                                                                                                               },
-                                                                                                               None => {},
-                                                                                                       }
-                                                                                               } }
-                                                                                               match order {
-                                                                                                       msgs::RAACommitmentOrder::RevokeAndACKFirst => {
-                                                                                                               handle_raa!();
-                                                                                                               handle_cu!();
-                                                                                                       },
-                                                                                                       msgs::RAACommitmentOrder::CommitmentFirst => {
-                                                                                                               handle_cu!();
-                                                                                                               handle_raa!();
-                                                                                                       },
-                                                                                               }
+                                                                                               try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg));
                                                                                        },
 
                                                                                        // Routing control:
@@ -741,13 +681,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        pause_read
                };
 
-               self.process_events();
-
                Ok(pause_read)
        }
 
-       /// Checks for any events generated by our handlers and processes them. May be needed after eg
-       /// calls to ChannelManager::process_pending_htlc_forward.
+       /// Checks for any events generated by our handlers and processes them. Includes sending most
+       /// response messages as well as messages generated by calls to handler functions directly (eg
+       /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
        pub fn process_events(&self) {
                {
                        // TODO: There are some DoS attacks here where you can flood someone's outbound send
@@ -755,7 +694,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        // drop optional-ish messages when send buffers get full!
 
                        let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-                       let mut peers = self.peers.lock().unwrap();
+                       let mut peers_lock = self.peers.lock().unwrap();
+                       let peers = peers_lock.borrow_parts();
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
                                        ($node_id: expr, $handle_no_such_peer: block) => {
@@ -883,6 +823,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
                                        },
+                                       MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
                                        MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
@@ -893,6 +843,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
                                                Self::do_attempt_write_data(&mut descriptor, peer);
                                        },
+                                       MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+                                               log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+                                                               //TODO: Do whatever we're gonna do for handling dropped messages
+                                                       });
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
+                                               Self::do_attempt_write_data(&mut descriptor, peer);
+                                       },
                                        MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                                log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
@@ -939,6 +899,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        match *action {
                                                                msgs::ErrorAction::DisconnectPeer { ref msg } => {
                                                                        if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
+                                                                               peers.peers_needing_send.remove(&descriptor);
                                                                                if let Some(mut peer) = peers.peers.remove(&descriptor) {
                                                                                        if let Some(ref msg) = *msg {
                                                                                                log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
@@ -974,6 +935,13 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                        }
                                }
                        }
+
+                       for mut descriptor in peers.peers_needing_send.drain() {
+                               match peers.peers.get_mut(&descriptor) {
+                                       Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+                                       None => panic!("Inconsistent peers set state!"),
+                               }
+                       }
                }
        }
 
@@ -989,6 +957,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
        fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
                let mut peers = self.peers.lock().unwrap();
+               peers.peers_needing_send.remove(descriptor);
                let peer_option = peers.peers.remove(descriptor);
                match peer_option {
                        None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),