use util::events::{EventsProvider,Event};
use util::logger::Logger;
-use std::collections::{HashMap,LinkedList};
+use std::collections::{HashMap,hash_map,LinkedList};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp,error,mem,hash,fmt};
/// Only add to this set when noise completes:
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
}
+struct MutPeerHolder<'a, Descriptor: SocketDescriptor + 'a> {
+ peers: &'a mut HashMap<Descriptor, Peer>,
+ node_id_to_descriptor: &'a mut HashMap<PublicKey, Descriptor>,
+}
+impl<Descriptor: SocketDescriptor> PeerHolder<Descriptor> {
+ fn borrow_parts(&mut self) -> MutPeerHolder<Descriptor> {
+ MutPeerHolder {
+ peers: &mut self.peers,
+ node_id_to_descriptor: &mut self.node_id_to_descriptor,
+ }
+ }
+}
pub struct PeerManager<Descriptor: SocketDescriptor> {
message_handler: MessageHandler,
logger: Arc<Logger>,
}
-
macro_rules! encode_msg {
($msg: expr, $msg_code: expr) => {
{
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: Vec<u8>) -> Result<bool, PeerHandleError> {
let pause_read = {
- let mut peers = self.peers.lock().unwrap();
- let (should_insert_node_id, pause_read) = match peers.peers.get_mut(peer_descriptor) {
+ let mut peers_lock = self.peers.lock().unwrap();
+ let peers = peers_lock.borrow_parts();
+ let pause_read = match peers.peers.get_mut(peer_descriptor) {
None => panic!("Descriptor for read_event is not already known to PeerManager"),
Some(peer) => {
assert!(peer.pending_read_buffer.len() > 0);
assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);
- let mut insert_node_id = None;
let mut read_pos = 0;
while read_pos < data.len() {
{
}
}
+ macro_rules! insert_node_id {
+ () => {
+ match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
+ hash_map::Entry::Occupied(_) => {
+ peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
+ return Err(PeerHandleError{ no_connection_possible: false })
+ },
+ hash_map::Entry::Vacant(entry) => entry.insert(peer_descriptor.clone()),
+ };
+ }
+ }
+
let next_step = peer.channel_encryptor.get_noise_step();
match next_step {
NextNoiseStep::ActOne => {
peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
peer.pending_read_is_header = true;
- insert_node_id = Some(peer.their_node_id.unwrap());
+ insert_node_id!();
let mut local_features = msgs::LocalFeatures::new();
if self.initial_syncs_sent.load(Ordering::Acquire) < INITIAL_SYNCS_TO_SEND {
self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
peer.pending_read_is_header = true;
peer.their_node_id = Some(their_node_id);
- insert_node_id = Some(peer.their_node_id.unwrap());
+ insert_node_id!();
},
NextNoiseStep::NoiseComplete => {
if peer.pending_read_is_header {
Self::do_attempt_write_data(peer_descriptor, peer);
- (insert_node_id /* should_insert_node_id */, peer.pending_outbound_buffer.len() > 10) // pause_read
+ peer.pending_outbound_buffer.len() > 10 // pause_read
}
};
- match should_insert_node_id {
- Some(node_id) => { peers.node_id_to_descriptor.insert(node_id, peer_descriptor.clone()); },
- None => {}
- };
-
pause_read
};