use io;
use alloc::collections::LinkedList;
use sync::{Arc, Mutex, MutexGuard, RwLock};
+use core::sync::atomic::{AtomicBool, Ordering};
use core::{cmp, hash, fmt, mem};
use core::ops::Deref;
use core::convert::Infallible;
/// `peers` write lock to do so, so instead we block on this empty mutex when entering
/// `process_events`.
event_processing_lock: Mutex<()>,
+ /// Because event processing is global and always does all available work before returning,
+ /// there is no reason for us to have many event processors waiting on the lock at once.
+ /// Instead, we limit the number of blocked event processors to at most one by setting this
+ /// when an event-processing call is waiting.
+ blocked_event_processors: AtomicBool,
our_node_secret: SecretKey,
ephemeral_key_midstate: Sha256Engine,
custom_message_handler: CMH,
}),
node_id_to_descriptor: Mutex::new(HashMap::new()),
event_processing_lock: Mutex::new(()),
+ blocked_event_processors: AtomicBool::new(false),
our_node_secret,
ephemeral_key_midstate,
peer_counter: AtomicCounter::new(),
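For context, here is a minimal standalone sketch of the gating pattern described in the `blocked_event_processors` documentation above: a `try_lock` fast path plus an `AtomicBool` that admits at most one blocked waiter. `EventGate`, `run_events`, and `do_work` are hypothetical names for illustration only, not part of this crate.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

struct EventGate {
    processing_lock: Mutex<()>,
    blocked_processor: AtomicBool,
}

impl EventGate {
    fn new() -> Self {
        EventGate { processing_lock: Mutex::new(()), blocked_processor: AtomicBool::new(false) }
    }

    // Run `do_work` while holding the processing lock, keeping at most one caller
    // blocked on that lock; any further concurrent callers return early.
    fn run_events(&self, do_work: impl FnOnce()) {
        let mut lock = self.processing_lock.try_lock();
        if lock.is_err() {
            // Try to become the single designated waiter. If another caller already
            // is, return early: it will handle everything once the current run ends.
            if self.blocked_processor.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire).is_err() {
                return;
            }
            // We are the designated waiter: block until the current run finishes,
            // then clear the flag so a future caller may wait in our place.
            lock = Ok(self.processing_lock.lock().unwrap());
            self.blocked_processor.store(false, Ordering::Release);
        }
        let _held = lock.unwrap();
        do_work();
    }
}

fn main() {
    let gate = Arc::new(EventGate::new());
    let handles: Vec<_> = (0..4).map(|i| {
        let gate = Arc::clone(&gate);
        thread::spawn(move || gate.run_events(|| println!("events processed by caller {}", i)))
    }).collect();
    for handle in handles { handle.join().unwrap(); }
}
```

Under this scheme, at most two callers are ever inside `run_events` at once: one doing the work and one waiting to pick up anything that arrives meanwhile; all other concurrent callers return immediately.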
/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
/// or one of the other clients provided in our language bindings.
///
+ /// Note that if other calls to this function are currently waiting on lock(s), this call may
+ /// return without doing any work. All available events that need handling will be handled
+ /// before the other calls return.
+ ///
/// [`send_payment`]: crate::ln::channelmanager::ChannelManager::send_payment
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
/// [`send_data`]: SocketDescriptor::send_data
pub fn process_events(&self) {
- let _single_processor_lock = self.event_processing_lock.lock().unwrap();
+ let mut _single_processor_lock = self.event_processing_lock.try_lock();
+ if _single_processor_lock.is_err() {
+ // While we could wake the older sleeper here with a condition variable and make waiting
+ // times more even, that would be a lot of overengineering for a simple "reduce total
+ // waiter count" goal.
+ match self.blocked_event_processors.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) {
+ Err(val) => {
+ debug_assert!(val, "compare_exchange failed spuriously?");
+ return;
+ },
+ Ok(val) => {
+ debug_assert!(!val, "compare_exchange succeeded spuriously?");
+ // We're the only waiter. Because the running process_events may have emptied the
+ // pending events "long" ago and new events may have arrived since, wait until it's
+ // done and process any leftover events before returning.
+ _single_processor_lock = Ok(self.event_processing_lock.lock().unwrap());
+ self.blocked_event_processors.store(false, Ordering::Release);
+ }
+ }
+ }
let mut peers_to_disconnect = HashMap::new();
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
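As an aside on the `debug_assert`s in the match above: `AtomicBool::compare_exchange` returns `Ok` carrying the previous value when the swap happens and `Err` carrying the value it actually found when it does not, so a failed exchange against an expected `false` can only report `true`. A tiny standalone check of that standard-library behavior:

```rust
use core::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let flag = AtomicBool::new(false);
    // The value matched the expected `false`, so the swap happened; Ok carries the old value.
    assert_eq!(flag.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire), Ok(false));
    // The value is now `true`, so the swap fails; Err carries the value that was found.
    assert_eq!(flag.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire), Err(true));
}
```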