From 96fc0f3453388256145f9ff154554aedd69207d2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 25 Feb 2022 21:57:57 +0000 Subject: [PATCH] Drop `PeerHolder` as it now only has one field --- lightning/src/ln/peer_handler.rs | 74 ++++++++++++++++---------------- 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index f771cf4ff..08bfa919d 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -376,14 +376,6 @@ impl Peer { } } -struct PeerHolder { - /// Peer is under its own mutex for sending and receiving bytes, but note that we do *not* hold - /// this mutex while we're processing a message. This is fine as [`PeerManager::read_event`] - /// requires that there be no parallel calls for a given peer, so mutual exclusion of messages - /// handed to the `MessageHandler`s for a given peer is already guaranteed. - peers: HashMap>, -} - /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static /// lifetimes). Other times you can afford a reference, which is more efficient, in which case @@ -428,7 +420,15 @@ pub struct PeerManager, - peers: FairRwLock>, + /// Connection state for each connected peer - we have an outer read-write lock which is taken + /// as read while we're doing processing for a peer and taken write when a peer is being added + /// or removed. + /// + /// The inner Peer lock is held for sending and receiving bytes, but note that we do *not* hold + /// it while we're processing a message. This is fine as [`PeerManager::read_event`] requires + /// that there be no parallel calls for a given peer, so mutual exclusion of messages handed to + /// the `MessageHandler`s for a given peer is already guaranteed. + peers: FairRwLock>>, /// Only add to this set when noise completes. /// Locked *after* peers. When an item is removed, it must be removed with the `peers` write /// lock held. Entries may be added with only the `peers` read lock held (though the @@ -570,9 +570,7 @@ impl P PeerManager { message_handler, - peers: FairRwLock::new(PeerHolder { - peers: HashMap::new(), - }), + peers: FairRwLock::new(HashMap::new()), node_id_to_descriptor: Mutex::new(HashMap::new()), event_processing_lock: Mutex::new(()), blocked_event_processors: AtomicBool::new(false), @@ -591,7 +589,7 @@ impl P /// completed and we are sure the remote peer has the private key for the given node_id. pub fn get_peer_node_ids(&self) -> Vec { let peers = self.peers.read().unwrap(); - peers.peers.values().filter_map(|peer_mutex| { + peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { return None; @@ -629,7 +627,7 @@ impl P let pending_read_buffer = [0; 50].to_vec(); // Noise act two is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.peers.insert(descriptor, Mutex::new(Peer { + if peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -676,7 +674,7 @@ impl P let pending_read_buffer = [0; 50].to_vec(); // Noise act one is 50 bytes let mut peers = self.peers.write().unwrap(); - if peers.peers.insert(descriptor, Mutex::new(Peer { + if peers.insert(descriptor, Mutex::new(Peer { channel_encryptor: peer_encryptor, their_node_id: None, their_features: None, @@ -787,7 +785,7 @@ impl P /// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> { let peers = self.peers.read().unwrap(); - match peers.peers.get(descriptor) { + match peers.get(descriptor) { None => { // This is most likely a simple race condition where the user found that the socket // was writeable, then we told the user to `disconnect_socket()`, then they called @@ -852,7 +850,7 @@ impl P let peers = self.peers.read().unwrap(); let mut msgs_to_forward = Vec::new(); let mut peer_node_id = None; - match peers.peers.get(peer_descriptor) { + match peers.get(peer_descriptor) { None => { // This is most likely a simple race condition where the user read some bytes // from the socket, then we told the user to `disconnect_socket()`, then they @@ -1288,13 +1286,13 @@ impl P Ok(should_forward) } - fn forward_broadcast_msg(&self, peers: &PeerHolder, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + fn forward_broadcast_msg(&self, peers: &HashMap>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { @@ -1320,7 +1318,7 @@ impl P log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced node: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_node_announcement(msg.contents.node_id) { @@ -1345,7 +1343,7 @@ impl P log_gossip!(self.logger, "Sending message to all peers except {:?}: {:?}", except_node, msg); let encoded_msg = encode_msg!(msg); - for (_, peer_mutex) in peers.peers.iter() { + for (_, peer_mutex) in peers.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { @@ -1426,7 +1424,7 @@ impl P } let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); match descriptor_opt { - Some(descriptor) => match peers.peers.get(&descriptor) { + Some(descriptor) => match peers.get(&descriptor) { Some(peer_mutex) => { let peer_lock = peer_mutex.lock().unwrap(); if peer_lock.their_features.is_none() { @@ -1625,7 +1623,7 @@ impl P self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); } - for (descriptor, peer_mutex) in peers.peers.iter() { + for (descriptor, peer_mutex) in peers.iter() { self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap()); } } @@ -1639,7 +1637,7 @@ impl P // lock). if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { - if let Some(peer_mutex) = peers.peers.remove(&descriptor) { + if let Some(peer_mutex) = peers.remove(&descriptor) { if let Some(msg) = msg { log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), @@ -1667,7 +1665,7 @@ impl P fn disconnect_event_internal(&self, descriptor: &Descriptor, no_connection_possible: bool) { let mut peers = self.peers.write().unwrap(); - let peer_option = peers.peers.remove(descriptor); + let peer_option = peers.remove(descriptor); match peer_option { None => { // This is most likely a simple race condition where the user found that the socket @@ -1703,7 +1701,7 @@ impl P let mut peers_lock = self.peers.write().unwrap(); if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id); - peers_lock.peers.remove(&descriptor); + peers_lock.remove(&descriptor); self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); descriptor.disconnect_socket(); } @@ -1716,7 +1714,7 @@ impl P let mut peers_lock = self.peers.write().unwrap(); self.node_id_to_descriptor.lock().unwrap().clear(); let peers = &mut *peers_lock; - for (mut descriptor, peer) in peers.peers.drain() { + for (mut descriptor, peer) in peers.drain() { if let Some(node_id) = peer.lock().unwrap().their_node_id { log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id); self.message_handler.chan_handler.peer_disconnected(&node_id, false); @@ -1755,7 +1753,7 @@ impl P { let peers_lock = self.peers.read().unwrap(); - for (descriptor, peer_mutex) in peers_lock.peers.iter() { + for (descriptor, peer_mutex) in peers_lock.iter() { let mut peer = peer_mutex.lock().unwrap(); if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() { // The peer needs to complete its handshake before we can exchange messages. We @@ -1779,7 +1777,7 @@ impl P if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.peers.len() as u64 + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 { descriptors_needing_disconnect.push(descriptor.clone()); continue; @@ -1805,7 +1803,7 @@ impl P { let mut peers_lock = self.peers.write().unwrap(); for descriptor in descriptors_needing_disconnect.iter() { - if let Some(peer) = peers_lock.peers.remove(&descriptor) { + if let Some(peer) = peers_lock.remove(descriptor) { if let Some(node_id) = peer.lock().unwrap().their_node_id { log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id); self.node_id_to_descriptor.lock().unwrap().remove(&node_id); @@ -1935,7 +1933,7 @@ mod tests { let chan_handler = test_utils::TestChannelMessageHandler::new(); let mut peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); let secp_ctx = Secp256k1::new(); let their_id = PublicKey::from_secret_key(&secp_ctx, &peers[1].our_node_secret); @@ -1948,7 +1946,7 @@ mod tests { peers[0].message_handler.chan_handler = &chan_handler; peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); } #[test] @@ -1957,17 +1955,17 @@ mod tests { let cfgs = create_peermgr_cfgs(2); let peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); // peers[0] awaiting_pong is set to true, but the Peer is still connected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected peers[0].timer_tick_occurred(); peers[0].process_events(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); } #[test] @@ -2029,9 +2027,9 @@ mod tests { peers[0].new_inbound_connection(fd_a.clone(), None).unwrap(); // If we get a single timer tick before completion, that's fine - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 1); + assert_eq!(peers[0].peers.read().unwrap().len(), 1); assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false); peers[0].process_events(); @@ -2040,7 +2038,7 @@ mod tests { // ...but if we get a second timer tick, we should disconnect the peer peers[0].timer_tick_occurred(); - assert_eq!(peers[0].peers.read().unwrap().peers.len(), 0); + assert_eq!(peers[0].peers.read().unwrap().len(), 0); assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err()); } -- 2.39.5