use util::events::{MessageSendEvent};
use util::logger::Logger;
-use std::collections::{HashMap,hash_map,LinkedList};
+use std::collections::{HashMap,hash_map,HashSet,LinkedList};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp,error,hash,fmt};
struct PeerHolder<Descriptor: SocketDescriptor> {
peers: HashMap<Descriptor, Peer>,
+ /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
+ /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events().
+ peers_needing_send: HashSet<Descriptor>,
/// Only add to this map once the noise handshake completes:
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
peers: &'a mut HashMap<Descriptor, Peer>,
+ peers_needing_send: &'a mut HashSet<Descriptor>,
node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
}
impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
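+ /// Splits a `&mut PeerHolder` into simultaneous mutable borrows of its fields.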
fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
MutPeerHolder {
peers: &mut self.peers,
+ peers_needing_send: &mut self.peers_needing_send,
node_id_to_descriptor: &mut self.node_id_to_descriptor,
}
}
pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
PeerManager {
message_handler: message_handler,
- peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
+ peers: Mutex::new(PeerHolder {
+ peers: HashMap::new(),
+ peers_needing_send: HashSet::new(),
+ node_id_to_descriptor: HashMap::new()
+ }),
our_node_secret: our_node_secret,
initial_syncs_sent: AtomicUsize::new(0),
logger,
/// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
/// descriptor but must disconnect the connection immediately.
///
- /// Returns some bytes to send to the remote node.
+ /// Returns a small number of bytes to send to the remote node (currently always 50).
///
/// Panics if descriptor is duplicative with some other descriptor which has not yet had a
/// disconnect_event.
///
/// May return an Err to indicate that the connection should be closed.
///
- /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
- /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
- /// invariants around calling write_event in case a write did not fully complete must still
- /// hold. Note that this function will often call send_data on many peers before returning, not
- /// just this peer!
+ /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
+ /// Thus, you almost certainly want to call process_events() after any read_event to
+ /// generate send_data calls to handle responses.
///
/// If Ok(true) is returned, further read_events should not be triggered until a write_event on
- /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
- /// that this must be true even if a send_data call with resume_read=true was made during the
- /// course of this function!
+ /// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
///
/// Panics if the descriptor was not previously registered in a new_*_connection event.
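+ ///
+ /// A minimal sketch of the intended calling pattern (assuming `peer_manager` is this
+ /// PeerManager and `descriptor` was registered via a new_*_connection call; the names
+ /// are illustrative):
+ ///
+ /// ```ignore
+ /// match peer_manager.read_event(&mut descriptor, data) {
+ ///     Ok(pause_read) => {
+ ///         // read_event queued any responses but did not write them; flush them now:
+ ///         peer_manager.process_events();
+ ///         if pause_read {
+ ///             // Stop reading from this socket until a write_event with resume_read set.
+ ///         }
+ ///     },
+ ///     Err(_) => {
+ ///         // Close the underlying connection.
+ ///     },
+ /// }
+ /// ```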
pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
{
log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
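+ // Don't call send_data here (reentrancy); mark the peer so process_events() flushes it.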
+ peers.peers_needing_send.insert(peer_descriptor.clone());
}
}
}
pause_read
};
- self.process_events();
-
Ok(pause_read)
}
- /// Checks for any events generated by our handlers and processes them. May be needed after eg
- /// calls to ChannelManager::process_pending_htlc_forward.
+ /// Checks for any events generated by our handlers and processes them. Includes sending most
+ /// response messages as well as messages generated by calls to handler functions directly (eg
+ /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
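+ ///
+ /// A sketch of the intended usage after calling a handler function directly
+ /// (`channel_manager`/`peer_manager` are illustrative names):
+ ///
+ /// ```ignore
+ /// channel_manager.process_pending_htlc_forward();
+ /// // The handler only queued its messages; this call actually sends them:
+ /// peer_manager.process_events();
+ /// ```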
pub fn process_events(&self) {
{
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// buffer; we should be willing to drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
- let mut peers = self.peers.lock().unwrap();
+ let mut peers_lock = self.peers.lock().unwrap();
+ let peers = peers_lock.borrow_parts();
for event in events_generated.drain(..) {
macro_rules! get_peer_for_forwarding {
($node_id: expr, $handle_no_such_peer: block) => {
match *action {
msgs::ErrorAction::DisconnectPeer { ref msg } => {
if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
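+ // Also drop any deferred-send marker; a stale entry would panic in the flush loop below.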
+ peers.peers_needing_send.remove(&descriptor);
if let Some(mut peer) = peers.peers.remove(&descriptor) {
if let Some(ref msg) = *msg {
log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
}
}
}
+
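+ // Flush the writes which read_event queued but deliberately did not send itself.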
+ for mut descriptor in peers.peers_needing_send.drain() {
+ match peers.peers.get_mut(&descriptor) {
+ Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+ None => panic!("Inconsistent peers set state!"),
+ }
+ }
}
}
fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
let mut peers = self.peers.lock().unwrap();
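+ // Clear any deferred-send marker first so process_events() never sees a removed peer.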
+ peers.peers_needing_send.remove(descriptor);
let peer_option = peers.peers.remove(descriptor);
match peer_option {
None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),