Avoid reentrancy of send_data from PeerHandler::read_bytes. 2018-10-msg-resp-overhaul
authorMatt Corallo <git@bluematt.me>
Sat, 20 Oct 2018 22:17:19 +0000 (18:17 -0400)
committerMatt Corallo <git@bluematt.me>
Sat, 27 Oct 2018 13:42:04 +0000 (09:42 -0400)
This greatly simplifies clients of PeerHandler, and because almost
all response messages have already been moved to process_events
this doesn't change much effeciency-wise.

src/ln/peer_handler.rs

index 4471ea0025807d4ad63c71338785ca6b6897cffe..d868b0758bd745a6e7ee53294e7127c7942d4a89 100644 (file)
@@ -15,7 +15,7 @@ use util::byte_utils;
 use util::events::{MessageSendEvent};
 use util::logger::Logger;
 
-use std::collections::{HashMap,hash_map,LinkedList};
+use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::{cmp,error,hash,fmt};
@@ -106,17 +106,22 @@ struct Peer {
 
 struct PeerHolder<Descriptor: SocketDescriptor> {
        peers: HashMap<Descriptor, Peer>,
+       /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
+       /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
+       peers_needing_send: HashSet<Descriptor>,
        /// Only add to this set when noise completes:
        node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
 }
 struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
        peers: &'a mut HashMap<Descriptor, Peer>,
+       peers_needing_send: &'a mut HashSet<Descriptor>,
        node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
 }
 impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
        fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
                MutPeerHolder {
                        peers: &mut self.peers,
+                       peers_needing_send: &mut self.peers_needing_send,
                        node_id_to_descriptor: &mut self.node_id_to_descriptor,
                }
        }
@@ -162,7 +167,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
                PeerManager {
                        message_handler: message_handler,
-                       peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
+                       peers: Mutex::new(PeerHolder {
+                               peers: HashMap::new(),
+                               peers_needing_send: HashSet::new(),
+                               node_id_to_descriptor: HashMap::new()
+                       }),
                        our_node_secret: our_node_secret,
                        initial_syncs_sent: AtomicUsize::new(0),
                        logger,
@@ -188,7 +197,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        /// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
        /// descriptor but must disconnect the connection immediately.
        ///
-       /// Returns some bytes to send to the remote node.
+       /// Returns a small number of bytes to send to the remote node (currently always 50).
        ///
        /// Panics if descriptor is duplicative with some other descriptor which has not yet has a
        /// disconnect_event.
@@ -298,16 +307,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
        ///
        /// May return an Err to indicate that the connection should be closed.
        ///
-       /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
-       /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
-       /// invariants around calling write_event in case a write did not fully complete must still
-       /// hold. Note that this function will often call send_data on many peers before returning, not
-       /// just this peer!
+       /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
+       /// Thus, however, you almost certainly want to call process_events() after any read_event to
+       /// generate send_data calls to handle responses.
        ///
        /// If Ok(true) is returned, further read_events should not be triggered until a write_event on
-       /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
-       /// that this must be true even if a send_data call with resume_read=true was made during the
-       /// course of this function!
+       /// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
        ///
        /// Panics if the descriptor was not previously registered in a new_*_connection event.
        pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
@@ -347,6 +352,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                                        {
                                                                                log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
                                                                                peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+                                                                               peers.peers_needing_send.insert(peer_descriptor.clone());
                                                                        }
                                                                }
                                                        }
@@ -670,13 +676,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        pause_read
                };
 
-               self.process_events();
-
                Ok(pause_read)
        }
 
-       /// Checks for any events generated by our handlers and processes them. May be needed after eg
-       /// calls to ChannelManager::process_pending_htlc_forward.
+       /// Checks for any events generated by our handlers and processes them. Includes sending most
+       /// response messages as well as messages generated by calls to handler functions directly (eg
+       /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
        pub fn process_events(&self) {
                {
                        // TODO: There are some DoS attacks here where you can flood someone's outbound send
@@ -684,7 +689,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                        // drop optional-ish messages when send buffers get full!
 
                        let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-                       let mut peers = self.peers.lock().unwrap();
+                       let mut peers_lock = self.peers.lock().unwrap();
+                       let peers = peers_lock.borrow_parts();
                        for event in events_generated.drain(..) {
                                macro_rules! get_peer_for_forwarding {
                                        ($node_id: expr, $handle_no_such_peer: block) => {
@@ -888,6 +894,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                                        match *action {
                                                                msgs::ErrorAction::DisconnectPeer { ref msg } => {
                                                                        if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
+                                                                               peers.peers_needing_send.remove(&descriptor);
                                                                                if let Some(mut peer) = peers.peers.remove(&descriptor) {
                                                                                        if let Some(ref msg) = *msg {
                                                                                                log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
@@ -923,6 +930,13 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
                                        }
                                }
                        }
+
+                       for mut descriptor in peers.peers_needing_send.drain() {
+                               match peers.peers.get_mut(&descriptor) {
+                                       Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+                                       None => panic!("Inconsistent peers set state!"),
+                               }
+                       }
                }
        }
 
@@ -938,6 +952,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
        fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
                let mut peers = self.peers.lock().unwrap();
+               peers.peers_needing_send.remove(descriptor);
                let peer_option = peers.peers.remove(descriptor);
                match peer_option {
                        None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),