- if peers.insert(descriptor, Mutex::new(Peer {
- channel_encryptor: peer_encryptor,
- their_node_id: None,
- their_features: None,
- their_net_address: remote_network_address,
-
- pending_outbound_buffer: LinkedList::new(),
- pending_outbound_buffer_first_msg_offset: 0,
- gossip_broadcast_buffer: LinkedList::new(),
- awaiting_write_event: false,
-
- pending_read_buffer,
- pending_read_buffer_pos: 0,
- pending_read_is_header: false,
-
- sync_status: InitSyncTracker::NoSyncRequested,
-
- msgs_sent_since_pong: 0,
- awaiting_pong_timer_tick_intervals: 0,
- received_message_since_timer_tick: false,
- sent_gossip_timestamp_filter: false,
- })).is_some() {
- panic!("PeerManager driver duplicated descriptors!");
- };
- Ok(())
+ match peers.entry(descriptor) {
+ hash_map::Entry::Occupied(_) => {
+ debug_assert!(false, "PeerManager driver duplicated descriptors!");
+ Err(PeerHandleError {})
+ },
+ hash_map::Entry::Vacant(e) => {
+ e.insert(Mutex::new(Peer {
+ channel_encryptor: peer_encryptor,
+ their_node_id: None,
+ their_features: None,
+ their_net_address: remote_network_address,
+
+ pending_outbound_buffer: LinkedList::new(),
+ pending_outbound_buffer_first_msg_offset: 0,
+ gossip_broadcast_buffer: LinkedList::new(),
+ awaiting_write_event: false,
+
+ pending_read_buffer,
+ pending_read_buffer_pos: 0,
+ pending_read_is_header: false,
+
+ sync_status: InitSyncTracker::NoSyncRequested,
+
+ msgs_sent_since_pong: 0,
+ awaiting_pong_timer_tick_intervals: 0,
+ received_message_since_timer_tick: false,
+ sent_gossip_timestamp_filter: false,
+
+ received_channel_announce_since_backlogged: false,
+ inbound_connection: true,
+ }));
+ Ok(())
+ }
+ }
+ }
+
+ fn peer_should_read(&self, peer: &mut Peer) -> bool {
+ peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
+ }
+
+ fn update_gossip_backlogged(&self) {
+ let new_state = self.message_handler.route_handler.processing_queue_high();
+ let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed);
+ if prev_state && !new_state {
+ self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed);
+ }