-use std::collections::{HashMap,hash_map,LinkedList};
+use std::collections::{HashMap,hash_map,HashSet,LinkedList};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp,error,hash,fmt};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp,error,hash,fmt};
struct PeerHolder<Descriptor: SocketDescriptor> {
peers: HashMap<Descriptor, Peer>,
struct PeerHolder<Descriptor: SocketDescriptor> {
peers: HashMap<Descriptor, Peer>,
/// Only add to this set when noise completes:
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
peers: &'a mut HashMap<Descriptor, Peer>,
/// Only add to this set when noise completes:
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
peers: &'a mut HashMap<Descriptor, Peer>,
node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
}
impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
MutPeerHolder {
peers: &mut self.peers,
node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
}
impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
MutPeerHolder {
peers: &mut self.peers,
pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
PeerManager {
message_handler: message_handler,
pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, logger: Arc<Logger>) -> PeerManager<Descriptor> {
PeerManager {
message_handler: message_handler,
- peers: Mutex::new(PeerHolder { peers: HashMap::new(), node_id_to_descriptor: HashMap::new() }),
+ peers: Mutex::new(PeerHolder {
+ peers: HashMap::new(),
+ peers_needing_send: HashSet::new(),
+ node_id_to_descriptor: HashMap::new()
+ }),
our_node_secret: our_node_secret,
initial_syncs_sent: AtomicUsize::new(0),
logger,
our_node_secret: our_node_secret,
initial_syncs_sent: AtomicUsize::new(0),
logger,
/// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
/// descriptor but must disconnect the connection immediately.
///
/// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
/// descriptor but must disconnect the connection immediately.
///
- /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
- /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
- /// invariants around calling write_event in case a write did not fully complete must still
- /// hold. Note that this function will often call send_data on many peers before returning, not
- /// just this peer!
+ /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
+ /// Thus, however, you almost certainly want to call process_events() after any read_event to
+ /// generate send_data calls to handle responses.
- /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
- /// that this must be true even if a send_data call with resume_read=true was made during the
- /// course of this function!
+ /// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
///
/// Panics if the descriptor was not previously registered in a new_*_connection event.
pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
///
/// Panics if the descriptor was not previously registered in a new_*_connection event.
pub fn read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
{
log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
{
log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
() => {
match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
hash_map::Entry::Occupied(_) => {
() => {
match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
hash_map::Entry::Occupied(_) => {
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
return Err(PeerHandleError{ no_connection_possible: false })
},
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
return Err(PeerHandleError{ no_connection_possible: false })
},
- hash_map::Entry::Vacant(entry) => entry.insert(peer_descriptor.clone()),
+ hash_map::Entry::Vacant(entry) => {
+ log_trace!(self, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
+ entry.insert(peer_descriptor.clone())
+ },
log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap()));
if msg_type != 16 && peer.their_global_features.is_none() {
// Need an init message as first message
log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap()));
if msg_type != 16 && peer.their_global_features.is_none() {
// Need an init message as first message
return Err(PeerHandleError{ no_connection_possible: false });
}
let mut reader = ::std::io::Cursor::new(&msg_data[2..]);
return Err(PeerHandleError{ no_connection_possible: false });
}
let mut reader = ::std::io::Cursor::new(&msg_data[2..]);
- /// Checks for any events generated by our handlers and processes them. May be needed after eg
- /// calls to ChannelManager::process_pending_htlc_forward.
+ /// Checks for any events generated by our handlers and processes them. Includes sending most
+ /// response messages as well as messages generated by calls to handler functions directly (eg
+ /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
pub fn process_events(&self) {
{
// TODO: There are some DoS attacks here where you can flood someone's outbound send
pub fn process_events(&self) {
{
// TODO: There are some DoS attacks here where you can flood someone's outbound send
// drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
// drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
for event in events_generated.drain(..) {
macro_rules! get_peer_for_forwarding {
($node_id: expr, $handle_no_such_peer: block) => {
for event in events_generated.drain(..) {
macro_rules! get_peer_for_forwarding {
($node_id: expr, $handle_no_such_peer: block) => {
match *action {
msgs::ErrorAction::DisconnectPeer { ref msg } => {
if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
match *action {
msgs::ErrorAction::DisconnectPeer { ref msg } => {
if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
if let Some(mut peer) = peers.peers.remove(&descriptor) {
if let Some(ref msg) = *msg {
log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
if let Some(mut peer) = peers.peers.remove(&descriptor) {
if let Some(ref msg) = *msg {
log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+
+ for mut descriptor in peers.peers_needing_send.drain() {
+ match peers.peers.get_mut(&descriptor) {
+ Some(peer) => Self::do_attempt_write_data(&mut descriptor, peer),
+ None => panic!("Inconsistent peers set state!"),
+ }
+ }
fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
let mut peers = self.peers.lock().unwrap();
fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
let mut peers = self.peers.lock().unwrap();
let peer_option = peers.peers.remove(descriptor);
match peer_option {
None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),
let peer_option = peers.peers.remove(descriptor);
match peer_option {
None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),