X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fln%2Fpeer_handler.rs;h=d868b0758bd745a6e7ee53294e7127c7942d4a89;hb=f0490118b3dcbf3d52fd305ea777c0c9d6b6d7d0;hp=9a2cee9dd4931ec01cd579250edb917ff2c4ee8d;hpb=e382a7b4b3e5f3e59a9300b9d8a4d8bff06366fe;p=rust-lightning diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 9a2cee9d..d868b075 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -15,7 +15,7 @@ use util::byte_utils; use util::events::{MessageSendEvent}; use util::logger::Logger; -use std::collections::{HashMap,hash_map,LinkedList}; +use std::collections::{HashMap,hash_map,HashSet,LinkedList}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{cmp,error,hash,fmt}; @@ -106,17 +106,22 @@ struct Peer { struct PeerHolder { peers: HashMap, + /// Added to by do_read_event for cases where we pushed a message onto the send buffer but + /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events() + peers_needing_send: HashSet, /// Only add to this set when noise completes: node_id_to_descriptor: HashMap, } struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> { peers: &'a mut HashMap, + peers_needing_send: &'a mut HashSet, node_id_to_descriptor: &'a mut HashMap, } impl PeerHolder { fn borrow_parts(&mut self) -> MutPeerHolder { MutPeerHolder { peers: &mut self.peers, + peers_needing_send: &mut self.peers_needing_send, node_id_to_descriptor: &mut self.node_id_to_descriptor, } } @@ -162,7 +167,11 @@ impl PeerManager { pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc) -> PeerManager { PeerManager { message_handler: message_handler, - peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }), + peers: Mutex::new(PeerHolder { + peers: HashMap::new(), + peers_needing_send: HashSet::new(), + node_id_to_descriptor: HashMap::new() + }), our_node_secret: our_node_secret, initial_syncs_sent: AtomicUsize::new(0), logger, @@ -188,7 +197,7 @@ impl PeerManager { /// Note that if an Err is returned here you MUST NOT call disconnect_event for the new /// descriptor but must disconnect the connection immediately. /// - /// Returns some bytes to send to the remote node. + /// Returns a small number of bytes to send to the remote node (currently always 50). /// /// Panics if descriptor is duplicative with some other descriptor which has not yet has a /// disconnect_event. @@ -298,16 +307,12 @@ impl PeerManager { /// /// May return an Err to indicate that the connection should be closed. /// - /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into - /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The - /// invariants around calling write_event in case a write did not fully complete must still - /// hold. Note that this function will often call send_data on many peers before returning, not - /// just this peer! + /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity. + /// Thus, however, you almost certainly want to call process_events() after any read_event to + /// generate send_data calls to handle responses. /// /// If Ok(true) is returned, further read_events should not be triggered until a write_event on - /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note - /// that this must be true even if a send_data call with resume_read=true was made during the - /// course of this function! + /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). /// /// Panics if the descriptor was not previously registered in a new_*_connection event. pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec) -> Result { @@ -347,6 +352,7 @@ impl PeerManager { { log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + peers.peers_needing_send.insert(peer_descriptor.clone()); } } } @@ -520,9 +526,7 @@ impl PeerManager { }, 16); } - for msg in self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()) { - encode_and_send_msg!(msg, 136); - } + self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap()); }, 17 => { let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader)); @@ -581,20 +585,11 @@ impl PeerManager { 38 => { let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader)); - let resp_options = try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg)); - if let Some(resp) = resp_options.0 { - encode_and_send_msg!(resp, 38); - } - if let Some(resp) = resp_options.1 { - encode_and_send_msg!(resp, 39); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg)); }, 39 => { let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader)); - let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg)); - if let Some(resp) = resp_option { - encode_and_send_msg!(resp, 39); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg)); }, 128 => { @@ -616,33 +611,11 @@ impl PeerManager { 132 => { let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader)); - let resps = try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg)); - encode_and_send_msg!(resps.0, 133); - if let Some(resp) = resps.1 { - encode_and_send_msg!(resp, 132); - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg)); }, 133 => { let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader)); - let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg)); - match resp_option { - Some(resps) => { - for resp in resps.update_add_htlcs { - encode_and_send_msg!(resp, 128); - } - for resp in resps.update_fulfill_htlcs { - encode_and_send_msg!(resp, 130); - } - for resp in resps.update_fail_htlcs { - encode_and_send_msg!(resp, 131); - } - if let Some(resp) = resps.update_fee { - encode_and_send_msg!(resp, 134); - } - encode_and_send_msg!(resps.commitment_signed, 132); - }, - None => {}, - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg)); }, 134 => { let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader)); @@ -650,45 +623,7 @@ impl PeerManager { }, 136 => { let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader)); - let (funding_locked, revoke_and_ack, commitment_update, order) = try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg)); - if let Some(lock_msg) = funding_locked { - encode_and_send_msg!(lock_msg, 36); - } - macro_rules! handle_raa { () => { - if let Some(revoke_msg) = revoke_and_ack { - encode_and_send_msg!(revoke_msg, 133); - } - } } - macro_rules! handle_cu { () => { - match commitment_update { - Some(resps) => { - for resp in resps.update_add_htlcs { - encode_and_send_msg!(resp, 128); - } - for resp in resps.update_fulfill_htlcs { - encode_and_send_msg!(resp, 130); - } - for resp in resps.update_fail_htlcs { - encode_and_send_msg!(resp, 131); - } - if let Some(resp) = resps.update_fee { - encode_and_send_msg!(resp, 134); - } - encode_and_send_msg!(resps.commitment_signed, 132); - }, - None => {}, - } - } } - match order { - msgs::RAACommitmentOrder::RevokeAndACKFirst => { - handle_raa!(); - handle_cu!(); - }, - msgs::RAACommitmentOrder::CommitmentFirst => { - handle_cu!(); - handle_raa!(); - }, - } + try_potential_handleerror!(self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg)); }, // Routing control: @@ -741,13 +676,12 @@ impl PeerManager { pause_read }; - self.process_events(); - Ok(pause_read) } - /// Checks for any events generated by our handlers and processes them. May be needed after eg - /// calls to ChannelManager::process_pending_htlc_forward. + /// Checks for any events generated by our handlers and processes them. Includes sending most + /// response messages as well as messages generated by calls to handler functions directly (eg + /// functions like ChannelManager::process_pending_htlc_forward or send_payment). pub fn process_events(&self) { { // TODO: There are some DoS attacks here where you can flood someone's outbound send @@ -755,7 +689,8 @@ impl PeerManager { // drop optional-ish messages when send buffers get full! let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); - let mut peers = self.peers.lock().unwrap(); + let mut peers_lock = self.peers.lock().unwrap(); + let peers = peers_lock.borrow_parts(); for event in events_generated.drain(..) { macro_rules! get_peer_for_forwarding { ($node_id: expr, $handle_no_such_peer: block) => { @@ -883,6 +818,16 @@ impl PeerManager { peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); Self::do_attempt_write_data(&mut descriptor, peer); }, + MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { + log_trace!(self, "Handling SendClosingSigned event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_trace!(self, "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), @@ -893,6 +838,16 @@ impl PeerManager { peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); Self::do_attempt_write_data(&mut descriptor, peer); }, + MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { + log_trace!(self, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + log_bytes!(msg.channel_id)); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { + //TODO: Do whatever we're gonna do for handling dropped messages + }); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136))); + Self::do_attempt_write_data(&mut descriptor, peer); + }, MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { @@ -939,6 +894,7 @@ impl PeerManager { match *action { msgs::ErrorAction::DisconnectPeer { ref msg } => { if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) { + peers.peers_needing_send.remove(&descriptor); if let Some(mut peer) = peers.peers.remove(&descriptor) { if let Some(ref msg) = *msg { log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", @@ -974,6 +930,13 @@ impl PeerManager { } } } + + for mut descriptor in peers.peers_needing_send.drain() { + match peers.peers.get_mut(&descriptor) { + Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer), + None => panic!("Inconsistent peers set state!"), + } + } } } @@ -989,6 +952,7 @@ impl PeerManager { fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { let mut peers = self.peers.lock().unwrap(); + peers.peers_needing_send.remove(descriptor); let peer_option = peers.peers.remove(descriptor); match peer_option { None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),