From: Elias Rohrer Date: Fri, 21 Apr 2023 16:02:54 +0000 (+0200) Subject: Allow events processing without holding `total_consistency_lock` X-Git-Tag: v0.0.115~7^2 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=f2453b7;p=rust-lightning Allow events processing without holding `total_consistency_lock` Unfortunately, the RAII types used by `RwLock` are not `Send`, which is why they can't be held over `await` boundaries. In order to allow asynchronous events processing in multi-threaded environments, we here allow to process events without holding the `total_consistency_lock`. --- diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 9be512747..7b7a75d60 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1485,10 +1485,9 @@ mod tests { }) }, false, ); - // TODO: Drop _local and simply spawn after #2003 - let local_set = tokio::task::LocalSet::new(); - local_set.spawn_local(bp_future); - local_set.spawn_local(async move { + + let t1 = tokio::spawn(bp_future); + let t2 = tokio::spawn(async move { do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, { let mut i = 0; loop { @@ -1500,7 +1499,9 @@ mod tests { }, tokio::time::sleep(Duration::from_millis(1)).await); exit_sender.send(()).unwrap(); }); - local_set.await; + let (r1, r2) = tokio::join!(t1, t2); + r1.unwrap().unwrap(); + r2.unwrap() } macro_rules! do_test_payment_path_scoring { @@ -1654,13 +1655,14 @@ mod tests { }) }, false, ); - // TODO: Drop _local and simply spawn after #2003 - let local_set = tokio::task::LocalSet::new(); - local_set.spawn_local(bp_future); - local_set.spawn_local(async move { + let t1 = tokio::spawn(bp_future); + let t2 = tokio::spawn(async move { do_test_payment_path_scoring!(nodes, receiver.recv().await); exit_sender.send(()).unwrap(); }); - local_set.await; + + let (r1, r2) = tokio::join!(t1, t2); + r1.unwrap().unwrap(); + r2.unwrap() } } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 213f3b954..37b22a184 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -72,7 +72,7 @@ use core::{cmp, mem}; use core::cell::RefCell; use crate::io::Read; use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState}; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; use core::time::Duration; use core::ops::Deref; @@ -926,6 +926,8 @@ where /// See `ChannelManager` struct-level documentation for lock order requirements. pending_events: Mutex>, + /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously. + pending_events_processor: AtomicBool, /// See `ChannelManager` struct-level documentation for lock order requirements. pending_background_events: Mutex>, /// Used when we have to take a BIG lock to make sure everything is self-consistent. @@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update { macro_rules! process_events_body { ($self: expr, $event_to_handle: expr, $handle_event: expr) => { - // We'll acquire our total consistency lock until the returned future completes so that - // we can be sure no other persists happen while processing events. - let _read_guard = $self.total_consistency_lock.read().unwrap(); + let mut processed_all_events = false; + while !processed_all_events { + if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; + } - let mut result = NotifyOption::SkipPersist; + let mut result = NotifyOption::SkipPersist; - // TODO: This behavior should be documented. It's unintuitive that we query - // ChannelMonitors when clearing other events. - if $self.process_pending_monitor_events() { - result = NotifyOption::DoPersist; - } + { + // We'll acquire our total consistency lock so that we can be sure no other + // persists happen while processing monitor events. + let _read_guard = $self.total_consistency_lock.read().unwrap(); + + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if $self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + } - let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]); - if !pending_events.is_empty() { - result = NotifyOption::DoPersist; - } + let pending_events = $self.pending_events.lock().unwrap().clone(); + let num_events = pending_events.len(); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } - for event in pending_events { - $event_to_handle = event; - $handle_event; - } + for event in pending_events { + $event_to_handle = event; + $handle_event; + } - if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + { + let mut pending_events = $self.pending_events.lock().unwrap(); + pending_events.drain(..num_events); + processed_all_events = pending_events.is_empty(); + $self.pending_events_processor.store(false, Ordering::Release); + } + + if result == NotifyOption::DoPersist { + $self.persistence_notifier.notify(); + } } } } @@ -1771,6 +1790,7 @@ where per_peer_state: FairRwLock::new(HashMap::new()), pending_events: Mutex::new(Vec::new()), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), persistence_notifier: Notifier::new(), @@ -7916,6 +7936,7 @@ where per_peer_state: FairRwLock::new(per_peer_state), pending_events: Mutex::new(pending_events_read), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), persistence_notifier: Notifier::new(),