X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fln%2Fpeer_handler.rs;h=d868b0758bd745a6e7ee53294e7127c7942d4a89;hb=7bb598e525dc90e978d29a68b3091203c63c024e;hp=4471ea0025807d4ad63c71338785ca6b6897cffe;hpb=249aa7755038172279781caf47f681c01843dc73;p=rust-lightning diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 4471ea00..d868b075 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -15,7 +15,7 @@ use util::byte_utils; use util::events::{MessageSendEvent}; use util::logger::Logger; -use std::collections::{HashMap,hash_map,LinkedList}; +use std::collections::{HashMap,hash_map,HashSet,LinkedList}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{cmp,error,hash,fmt}; @@ -106,17 +106,22 @@ struct Peer { struct PeerHolder { peers: HashMap, + /// Added to by do_read_event for cases where we pushed a message onto the send buffer but + /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events() + peers_needing_send: HashSet, /// Only add to this set when noise completes: node_id_to_descriptor: HashMap, } struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> { peers: &'a mut HashMap, + peers_needing_send: &'a mut HashSet, node_id_to_descriptor: &'a mut HashMap, } impl PeerHolder { fn borrow_parts(&mut self) -> MutPeerHolder { MutPeerHolder { peers: &mut self.peers, + peers_needing_send: &mut self.peers_needing_send, node_id_to_descriptor: &mut self.node_id_to_descriptor, } } @@ -162,7 +167,11 @@ impl PeerManager { pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc) -> PeerManager { PeerManager { message_handler: message_handler, - peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }), + peers: Mutex::new(PeerHolder { + peers: HashMap::new(), + peers_needing_send: HashSet::new(), + node_id_to_descriptor: HashMap::new() + }), our_node_secret: our_node_secret, initial_syncs_sent: AtomicUsize::new(0), logger, @@ -188,7 +197,7 @@ impl PeerManager { /// Note that if an Err is returned here you MUST NOT call disconnect_event for the new /// descriptor but must disconnect the connection immediately. /// - /// Returns some bytes to send to the remote node. + /// Returns a small number of bytes to send to the remote node (currently always 50). /// /// Panics if descriptor is duplicative with some other descriptor which has not yet has a /// disconnect_event. @@ -298,16 +307,12 @@ impl PeerManager { /// /// May return an Err to indicate that the connection should be closed. /// - /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into - /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The - /// invariants around calling write_event in case a write did not fully complete must still - /// hold. Note that this function will often call send_data on many peers before returning, not - /// just this peer! + /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity. + /// Thus, however, you almost certainly want to call process_events() after any read_event to + /// generate send_data calls to handle responses. /// /// If Ok(true) is returned, further read_events should not be triggered until a write_event on - /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note - /// that this must be true even if a send_data call with resume_read=true was made during the - /// course of this function! + /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). /// /// Panics if the descriptor was not previously registered in a new_*_connection event. pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec) -> Result { @@ -347,6 +352,7 @@ impl PeerManager { { log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + peers.peers_needing_send.insert(peer_descriptor.clone()); } } } @@ -670,13 +676,12 @@ impl PeerManager { pause_read }; - self.process_events(); - Ok(pause_read) } - /// Checks for any events generated by our handlers and processes them. May be needed after eg - /// calls to ChannelManager::process_pending_htlc_forward. + /// Checks for any events generated by our handlers and processes them. Includes sending most + /// response messages as well as messages generated by calls to handler functions directly (eg + /// functions like ChannelManager::process_pending_htlc_forward or send_payment). pub fn process_events(&self) { { // TODO: There are some DoS attacks here where you can flood someone's outbound send @@ -684,7 +689,8 @@ impl PeerManager { // drop optional-ish messages when send buffers get full! let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); - let mut peers = self.peers.lock().unwrap(); + let mut peers_lock = self.peers.lock().unwrap(); + let peers = peers_lock.borrow_parts(); for event in events_generated.drain(..) { macro_rules! get_peer_for_forwarding { ($node_id: expr, $handle_no_such_peer: block) => { @@ -888,6 +894,7 @@ impl PeerManager { match *action { msgs::ErrorAction::DisconnectPeer { ref msg } => { if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) { + peers.peers_needing_send.remove(&descriptor); if let Some(mut peer) = peers.peers.remove(&descriptor) { if let Some(ref msg) = *msg { log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", @@ -923,6 +930,13 @@ impl PeerManager { } } } + + for mut descriptor in peers.peers_needing_send.drain() { + match peers.peers.get_mut(&descriptor) { + Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer), + None => panic!("Inconsistent peers set state!"), + } + } } } @@ -938,6 +952,7 @@ impl PeerManager { fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { let mut peers = self.peers.lock().unwrap(); + peers.peers_needing_send.remove(descriptor); let peer_option = peers.peers.remove(descriptor); match peer_option { None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),