]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Never block a thread on the `PeerManager` event handling lock 2023-05-event-deadlock
authorMatt Corallo <git@bluematt.me>
Tue, 9 May 2023 00:30:33 +0000 (00:30 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 24 May 2023 02:39:37 +0000 (02:39 +0000)
If thre's a thread currently handling `PeerManager` events, the
next thread which attempts to handle events will block on the first
and then handle events after the first completes. (later threads
will return immediately to avoid blocking more than one thread).

This works fine as long as the user has a spare thread to leave
blocked, but if they don't (e.g. are running with a single-threaded
tokio runtime) this can lead to a full deadlock.

Instead, here, we never block waiting on another event processing
thread, returning immediately after signaling that the first thread
should start over once its complete to ensure all events are
handled.

While this could lead to starvation as we cause one thread to go
around and around and around again, the risk of that should be
relatively low as event handling should be pretty quick, and it's
certainly better than deadlocking.

Fixes https://github.com/lightningdevkit/rapid-gossip-sync-server/issues/32

Atomic lock simplification suggestion from @andrei-21

lightning/src/ln/peer_handler.rs
lightning/src/util/test_utils.rs

index 0659412f774190ba7adee5ab599d3c4e0174cbb0..2fcfb713c23f5105762c48c3e42062aacb6960b0 100644 (file)
@@ -36,7 +36,7 @@ use crate::prelude::*;
 use crate::io;
 use alloc::collections::LinkedList;
 use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
-use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
 use core::convert::Infallible;
@@ -676,15 +676,18 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
        /// lock held. Entries may be added with only the `peers` read lock held (though the
        /// `Descriptor` value must already exist in `peers`).
        node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
-       /// We can only have one thread processing events at once, but we don't usually need the full
-       /// `peers` write lock to do so, so instead we block on this empty mutex when entering
-       /// `process_events`.
-       event_processing_lock: Mutex<()>,
-       /// Because event processing is global and always does all available work before returning,
-       /// there is no reason for us to have many event processors waiting on the lock at once.
-       /// Instead, we limit the total blocked event processors to always exactly one by setting this
-       /// when an event process call is waiting.
-       blocked_event_processors: AtomicBool,
+       /// We can only have one thread processing events at once, but if a second call to
+       /// `process_events` happens while a first call is in progress, one of the two calls needs to
+       /// start from the top to ensure any new messages are also handled.
+       ///
+       /// Because the event handler calls into user code which may block, we don't want to block a
+       /// second thread waiting for another thread to handle events which is then blocked on user
+       /// code, so we store an atomic counter here:
+       ///  * 0 indicates no event processor is running
+       ///  * 1 indicates an event processor is running
+       ///  * > 1 indicates an event processor is running but needs to start again from the top once
+       ///        it finishes as another thread tried to start processing events but returned early.
+       event_processing_state: AtomicI32,
 
        /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
        /// value increases strictly since we don't assume access to a time source.
@@ -854,8 +857,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        message_handler,
                        peers: FairRwLock::new(HashMap::new()),
                        node_id_to_descriptor: Mutex::new(HashMap::new()),
-                       event_processing_lock: Mutex::new(()),
-                       blocked_event_processors: AtomicBool::new(false),
+                       event_processing_state: AtomicI32::new(0),
                        ephemeral_key_midstate,
                        peer_counter: AtomicCounter::new(),
                        gossip_processing_backlogged: AtomicBool::new(false),
@@ -1784,356 +1786,351 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
        /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
        /// [`send_data`]: SocketDescriptor::send_data
        pub fn process_events(&self) {
-               let mut _single_processor_lock = self.event_processing_lock.try_lock();
-               if _single_processor_lock.is_err() {
-                       // While we could wake the older sleeper here with a CV and make more even waiting
-                       // times, that would be a lot of overengineering for a simple "reduce total waiter
-                       // count" goal.
-                       match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
-                               Err(val) => {
-                                       debug_assert!(val, "compare_exchange failed spuriously?");
-                                       return;
-                               },
-                               Ok(val) => {
-                                       debug_assert!(!val, "compare_exchange succeeded spuriously?");
-                                       // We're the only waiter, as the running process_events may have emptied the
-                                       // pending events "long" ago and there are new events for us to process, wait until
-                                       // its done and process any leftover events before returning.
-                                       _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
-                                       self.blocked_event_processors.store(false, Ordering::Release);
-                               }
-                       }
+               if self.event_processing_state.fetch_add(1, Ordering::AcqRel) > 0 {
+                       // If we're not the first event processor to get here, just return early, the increment
+                       // we just did will be treated as "go around again" at the end.
+                       return;
                }
 
-               self.update_gossip_backlogged();
-               let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
-
-               let mut peers_to_disconnect = HashMap::new();
-               let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-               events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
+               loop {
+                       self.update_gossip_backlogged();
+                       let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
 
-               {
-                       // TODO: There are some DoS attacks here where you can flood someone's outbound send
-                       // buffer by doing things like announcing channels on another node. We should be willing to
-                       // drop optional-ish messages when send buffers get full!
+                       let mut peers_to_disconnect = HashMap::new();
+                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+                       events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
 
-                       let peers_lock = self.peers.read().unwrap();
-                       let peers = &*peers_lock;
-                       macro_rules! get_peer_for_forwarding {
-                               ($node_id: expr) => {
-                                       {
-                                               if peers_to_disconnect.get($node_id).is_some() {
-                                                       // If we've "disconnected" this peer, do not send to it.
-                                                       continue;
-                                               }
-                                               let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
-                                               match descriptor_opt {
-                                                       Some(descriptor) => match peers.get(&descriptor) {
-                                                               Some(peer_mutex) => {
-                                                                       let peer_lock = peer_mutex.lock().unwrap();
-                                                                       if !peer_lock.handshake_complete() {
+                       {
+                               // TODO: There are some DoS attacks here where you can flood someone's outbound send
+                               // buffer by doing things like announcing channels on another node. We should be willing to
+                               // drop optional-ish messages when send buffers get full!
+
+                               let peers_lock = self.peers.read().unwrap();
+                               let peers = &*peers_lock;
+                               macro_rules! get_peer_for_forwarding {
+                                       ($node_id: expr) => {
+                                               {
+                                                       if peers_to_disconnect.get($node_id).is_some() {
+                                                               // If we've "disconnected" this peer, do not send to it.
+                                                               continue;
+                                                       }
+                                                       let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
+                                                       match descriptor_opt {
+                                                               Some(descriptor) => match peers.get(&descriptor) {
+                                                                       Some(peer_mutex) => {
+                                                                               let peer_lock = peer_mutex.lock().unwrap();
+                                                                               if !peer_lock.handshake_complete() {
+                                                                                       continue;
+                                                                               }
+                                                                               peer_lock
+                                                                       },
+                                                                       None => {
+                                                                               debug_assert!(false, "Inconsistent peers set state!");
                                                                                continue;
                                                                        }
-                                                                       peer_lock
                                                                },
                                                                None => {
-                                                                       debug_assert!(false, "Inconsistent peers set state!");
                                                                        continue;
-                                                               }
-                                                       },
-                                                       None => {
-                                                               continue;
-                                                       },
+                                                               },
+                                                       }
                                                }
                                        }
                                }
-                       }
-                       for event in events_generated.drain(..) {
-                               match event {
-                                       MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.temporary_channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.temporary_channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.temporary_channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.temporary_channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.temporary_channel_id),
-                                                               log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
-                                               // TODO: If the peer is gone we should generate a DiscardFunding event
-                                               // indicating to the wallet that they should just throw away this funding transaction
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
-                                               log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               update_add_htlcs.len(),
-                                                               update_fulfill_htlcs.len(),
-                                                               update_fail_htlcs.len(),
-                                                               log_bytes!(commitment_signed.channel_id));
-                                               let mut peer = get_peer_for_forwarding!(node_id);
-                                               for msg in update_add_htlcs {
-                                                       self.enqueue_message(&mut *peer, msg);
-                                               }
-                                               for msg in update_fulfill_htlcs {
-                                                       self.enqueue_message(&mut *peer, msg);
-                                               }
-                                               for msg in update_fail_htlcs {
-                                                       self.enqueue_message(&mut *peer, msg);
-                                               }
-                                               for msg in update_fail_malformed_htlcs {
-                                                       self.enqueue_message(&mut *peer, msg);
-                                               }
-                                               if let &Some(ref msg) = update_fee {
-                                                       self.enqueue_message(&mut *peer, msg);
-                                               }
-                                               self.enqueue_message(&mut *peer, commitment_signed);
-                                       },
-                                       MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
-                                               log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id),
-                                                               log_bytes!(msg.channel_id));
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
-                                               log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
-                                                               log_pubkey!(node_id),
-                                                               msg.contents.short_channel_id);
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
-                                       },
-                                       MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
-                                               log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               match self.message_handler.route_handler.handle_channel_announcement(&msg) {
-                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                               self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
-                                                       _ => {},
-                                               }
-                                               if let Some(msg) = update_msg {
+                               for event in events_generated.drain(..) {
+                                       match event {
+                                               MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.temporary_channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.temporary_channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.temporary_channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.temporary_channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.temporary_channel_id),
+                                                                       log_funding_channel_id!(msg.funding_txid, msg.funding_output_index));
+                                                       // TODO: If the peer is gone we should generate a DiscardFunding event
+                                                       // indicating to the wallet that they should just throw away this funding transaction
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendChannelReady event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
+                                                       log_debug!(self.logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       update_add_htlcs.len(),
+                                                                       update_fulfill_htlcs.len(),
+                                                                       update_fail_htlcs.len(),
+                                                                       log_bytes!(commitment_signed.channel_id));
+                                                       let mut peer = get_peer_for_forwarding!(node_id);
+                                                       for msg in update_add_htlcs {
+                                                               self.enqueue_message(&mut *peer, msg);
+                                                       }
+                                                       for msg in update_fulfill_htlcs {
+                                                               self.enqueue_message(&mut *peer, msg);
+                                                       }
+                                                       for msg in update_fail_htlcs {
+                                                               self.enqueue_message(&mut *peer, msg);
+                                                       }
+                                                       for msg in update_fail_malformed_htlcs {
+                                                               self.enqueue_message(&mut *peer, msg);
+                                                       }
+                                                       if let &Some(ref msg) = update_fee {
+                                                               self.enqueue_message(&mut *peer, msg);
+                                                       }
+                                                       self.enqueue_message(&mut *peer, commitment_signed);
+                                               },
+                                               MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling Shutdown event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
+                                                       log_debug!(self.logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       log_bytes!(msg.channel_id));
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
+                                                       log_debug!(self.logger, "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
+                                                                       log_pubkey!(node_id),
+                                                                       msg.contents.short_channel_id);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
+                                               },
+                                               MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
+                                                       log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
+                                                       match self.message_handler.route_handler.handle_channel_announcement(&msg) {
+                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+                                                                       self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None),
+                                                               _ => {},
+                                                       }
+                                                       if let Some(msg) = update_msg {
+                                                               match self.message_handler.route_handler.handle_channel_update(&msg) {
+                                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+                                                                               self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
+                                                                       _ => {},
+                                                               }
+                                                       }
+                                               },
+                                               MessageSendEvent::BroadcastChannelUpdate { msg } => {
+                                                       log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                        match self.message_handler.route_handler.handle_channel_update(&msg) {
                                                                Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
                                                                        self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
                                                                _ => {},
                                                        }
+                                               },
+                                               MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
+                                                       log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
+                                                       match self.message_handler.route_handler.handle_node_announcement(&msg) {
+                                                               Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
+                                                                       self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
+                                                               _ => {},
+                                                       }
+                                               },
+                                               MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
+                                                       log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id), msg.contents.short_channel_id);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::HandleError { ref node_id, ref action } => {
+                                                       match *action {
+                                                               msgs::ErrorAction::DisconnectPeer { ref msg } => {
+                                                                       // We do not have the peers write lock, so we just store that we're
+                                                                       // about to disconenct the peer and do it after we finish
+                                                                       // processing most messages.
+                                                                       peers_to_disconnect.insert(*node_id, msg.clone());
+                                                               },
+                                                               msgs::ErrorAction::IgnoreAndLog(level) => {
+                                                                       log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
+                                                               },
+                                                               msgs::ErrorAction::IgnoreDuplicateGossip => {},
+                                                               msgs::ErrorAction::IgnoreError => {
+                                                                       log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
+                                                               },
+                                                               msgs::ErrorAction::SendErrorMessage { ref msg } => {
+                                                                       log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
+                                                                                       log_pubkey!(node_id),
+                                                                                       msg.data);
+                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                               },
+                                                               msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
+                                                                       log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
+                                                                                       log_pubkey!(node_id),
+                                                                                       msg.data);
+                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                               },
+                                                       }
+                                               },
+                                               MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               },
+                                               MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
-                                       },
-                                       MessageSendEvent::BroadcastChannelUpdate { msg } => {
-                                               log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
-                                               match self.message_handler.route_handler.handle_channel_update(&msg) {
-                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                               self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None),
-                                                       _ => {},
+                                               MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
+                                                       log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
+                                                               log_pubkey!(node_id),
+                                                               msg.short_channel_ids.len(),
+                                                               msg.first_blocknum,
+                                                               msg.number_of_blocks,
+                                                               msg.sync_complete);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
-                                       },
-                                       MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
-                                               log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id);
-                                               match self.message_handler.route_handler.handle_node_announcement(&msg) {
-                                                       Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) =>
-                                                               self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None),
-                                                       _ => {},
+                                               MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                }
-                                       },
-                                       MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
-                                               log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
-                                                               log_pubkey!(node_id), msg.contents.short_channel_id);
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::HandleError { ref node_id, ref action } => {
-                                               match *action {
-                                                       msgs::ErrorAction::DisconnectPeer { ref msg } => {
-                                                               // We do not have the peers write lock, so we just store that we're
-                                                               // about to disconenct the peer and do it after we finish
-                                                               // processing most messages.
-                                                               peers_to_disconnect.insert(*node_id, msg.clone());
-                                                       },
-                                                       msgs::ErrorAction::IgnoreAndLog(level) => {
-                                                               log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
-                                                       },
-                                                       msgs::ErrorAction::IgnoreDuplicateGossip => {},
-                                                       msgs::ErrorAction::IgnoreError => {
-                                                               log_debug!(self.logger, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
-                                                       },
-                                                       msgs::ErrorAction::SendErrorMessage { ref msg } => {
-                                                               log_trace!(self.logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
-                                                                               log_pubkey!(node_id),
-                                                                               msg.data);
-                                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                                       },
-                                                       msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
-                                                               log_given_level!(self.logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
-                                                                               log_pubkey!(node_id),
-                                                                               msg.data);
-                                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                                       },
-                                               }
-                                       },
-                                       MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       },
-                                       MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       }
-                                       MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
-                                               log_gossip!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
-                                                       log_pubkey!(node_id),
-                                                       msg.short_channel_ids.len(),
-                                                       msg.first_blocknum,
-                                                       msg.number_of_blocks,
-                                                       msg.sync_complete);
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                       }
-                                       MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
-                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                        }
                                }
-                       }
 
-                       for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
-                               if peers_to_disconnect.get(&node_id).is_some() { continue; }
-                               self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
-                       }
+                               for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
+                                       if peers_to_disconnect.get(&node_id).is_some() { continue; }
+                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
+                               }
 
-                       for (descriptor, peer_mutex) in peers.iter() {
-                               let mut peer = peer_mutex.lock().unwrap();
-                               if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
-                               self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
+                               for (descriptor, peer_mutex) in peers.iter() {
+                                       let mut peer = peer_mutex.lock().unwrap();
+                                       if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
+                                       self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
+                               }
                        }
-               }
-               if !peers_to_disconnect.is_empty() {
-                       let mut peers_lock = self.peers.write().unwrap();
-                       let peers = &mut *peers_lock;
-                       for (node_id, msg) in peers_to_disconnect.drain() {
-                               // Note that since we are holding the peers *write* lock we can
-                               // remove from node_id_to_descriptor immediately (as no other
-                               // thread can be holding the peer lock if we have the global write
-                               // lock).
-
-                               let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
-                               if let Some(mut descriptor) = descriptor_opt {
-                                       if let Some(peer_mutex) = peers.remove(&descriptor) {
-                                               let mut peer = peer_mutex.lock().unwrap();
-                                               if let Some(msg) = msg {
-                                                       log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
-                                                                       log_pubkey!(node_id),
-                                                                       msg.data);
-                                                       self.enqueue_message(&mut *peer, &msg);
-                                                       // This isn't guaranteed to work, but if there is enough free
-                                                       // room in the send buffer, put the error message there...
-                                                       self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
-                                               }
-                                               self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
-                                       } else { debug_assert!(false, "Missing connection for peer"); }
+                       if !peers_to_disconnect.is_empty() {
+                               let mut peers_lock = self.peers.write().unwrap();
+                               let peers = &mut *peers_lock;
+                               for (node_id, msg) in peers_to_disconnect.drain() {
+                                       // Note that since we are holding the peers *write* lock we can
+                                       // remove from node_id_to_descriptor immediately (as no other
+                                       // thread can be holding the peer lock if we have the global write
+                                       // lock).
+
+                                       let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
+                                       if let Some(mut descriptor) = descriptor_opt {
+                                               if let Some(peer_mutex) = peers.remove(&descriptor) {
+                                                       let mut peer = peer_mutex.lock().unwrap();
+                                                       if let Some(msg) = msg {
+                                                               log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+                                                                               log_pubkey!(node_id),
+                                                                               msg.data);
+                                                               self.enqueue_message(&mut *peer, &msg);
+                                                               // This isn't guaranteed to work, but if there is enough free
+                                                               // room in the send buffer, put the error message there...
+                                                               self.do_attempt_write_data(&mut descriptor, &mut *peer, false);
+                                                       }
+                                                       self.do_disconnect(descriptor, &*peer, "DisconnectPeer HandleError");
+                                               } else { debug_assert!(false, "Missing connection for peer"); }
+                                       }
                                }
                        }
+
+                       if self.event_processing_state.fetch_sub(1, Ordering::AcqRel) != 1 {
+                               // If another thread incremented the state while we were running we should go
+                               // around again, but only once.
+                               self.event_processing_state.store(1, Ordering::Release);
+                               continue;
+                       }
+                       break;
                }
        }
 
@@ -2879,4 +2876,53 @@ mod tests {
                // For (None)
                assert_eq!(filter_addresses(None), None);
        }
+
+       #[test]
+       #[cfg(feature = "std")]
+       fn test_process_events_multithreaded() {
+               use std::time::{Duration, Instant};
+               // Test that `process_events` getting called on multiple threads doesn't generate too many
+               // loop iterations.
+               // Each time `process_events` goes around the loop we call
+               // `get_and_clear_pending_msg_events`, which we count using the `TestMessageHandler`.
+               // Because the loop should go around once more after a call which fails to take the
+               // single-threaded lock, if we write zero to the counter before calling `process_events` we
+               // should never observe there having been more than 2 loop iterations.
+               // Further, because the last thread to exit will call `process_events` before returning, we
+               // should always have at least one count at the end.
+               let cfg = Arc::new(create_peermgr_cfgs(1));
+               // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }.
+               let peer = Arc::new(create_network(1, unsafe { &*(&*cfg as *const _) as &'static _ }).pop().unwrap());
+
+               let exit_flag = Arc::new(AtomicBool::new(false));
+               macro_rules! spawn_thread { () => { {
+                       let thread_cfg = Arc::clone(&cfg);
+                       let thread_peer = Arc::clone(&peer);
+                       let thread_exit = Arc::clone(&exit_flag);
+                       std::thread::spawn(move || {
+                               while !thread_exit.load(Ordering::Acquire) {
+                                       thread_cfg[0].chan_handler.message_fetch_counter.store(0, Ordering::Release);
+                                       thread_peer.process_events();
+                                       std::thread::sleep(Duration::from_micros(1));
+                               }
+                       })
+               } } }
+
+               let thread_a = spawn_thread!();
+               let thread_b = spawn_thread!();
+               let thread_c = spawn_thread!();
+
+               let start_time = Instant::now();
+               while start_time.elapsed() < Duration::from_millis(100) {
+                       let val = cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire);
+                       assert!(val <= 2);
+                       std::thread::yield_now(); // Winblowz seemingly doesn't ever interrupt threads?!
+               }
+
+               exit_flag.store(true, Ordering::Release);
+               thread_a.join().unwrap();
+               thread_b.join().unwrap();
+               thread_c.join().unwrap();
+               assert!(cfg[0].chan_handler.message_fetch_counter.load(Ordering::Acquire) >= 1);
+       }
 }
index ee5180be70a5ec45a7c19c852818bdfcff95958c..598f73e0f1101dde53704cf01ceef83f220ccef0 100644 (file)
@@ -359,6 +359,7 @@ pub struct TestChannelMessageHandler {
        pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
        expected_recv_msgs: Mutex<Option<Vec<wire::Message<()>>>>,
        connected_peers: Mutex<HashSet<PublicKey>>,
+       pub message_fetch_counter: AtomicUsize,
 }
 
 impl TestChannelMessageHandler {
@@ -367,6 +368,7 @@ impl TestChannelMessageHandler {
                        pending_events: Mutex::new(Vec::new()),
                        expected_recv_msgs: Mutex::new(None),
                        connected_peers: Mutex::new(HashSet::new()),
+                       message_fetch_counter: AtomicUsize::new(0),
                }
        }
 
@@ -517,6 +519,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler {
 
 impl events::MessageSendEventsProvider for TestChannelMessageHandler {
        fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+               self.message_fetch_counter.fetch_add(1, Ordering::AcqRel);
                let mut pending_events = self.pending_events.lock().unwrap();
                let mut ret = Vec::new();
                mem::swap(&mut ret, &mut *pending_events);