X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fln%2Fpeer_handler.rs;h=5d13df95c208853bea8202c4dbff34217e374357;hb=b030e84ad8f9eeb678fa52fa3b589f9e937a46ec;hp=5c7e23c6beaf68e229d0071e64df0f61303e7f38;hpb=e2de49ddc4143da3d87d1a8615bb1b9a33a4c5a3;p=rust-lightning

diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs
index 5c7e23c6..5d13df95 100644
--- a/src/ln/peer_handler.rs
+++ b/src/ln/peer_handler.rs
@@ -15,7 +15,7 @@ use util::byte_utils;
 use util::events::{MessageSendEvent};
 use util::logger::Logger;
 
-use std::collections::{HashMap,hash_map,LinkedList};
+use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::{cmp,error,hash,fmt};
@@ -106,17 +106,22 @@ struct Peer {
 struct PeerHolder<Descriptor: SocketDescriptor> {
 	peers: HashMap<Descriptor, Peer>,
+	/// Added to by do_read_event for cases where we pushed a message onto the send buffer but
+	/// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
+	peers_needing_send: HashSet<Descriptor>,
 	/// Only add to this set when noise completes:
 	node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
 }
 struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
 	peers: &'a mut HashMap<Descriptor, Peer>,
+	peers_needing_send: &'a mut HashSet<Descriptor>,
 	node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
 }
 impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
 	fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
 		MutPeerHolder {
 			peers: &mut self.peers,
+			peers_needing_send: &mut self.peers_needing_send,
 			node_id_to_descriptor: &mut self.node_id_to_descriptor,
 		}
 	}
 }
@@ -162,7 +167,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 	pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
 		PeerManager {
 			message_handler: message_handler,
-			peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
+			peers: Mutex::new(PeerHolder {
+				peers: HashMap::new(),
+				peers_needing_send: HashSet::new(),
+				node_id_to_descriptor: HashMap::new()
+			}),
 			our_node_secret: our_node_secret,
 			initial_syncs_sent: AtomicUsize::new(0),
 			logger,
@@ -188,7 +197,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 	/// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
 	/// descriptor but must disconnect the connection immediately.
 	///
-	/// Returns some bytes to send to the remote node.
+	/// Returns a small number of bytes to send to the remote node (currently always 50).
 	///
 	/// Panics if descriptor is duplicative with some other descriptor which has not yet has a
 	/// disconnect_event.
@@ -298,16 +307,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 	///
 	/// May return an Err to indicate that the connection should be closed.
 	///
-	/// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
-	/// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
-	/// invariants around calling write_event in case a write did not fully complete must still
-	/// hold. Note that this function will often call send_data on many peers before returning, not
-	/// just this peer!
+	/// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
+	/// Thus, however, you almost certainly want to call process_events() after any read_event to
+	/// generate send_data calls to handle responses.
 	///
 	/// If Ok(true) is returned, further read_events should not be triggered until a write_event on
-	/// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
-	/// that this must be true even if a send_data call with resume_read=true was made during the
-	/// course of this function!
+	/// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
 	///
 	/// Panics if the descriptor was not previously registered in a new_*_connection event.
 	pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
@@ -347,6 +352,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 						{
 							log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
 							peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+							peers.peers_needing_send.insert(peer_descriptor.clone());
 						}
 					}
 				}
@@ -411,10 +417,14 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 					() => {
 						match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
 							hash_map::Entry::Occupied(_) => {
+								log_trace!(self, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
 								peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
 								return Err(PeerHandleError{ no_connection_possible: false })
 							},
-							hash_map::Entry::Vacant(entry) => entry.insert(peer_descriptor.clone()),
+							hash_map::Entry::Vacant(entry) => {
+								log_trace!(self, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
+								entry.insert(peer_descriptor.clone())
+							},
 						};
 					}
 				}
@@ -471,6 +481,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 							log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap()));
 							if msg_type != 16 && peer.their_global_features.is_none() {
 								// Need an init message as first message
+								log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
 								return Err(PeerHandleError{ no_connection_possible: false });
 							}
 							let mut reader = ::std::io::Cursor::new(&msg_data[2..]);
@@ -520,9 +531,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 										}, 16);
 									}
 
-									for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) {
-										encode_and_send_msg!(msg, 136);
-									}
+									self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap());
 								},
 								17 => {
 									let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
@@ -672,13 +681,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 			pause_read
 		};
 
-		self.process_events();
-
 		Ok(pause_read)
 	}
 
-	/// Checks for any events generated by our handlers and processes them. May be needed after eg
-	/// calls to ChannelManager::process_pending_htlc_forward.
+	/// Checks for any events generated by our handlers and processes them. Includes sending most
+	/// response messages as well as messages generated by calls to handler functions directly (eg
+	/// functions like ChannelManager::process_pending_htlc_forward or send_payment).
 	pub fn process_events(&self) {
 		{
 			// TODO: There are some DoS attacks here where you can flood someone's outbound send
@@ -686,7 +694,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 			// buffer or squeeze out the original messages using our limited buffer space. We should
 			// drop optional-ish messages when send buffers get full!
 			let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-			let mut peers = self.peers.lock().unwrap();
+			let mut peers_lock = self.peers.lock().unwrap();
+			let peers = peers_lock.borrow_parts();
 			for event in events_generated.drain(..) {
 				macro_rules! get_peer_for_forwarding {
 					($node_id: expr, $handle_no_such_peer: block) => {
@@ -834,6 +843,16 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 						peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
 						Self::do_attempt_write_data(&mut descriptor, peer);
 					},
+					MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+						log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+								log_pubkey!(node_id),
+								log_bytes!(msg.channel_id));
+						let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
+								//TODO: Do whatever we're gonna do for handling dropped messages
+							});
+						peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
+						Self::do_attempt_write_data(&mut descriptor, peer);
+					},
 					MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
 						log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
 						if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
@@ -880,6 +899,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 						match *action {
 							msgs::ErrorAction::DisconnectPeer { ref msg } => {
 								if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
+									peers.peers_needing_send.remove(&descriptor);
 									if let Some(mut peer) = peers.peers.remove(&descriptor) {
 										if let Some(ref msg) = *msg {
 											log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
@@ -915,6 +935,13 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 					}
 				}
 			}
+
+			for mut descriptor in peers.peers_needing_send.drain() {
+				match peers.peers.get_mut(&descriptor) {
+					Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+					None => panic!("Inconsistent peers set state!"),
+				}
+			}
 		}
 	}
 
@@ -930,6 +957,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
 
 	fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
 		let mut peers = self.peers.lock().unwrap();
+		peers.peers_needing_send.remove(descriptor);
 		let peer_option = peers.peers.remove(descriptor);
 		match peer_option {
 			None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),
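
Note on the new calling convention: after this patch read_event never calls send_data itself; responses are only queued (and tracked via peers_needing_send), so an I/O driver has to follow each read with a later process_events() call to actually flush them. The sketch below is not part of the patch; it only illustrates that pattern against the signatures shown above (read_event(&self, &mut Descriptor, Vec<u8>) -> Result<bool, PeerHandleError> and process_events(&self)). The handle_incoming_data helper, its error handling, and the `lightning` import path are assumptions, not code from the repository.

use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};

/// Hypothetical glue invoked by a networking backend whenever bytes arrive on a
/// peer's socket. Returns true if the backend should stop feeding read_events for
/// this descriptor until a write_event on it has resume_read set, per the docs above.
fn handle_incoming_data<D: SocketDescriptor>(
	peer_manager: &PeerManager<D>,
	descriptor: &mut D,
	data: Vec<u8>,
) -> bool {
	match peer_manager.read_event(descriptor, data) {
		Ok(pause_read) => {
			// read_event only queued any responses; it intentionally did not call
			// send_data (to avoid reentrancy), so flush the queued messages now.
			peer_manager.process_events();
			pause_read
		}
		Err(_) => {
			// An Err indicates the connection should be closed by the backend.
			true
		}
	}
}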