From: Matt Corallo
Date: Fri, 30 Jul 2021 18:03:28 +0000 (+0000)
Subject: Process messages from peers in parallel in `PeerManager`.
X-Git-Tag: v0.0.107~31^2~14
X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=7c8b0986984e0882229c239ef2d3b621a61f7d35;p=rust-lightning

Process messages from peers in parallel in `PeerManager`.

This adds the required locking to process messages from different peers
simultaneously in `PeerManager`. Note that channel messages are still
processed under a global lock in `ChannelManager`, and most work is still
processed under a global lock in gossip message handling, but parallelizing
message deserialization and message decryption is somewhat helpful.
---

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 07300fdf6..dd6eda63f 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -33,7 +33,7 @@ use routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
 use prelude::*;
 use io;
 use alloc::collections::LinkedList;
-use sync::{Arc, Mutex};
+use sync::{Arc, Mutex, RwLock};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
 use core::convert::Infallible;
@@ -376,9 +376,7 @@ impl Peer {
 }
 
 struct PeerHolder<Descriptor: SocketDescriptor> {
-	peers: HashMap<Descriptor, Peer>,
-	/// Only add to this set when noise completes:
-	node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
+	peers: HashMap<Descriptor, Mutex<Peer>>,
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -425,7 +423,12 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
 	message_handler: MessageHandler<CM, RM>,
-	peers: Mutex<PeerHolder<Descriptor>>,
+	peers: RwLock<PeerHolder<Descriptor>>,
+	/// Only add to this set when noise completes.
+	/// Locked *after* peers. When an item is removed, it must be removed with the `peers` write
+	/// lock held. Entries may be added with only the `peers` read lock held (though the
+	/// `Descriptor` value must already exist in `peers`).
+	node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
 	our_node_secret: SecretKey,
 	ephemeral_key_midstate: Sha256Engine,
 	custom_message_handler: CMH,
@@ -553,10 +556,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		PeerManager {
 			message_handler,
-			peers: Mutex::new(PeerHolder {
+			peers: RwLock::new(PeerHolder {
 				peers: HashMap::new(),
-				node_id_to_descriptor: HashMap::new()
 			}),
+			node_id_to_descriptor: Mutex::new(HashMap::new()),
 			our_node_secret,
 			ephemeral_key_midstate,
 			peer_counter: AtomicCounter::new(),
@@ -571,8 +574,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	/// new_outbound_connection, however entries will only appear once the initial handshake has
 	/// completed and we are sure the remote peer has the private key for the given node_id.
 	pub fn get_peer_node_ids(&self) -> Vec<PublicKey> {
-		let peers = self.peers.lock().unwrap();
-		peers.peers.values().filter_map(|p| {
+		let peers = self.peers.read().unwrap();
+		peers.peers.values().filter_map(|peer_mutex| {
+			let p = peer_mutex.lock().unwrap();
 			if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() {
 				return None;
 			}
@@ -608,8 +612,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		let res = peer_encryptor.get_act_one().to_vec();
 		let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes
 
-		let mut peers = self.peers.lock().unwrap();
-		if peers.peers.insert(descriptor, Peer {
+		let mut peers = self.peers.write().unwrap();
+		if peers.peers.insert(descriptor, Mutex::new(Peer {
 			channel_encryptor: peer_encryptor,
 			their_node_id: None,
 			their_features: None,
@@ -629,7 +633,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			awaiting_pong_timer_tick_intervals: 0,
 			received_message_since_timer_tick: false,
 			sent_gossip_timestamp_filter: false,
-		}).is_some() {
+		})).is_some() {
 			panic!("PeerManager driver duplicated descriptors!");
 		};
 		Ok(res)
@@ -655,8 +659,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		let peer_encryptor = PeerChannelEncryptor::new_inbound(&self.our_node_secret);
 		let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes
 
-		let mut peers = self.peers.lock().unwrap();
-		if peers.peers.insert(descriptor, Peer {
+		let mut peers = self.peers.write().unwrap();
+		if peers.peers.insert(descriptor, Mutex::new(Peer {
 			channel_encryptor: peer_encryptor,
 			their_node_id: None,
 			their_features: None,
@@ -676,7 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			awaiting_pong_timer_tick_intervals: 0,
 			received_message_since_timer_tick: false,
 			sent_gossip_timestamp_filter: false,
-		}).is_some() {
+		})).is_some() {
 			panic!("PeerManager driver duplicated descriptors!");
 		};
 		Ok(())
@@ -766,17 +770,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	/// [`send_data`]: SocketDescriptor::send_data
 	/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
 	pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
-		let mut peers = self.peers.lock().unwrap();
-		match peers.peers.get_mut(descriptor) {
+		let peers = self.peers.read().unwrap();
+		match peers.peers.get(descriptor) {
 			None => {
 				// This is most likely a simple race condition where the user found that the socket
 				// was writeable, then we told the user to `disconnect_socket()`, then they called
 				// this method. Return an error to make sure we get disconnected.
 				return Err(PeerHandleError { no_connection_possible: false });
 			},
-			Some(peer) => {
+			Some(peer_mutex) => {
+				let mut peer = peer_mutex.lock().unwrap();
 				peer.awaiting_write_event = false;
-				self.do_attempt_write_data(descriptor, peer);
+				self.do_attempt_write_data(descriptor, &mut peer);
 			}
 		};
 		Ok(())
@@ -828,18 +833,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
 		let pause_read = {
-			let mut peers_lock = self.peers.lock().unwrap();
-			let peers = &mut *peers_lock;
+			let peers = self.peers.read().unwrap();
 			let mut msgs_to_forward = Vec::new();
 			let mut peer_node_id = None;
-			let pause_read = match peers.peers.get_mut(peer_descriptor) {
+			let pause_read = match peers.peers.get(peer_descriptor) {
 				None => {
 					// This is most likely a simple race condition where the user read some bytes
 					// from the socket, then we told the user to `disconnect_socket()`, then they
 					// called this method. Return an error to make sure we get disconnected.
 					return Err(PeerHandleError { no_connection_possible: false });
 				},
-				Some(peer) => {
+				Some(peer_mutex) => {
+					let mut peer_lock = peer_mutex.lock().unwrap();
+					let peer = &mut *peer_lock;
+
 					assert!(peer.pending_read_buffer.len() > 0);
 					assert!(peer.pending_read_buffer.len() > peer.pending_read_buffer_pos);
 
@@ -893,7 +900,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 									macro_rules! insert_node_id {
 										() => {
-											match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
+											match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) {
 												hash_map::Entry::Occupied(_) => {
 													log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
 													peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
@@ -1023,7 +1030,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			};
 
 			for msg in msgs_to_forward.drain(..) {
-				self.forward_broadcast_msg(peers, &msg, peer_node_id.as_ref());
+				self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref());
 			}
 
 			pause_read
@@ -1242,13 +1249,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 		Ok(should_forward)
 	}
 
-	fn forward_broadcast_msg(&self, peers: &mut PeerHolder<Descriptor>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
+	fn forward_broadcast_msg(&self, peers: &PeerHolder<Descriptor>, msg: &wire::Message<<<CMH as Deref>::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) {
 		match msg {
 			wire::Message::ChannelAnnouncement(ref msg) => {
 				log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg);
 				let encoded_msg = encode_msg!(msg);
 
-				for (_, peer) in peers.peers.iter_mut() {
+				for (_, peer_mutex) in peers.peers.iter() {
+					let mut peer = peer_mutex.lock().unwrap();
 					if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
 							!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
@@ -1266,14 +1274,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 						continue;
 					}
-					self.enqueue_encoded_message(peer, &encoded_msg);
+					self.enqueue_encoded_message(&mut *peer, &encoded_msg);
 				}
 			},
 			wire::Message::NodeAnnouncement(ref msg) => {
 				log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg);
 				let encoded_msg = encode_msg!(msg);
 
-				for (_, peer) in peers.peers.iter_mut() {
+				for (_, peer_mutex) in peers.peers.iter() {
+					let mut peer = peer_mutex.lock().unwrap();
 					if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
 							!peer.should_forward_node_announcement(msg.contents.node_id) {
 						continue
@@ -1290,14 +1299,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 						continue;
 					}
-					self.enqueue_encoded_message(peer, &encoded_msg);
+					self.enqueue_encoded_message(&mut *peer, &encoded_msg);
 				}
 			},
 			wire::Message::ChannelUpdate(ref msg) => {
 				log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg);
 				let encoded_msg = encode_msg!(msg);
 
-				for (_, peer) in peers.peers.iter_mut() {
+				for (_, peer_mutex) in peers.peers.iter() {
+					let mut peer = peer_mutex.lock().unwrap();
 					if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
 							!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
@@ -1311,7 +1321,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
 						continue;
 					}
-					self.enqueue_encoded_message(peer, &encoded_msg);
+					self.enqueue_encoded_message(&mut *peer, &encoded_msg);
 				}
 			},
 			_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
 		}
 	}
@@ -1337,20 +1347,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			// buffer by doing things like announcing channels on another node. We should be willing to
 			// drop optional-ish messages when send buffers get full!
 
-			let mut peers_lock = self.peers.lock().unwrap();
+			let mut peers_lock = self.peers.write().unwrap();
 			let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
 			events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
 			let peers = &mut *peers_lock;
 			macro_rules! get_peer_for_forwarding {
 				($node_id: expr) => {
 					{
-						match peers.node_id_to_descriptor.get($node_id) {
+						match self.node_id_to_descriptor.lock().unwrap().get($node_id) {
 							Some(descriptor) => match peers.peers.get_mut(&descriptor) {
-								Some(peer) => {
-									if peer.their_features.is_none() {
+								Some(peer_mutex) => {
+									let peer_lock = peer_mutex.lock().unwrap();
+									if peer_lock.their_features.is_none() {
 										continue;
 									}
-									peer
+									peer_lock
 								},
 								None => panic!("Inconsistent peers set state!"),
 							},
@@ -1367,13 +1378,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.temporary_channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.temporary_channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
@@ -1382,25 +1393,25 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 								log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
 						// TODO: If the peer is gone we should generate a DiscardFunding event
 						// indicating to the wallet that they should just throw away this funding transaction
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
 								log_pubkey!(node_id),
 								log_bytes!(msg.channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
 						log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -1409,47 +1420,47 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 								update_fulfill_htlcs.len(),
 								update_fail_htlcs.len(),
 								log_bytes!(commitment_signed.channel_id));
-						let peer = get_peer_for_forwarding!(node_id);
+						let mut peer = get_peer_for_forwarding!(node_id);
 						for msg in update_add_htlcs {
-							self.enqueue_message(peer, msg);
+							self.enqueue_message(&mut *peer, msg);
 						}
 						for msg in update_fulfill_htlcs {
-							self.enqueue_message(peer, msg);
+							self.enqueue_message(&mut *peer, msg);
 						}
 						for msg in update_fail_htlcs {
-							self.enqueue_message(peer, msg);
+							self.enqueue_message(&mut *peer, msg);
 						}
 						for msg in update_fail_malformed_htlcs {
-							self.enqueue_message(peer, msg);
+							self.enqueue_message(&mut *peer, msg);
 						}
 						if let &Some(ref msg) = update_fee {
-							self.enqueue_message(peer, msg);
+							self.enqueue_message(&mut *peer, msg);
 						}
-						self.enqueue_message(peer, commitment_signed);
+						self.enqueue_message(&mut *peer, commitment_signed);
 					},
 					MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
 						log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id),
 								log_bytes!(msg.channel_id));
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
 						log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
@@ -1483,22 +1494,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
 						log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
 								log_pubkey!(node_id), msg.contents.short_channel_id);
-						let peer = get_peer_for_forwarding!(node_id);
-						peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::HandleError { ref node_id, ref action } => {
 						match *action {
 							msgs::ErrorAction::DisconnectPeer { ref msg } => {
-								if let Some(mut descriptor) = peers.node_id_to_descriptor.remove(node_id) {
-									if let Some(mut peer) = peers.peers.remove(&descriptor) {
+								// Note that since we are holding the peers *write* lock we can
+								// remove from node_id_to_descriptor immediately (as no other
+								// thread can be holding the peer lock if we have the global write
+								// lock).
+								if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) {
+									if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
 										if let Some(ref msg) = *msg {
 											log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
 													log_pubkey!(node_id),
 													msg.data);
-											self.enqueue_message(&mut peer, msg);
+											let mut peer = peer_mutex.lock().unwrap();
+											self.enqueue_message(&mut *peer, msg);
 											// This isn't guaranteed to work, but if there is enough free
 											// room in the send buffer, put the error message there...
-											self.do_attempt_write_data(&mut descriptor, &mut peer);
+											self.do_attempt_write_data(&mut descriptor, &mut *peer);
 										} else {
 											log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
 										}
@@ -1518,21 +1533,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 								log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
 										log_pubkey!(node_id),
 										msg.data);
-								self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+								self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 							},
 							msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
 								log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
 										log_pubkey!(node_id),
 										msg.data);
-								self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+								self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 							},
 						}
 					},
 					MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					},
 					MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					}
 					MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
 						log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
@@ -1541,20 +1556,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 							msg.first_blocknum,
 							msg.number_of_blocks,
 							msg.sync_complete);
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					}
 					MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
-						self.enqueue_message(get_peer_for_forwarding!(node_id), msg);
+						self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
 					}
 				}
 			}
 
 			for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() {
-				self.enqueue_message(get_peer_for_forwarding!(&node_id), &msg);
+				self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
 			}
 
-			for (descriptor, peer) in peers.peers.iter_mut() {
-				self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
+			for (descriptor, peer_mutex) in peers.peers.iter_mut() {
+				self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
 			}
 		}
 	}
@@ -1565,7 +1580,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	}
 
 	fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) {
-		let mut peers = self.peers.lock().unwrap();
+		let mut peers = self.peers.write().unwrap();
 		let peer_option = peers.peers.remove(descriptor);
 		match peer_option {
 			None => {
@@ -1573,13 +1588,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				// was disconnected, then we told the user to `disconnect_socket()`, then they
 				// called this method. Either way we're disconnected, return.
 			},
-			Some(peer) => {
+			Some(peer_lock) => {
+				let peer = peer_lock.lock().unwrap();
 				match peer.their_node_id {
 					Some(node_id) => {
 						log_trace!(self.logger, "Handling disconnection of peer {}, with {}future connection to the peer possible.",
 							log_pubkey!(node_id),
 							if no_connection_possible { "no " } else { "" });
-						peers.node_id_to_descriptor.remove(&node_id);
+						self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
 						self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
 					},
 					None => {}
@@ -1598,8 +1614,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	///
 	/// [`disconnect_socket`]: SocketDescriptor::disconnect_socket
 	pub fn disconnect_by_node_id(&self, node_id: PublicKey, no_connection_possible: bool) {
-		let mut peers_lock = self.peers.lock().unwrap();
-		if let Some(mut descriptor) = peers_lock.node_id_to_descriptor.remove(&node_id) {
+		let mut peers_lock = self.peers.write().unwrap();
+		if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
 			log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
 			peers_lock.peers.remove(&descriptor);
 			self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
@@ -1611,17 +1627,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	/// an indication that TCP sockets have stalled even if we weren't around to time them out
 	/// using regular ping/pongs.
 	pub fn disconnect_all_peers(&self) {
-		let mut peers_lock = self.peers.lock().unwrap();
+		let mut peers_lock = self.peers.write().unwrap();
+		self.node_id_to_descriptor.lock().unwrap().clear();
 		let peers = &mut *peers_lock;
 		for (mut descriptor, peer) in peers.peers.drain() {
-			if let Some(node_id) = peer.their_node_id {
+			if let Some(node_id) = peer.lock().unwrap().their_node_id {
 				log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
-				peers.node_id_to_descriptor.remove(&node_id);
 				self.message_handler.chan_handler.peer_disconnected(&node_id, false);
 			}
 			descriptor.disconnect_socket();
 		}
-		debug_assert!(peers.node_id_to_descriptor.is_empty());
 	}
 
 	/// This is called when we're blocked on sending additional gossip messages until we receive a
@@ -1650,15 +1665,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	///
 	/// [`send_data`]: SocketDescriptor::send_data
 	pub fn timer_tick_occurred(&self) {
-		let mut peers_lock = self.peers.lock().unwrap();
+		let mut peers_lock = self.peers.write().unwrap();
 		{
-			let peers = &mut *peers_lock;
-			let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
-			let peers = &mut peers.peers;
 			let mut descriptors_needing_disconnect = Vec::new();
-			let peer_count = peers.len();
+			let peer_count = peers_lock.peers.len();
 
-			peers.retain(|descriptor, peer| {
+			peers_lock.peers.retain(|descriptor, peer_mutex| {
+				let mut peer = peer_mutex.lock().unwrap();
 				let mut do_disconnect_peer = false;
 				if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
 					// The peer needs to complete its handshake before we can exchange messages. We
@@ -1689,7 +1702,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					match peer.their_node_id {
 						Some(node_id) => {
 							log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
-							node_id_to_descriptor.remove(&node_id);
+							self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
 							self.message_handler.chan_handler.peer_disconnected(&node_id, false);
 						}
 						None => {},
@@ -1708,7 +1721,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					ponglen: 0,
 					byteslen: 64,
 				};
-				self.enqueue_message(peer, &ping);
+				self.enqueue_message(&mut *peer, &ping);
 				self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
 
 				true
@@ -1834,7 +1847,7 @@ mod tests {
 		let chan_handler = test_utils::TestChannelMessageHandler::new();
 		let mut peers = create_network(2, &cfgs);
 		establish_connection(&peers[0], &peers[1]);
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
 
 		let secp_ctx = Secp256k1::new();
 		let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret);
@@ -1847,7 +1860,7 @@ mod tests {
 		peers[0].message_handler.chan_handler = &chan_handler;
 
 		peers[0].process_events();
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
 	}
 
 	#[test]
@@ -1856,17 +1869,17 @@ mod tests {
 		let cfgs = create_peermgr_cfgs(2);
 		let peers = create_network(2, &cfgs);
 		establish_connection(&peers[0], &peers[1]);
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
 
 		// peers[0] awaiting_pong is set to true, but the Peer is still connected
 		peers[0].timer_tick_occurred();
 		peers[0].process_events();
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
 
 		// Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected
 		peers[0].timer_tick_occurred();
 		peers[0].process_events();
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
 	}
 
 	#[test]
@@ -1928,9 +1941,9 @@ mod tests {
 		peers[0].new_inbound_connection(fd_a.clone(), None).unwrap();
 
 		// If we get a single timer tick before completion, that's fine
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
 		peers[0].timer_tick_occurred();
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1);
 
 		assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
 		peers[0].process_events();
@@ -1939,7 +1952,7 @@ mod tests {
 
 		// ...but if we get a second timer tick, we should disconnect the peer
 		peers[0].timer_tick_occurred();
-		assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
+		assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0);
 
 		assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
 	}
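
The core pattern the diff introduces is a two-level lock: `peers` becomes an
`RwLock` over a `HashMap` of per-peer `Mutex`es, so per-peer work (decrypting
and deserializing one socket's bytes) takes only the map's read lock plus that
one peer's lock, while connection setup and teardown take the write lock. The
following is a minimal standalone sketch of that pattern, not the real API: it
uses `std::sync` directly (the crate routes through its own `sync` shim), and
`Peer`, `Descriptor`, `new_connection`, and `read_event` here are pared-down,
hypothetical stand-ins:

	use std::collections::HashMap;
	use std::sync::{Arc, Mutex, RwLock};
	use std::thread;

	// Hypothetical, pared-down stand-ins for the real types.
	#[derive(Clone, PartialEq, Eq, Hash)]
	struct Descriptor(u64);

	struct Peer {
		pending_read: Vec<u8>,
	}

	struct PeerHolder {
		// One Mutex per peer behind a single RwLock: readers (per-peer message
		// processing) run in parallel, writers (connect/disconnect) are exclusive.
		peers: HashMap<Descriptor, Mutex<Peer>>,
	}

	struct PeerManager {
		peers: RwLock<PeerHolder>,
	}

	impl PeerManager {
		// Topology change: exclusive write lock on the map.
		fn new_connection(&self, descriptor: Descriptor) {
			let mut peers = self.peers.write().unwrap();
			peers.peers.insert(descriptor, Mutex::new(Peer { pending_read: Vec::new() }));
		}

		// Per-peer work: shared read lock on the map plus one peer's Mutex.
		// Threads handling different descriptors do not contend here.
		fn read_event(&self, descriptor: &Descriptor, data: &[u8]) {
			let peers = self.peers.read().unwrap();
			if let Some(peer_mutex) = peers.peers.get(descriptor) {
				let mut peer = peer_mutex.lock().unwrap();
				// Decryption and deserialization would happen here, under the
				// per-peer lock only.
				peer.pending_read.extend_from_slice(data);
			}
		}
	}

	fn main() {
		let manager = Arc::new(PeerManager {
			peers: RwLock::new(PeerHolder { peers: HashMap::new() }),
		});
		manager.new_connection(Descriptor(1));
		manager.new_connection(Descriptor(2));

		// Two sockets' worth of data processed in parallel under read locks.
		let handles: Vec<_> = [1u64, 2].iter().map(|id| {
			let mgr = Arc::clone(&manager);
			let id = *id;
			thread::spawn(move || mgr.read_event(&Descriptor(id), b"noise act bytes"))
		}).collect();
		for handle in handles { handle.join().unwrap(); }
	}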
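
The doc comment added to `node_id_to_descriptor` pins down the lock order that
keeps removal sound: it is locked strictly after `peers`, entries may be added
while holding only the `peers` read lock (the descriptor must already exist in
`peers`), and entries may only be removed with the `peers` write lock held,
since a writer knows no other thread still holds any per-peer `Mutex`. Below is
a sketch of that invariant under the same assumptions as above (simplified
stand-in types, with a bare `u64` descriptor and a `[u8; 33]` array in place of
`PublicKey`):

	use std::collections::HashMap;
	use std::sync::{Mutex, RwLock};

	type Descriptor = u64;
	struct Peer; // placeholder; holds per-connection state in the real code

	struct PeerManager {
		peers: RwLock<HashMap<Descriptor, Mutex<Peer>>>,
		// Locked strictly *after* `peers`. Added to under the `peers` read
		// lock, removed from only under the `peers` write lock.
		node_id_to_descriptor: Mutex<HashMap<[u8; 33], Descriptor>>,
	}

	impl PeerManager {
		// Handshake completion: publish the node_id mapping while holding only
		// the read lock, so other peers keep making progress.
		fn on_handshake_complete(&self, node_id: [u8; 33], descriptor: Descriptor) {
			let peers = self.peers.read().unwrap();
			debug_assert!(peers.contains_key(&descriptor)); // must already exist in `peers`
			self.node_id_to_descriptor.lock().unwrap().insert(node_id, descriptor);
		}

		// Disconnect by node_id: take the `peers` *write* lock first. Once we
		// hold it, no other thread can hold a per-peer Mutex, so dropping both
		// entries is atomic from the message-processing threads' point of view.
		fn disconnect_by_node_id(&self, node_id: [u8; 33]) {
			let mut peers = self.peers.write().unwrap();
			if let Some(descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
				peers.remove(&descriptor);
			}
		}
	}

	fn main() {
		let pm = PeerManager {
			peers: RwLock::new(HashMap::new()),
			node_id_to_descriptor: Mutex::new(HashMap::new()),
		};
		pm.peers.write().unwrap().insert(7, Mutex::new(Peer));
		pm.on_handshake_complete([2; 33], 7);
		pm.disconnect_by_node_id([2; 33]);
		assert!(pm.peers.read().unwrap().is_empty());
	}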