]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Limit blocked PeerManager::process_events waiters to two
authorMatt Corallo <git@bluematt.me>
Wed, 6 Oct 2021 04:45:07 +0000 (04:45 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 10 May 2022 23:40:20 +0000 (23:40 +0000)
Only one instance of PeerManager::process_events can run at a time,
and each run always finishes all available work before returning.
Thus, having several threads blocked on the process_events lock
doesn't accomplish anything but blocking more threads.

Here we limit the number of blocked calls on process_events to two
- one processing events and one blocked at the top which will
process all available events after the first completes.

lightning/src/ln/peer_handler.rs

index 7408263085c41e8162c038e5d573ee01db15e470..bb7b697e420ea5603e18dca088808b5150938ecd 100644 (file)
@@ -34,6 +34,7 @@ use prelude::*;
 use io;
 use alloc::collections::LinkedList;
 use sync::{Arc, Mutex, MutexGuard, RwLock};
+use core::sync::atomic::{AtomicBool, Ordering};
 use core::{cmp, hash, fmt, mem};
 use core::ops::Deref;
 use core::convert::Infallible;
@@ -437,6 +438,11 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
        /// `peers` write lock to do so, so instead we block on this empty mutex when entering
        /// `process_events`.
        event_processing_lock: Mutex<()>,
+       /// Because event processing is global and always does all available work before returning,
+       /// there is no reason for us to have many event processors waiting on the lock at once.
+       /// Instead, we limit the total blocked event processors to always exactly one by setting this
+       /// when an event process call is waiting.
+       blocked_event_processors: AtomicBool,
        our_node_secret: SecretKey,
        ephemeral_key_midstate: Sha256Engine,
        custom_message_handler: CMH,
@@ -569,6 +575,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        }),
                        node_id_to_descriptor: Mutex::new(HashMap::new()),
                        event_processing_lock: Mutex::new(()),
+                       blocked_event_processors: AtomicBool::new(false),
                        our_node_secret,
                        ephemeral_key_midstate,
                        peer_counter: AtomicCounter::new(),
@@ -1369,11 +1376,34 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
        /// or one of the other clients provided in our language bindings.
        ///
+       /// Note that if there are any other calls to this function waiting on lock(s) this may return
+       /// without doing any work. All available events that need handling will be handled before the
+       /// other calls return.
+       ///
        /// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
        /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
        /// [`send_data`]: SocketDescriptor::send_data
        pub fn process_events(&self) {
-               let _single_processor_lock = self.event_processing_lock.lock().unwrap();
+               let mut _single_processor_lock = self.event_processing_lock.try_lock();
+               if _single_processor_lock.is_err() {
+                       // While we could wake the older sleeper here with a CV and make more even waiting
+                       // times, that would be a lot of overengineering for a simple "reduce total waiter
+                       // count" goal.
+                       match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
+                               Err(val) => {
+                                       debug_assert!(val, "compare_exchange failed spuriously?");
+                                       return;
+                               },
+                               Ok(val) => {
+                                       debug_assert!(!val, "compare_exchange succeeded spuriously?");
+                                       // We're the only waiter, as the running process_events may have emptied the
+                                       // pending events "long" ago and there are new events for us to process, wait until
+                                       // its done and process any leftover events before returning.
+                                       _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
+                                       self.blocked_event_processors.store(false, Ordering::Release);
+                               }
+                       }
+               }
 
                let mut peers_to_disconnect = HashMap::new();
                let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();