Allow event processing without holding `total_consistency_lock`
author		Elias Rohrer <ero@tnull.de>
		Fri, 21 Apr 2023 16:02:54 +0000 (18:02 +0200)
committer	Elias Rohrer <ero@tnull.de>
		Fri, 21 Apr 2023 16:02:54 +0000 (18:02 +0200)
Unfortunately, the RAII guard types returned by `RwLock` are not `Send`, which
is why they can't be held across `await` boundaries. In order to allow
asynchronous event processing in multi-threaded environments, we here allow
events to be processed without holding the `total_consistency_lock`.
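
In place of the read guard, event processing is now gated by an `AtomicBool`
plus a retry loop. Roughly, the pattern can be sketched as follows; `EventQueue`,
the `u32` placeholder event type, and the generic `handler` are hypothetical
stand-ins, and only the `pending_events`/`pending_events_processor` fields and
the control flow mirror the diff below:

    use std::future::Future;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicBool, Ordering};

    // Hypothetical, simplified stand-in for the pattern in `process_events_body`.
    struct EventQueue {
        pending_events: Mutex<Vec<u32>>,
        pending_events_processor: AtomicBool,
    }

    impl EventQueue {
        async fn process_events<F, Fut>(&self, handler: F)
        where
            F: Fn(u32) -> Fut,
            Fut: Future<Output = ()>,
        {
            let mut processed_all_events = false;
            while !processed_all_events {
                // Only one task may process events at a time; concurrent callers
                // simply return rather than blocking on a lock.
                if self.pending_events_processor
                    .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                    .is_err()
                {
                    return;
                }

                // Clone the queue so no guard is held across the `.await` below.
                let events = self.pending_events.lock().unwrap().clone();
                let num_events = events.len();

                for event in events {
                    handler(event).await;
                }

                // Drop only the events we handled; anything queued while we were
                // awaiting stays, and the outer loop picks it up on the next pass.
                let mut pending = self.pending_events.lock().unwrap();
                pending.drain(..num_events);
                processed_all_events = pending.is_empty();
                self.pending_events_processor.store(false, Ordering::Release);
            }
        }
    }

Cloning the queue and draining only the first `num_events` entries afterwards
ensures that events queued while a handler was `await`ing are never lost; the
loop simply runs again until the queue is observed empty.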

lightning-background-processor/src/lib.rs
lightning/src/ln/channelmanager.rs

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 9be5127472fe039126356c6ebcd0997a56094ff0..7b7a75d60e65c184519b358773f6c920ab7c03a3 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1485,10 +1485,9 @@ mod tests {
                                })
                        }, false,
                );
-               // TODO: Drop _local and simply spawn after #2003
-               let local_set = tokio::task::LocalSet::new();
-               local_set.spawn_local(bp_future);
-               local_set.spawn_local(async move {
+
+               let t1 = tokio::spawn(bp_future);
+               let t2 = tokio::spawn(async move {
                        do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
                                let mut i = 0;
                                loop {
@@ -1500,7 +1499,9 @@ mod tests {
                        }, tokio::time::sleep(Duration::from_millis(1)).await);
                        exit_sender.send(()).unwrap();
                });
-               local_set.await;
+               let (r1, r2) = tokio::join!(t1, t2);
+               r1.unwrap().unwrap();
+               r2.unwrap()
        }
 
        macro_rules! do_test_payment_path_scoring {
@@ -1654,13 +1655,14 @@ mod tests {
                                })
                        }, false,
                );
-               // TODO: Drop _local and simply spawn after #2003
-               let local_set = tokio::task::LocalSet::new();
-               local_set.spawn_local(bp_future);
-               local_set.spawn_local(async move {
+               let t1 = tokio::spawn(bp_future);
+               let t2 = tokio::spawn(async move {
                        do_test_payment_path_scoring!(nodes, receiver.recv().await);
                        exit_sender.send(()).unwrap();
                });
-               local_set.await;
+
+               let (r1, r2) = tokio::join!(t1, t2);
+               r1.unwrap().unwrap();
+               r2.unwrap()
        }
 }
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 213f3b954a2e2200a419d6da36a9fcdba0cc8a27..37b22a18428d27fa14cf24b05624dd85f2324cee 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -926,6 +926,8 @@ where
 
        /// See `ChannelManager` struct-level documentation for lock order requirements.
        pending_events: Mutex<Vec<events::Event>>,
+       /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+       pending_events_processor: AtomicBool,
        /// See `ChannelManager` struct-level documentation for lock order requirements.
        pending_background_events: Mutex<Vec<BackgroundEvent>>,
        /// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
        ($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-               // We'll acquire our total consistency lock until the returned future completes so that
-               // we can be sure no other persists happen while processing events.
-               let _read_guard = $self.total_consistency_lock.read().unwrap();
+               let mut processed_all_events = false;
+               while !processed_all_events {
+                       if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+                               return;
+                       }
 
-               let mut result = NotifyOption::SkipPersist;
+                       let mut result = NotifyOption::SkipPersist;
 
-               // TODO: This behavior should be documented. It's unintuitive that we query
-               // ChannelMonitors when clearing other events.
-               if $self.process_pending_monitor_events() {
-                       result = NotifyOption::DoPersist;
-               }
+                       {
+                               // We'll acquire our total consistency lock so that we can be sure no other
+                               // persists happen while processing monitor events.
+                               let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+                               // TODO: This behavior should be documented. It's unintuitive that we query
+                               // ChannelMonitors when clearing other events.
+                               if $self.process_pending_monitor_events() {
+                                       result = NotifyOption::DoPersist;
+                               }
+                       }
 
-               let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-               if !pending_events.is_empty() {
-                       result = NotifyOption::DoPersist;
-               }
+                       let pending_events = $self.pending_events.lock().unwrap().clone();
+                       let num_events = pending_events.len();
+                       if !pending_events.is_empty() {
+                               result = NotifyOption::DoPersist;
+                       }
 
-               for event in pending_events {
-                       $event_to_handle = event;
-                       $handle_event;
-               }
+                       for event in pending_events {
+                               $event_to_handle = event;
+                               $handle_event;
+                       }
 
-               if result == NotifyOption::DoPersist {
-                       $self.persistence_notifier.notify();
+                       {
+                               let mut pending_events = $self.pending_events.lock().unwrap();
+                               pending_events.drain(..num_events);
+                               processed_all_events = pending_events.is_empty();
+                               $self.pending_events_processor.store(false, Ordering::Release);
+                       }
+
+                       if result == NotifyOption::DoPersist {
+                               $self.persistence_notifier.notify();
+                       }
                }
        }
 }
@@ -1771,6 +1790,7 @@ where
                        per_peer_state: FairRwLock::new(HashMap::new()),
 
                        pending_events: Mutex::new(Vec::new()),
+                       pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        persistence_notifier: Notifier::new(),
@@ -7916,6 +7936,7 @@ where
                        per_peer_state: FairRwLock::new(per_peer_state),
 
                        pending_events: Mutex::new(pending_events_read),
+                       pending_events_processor: AtomicBool::new(false),
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
                        persistence_notifier: Notifier::new(),
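
With no guard held across `await` points, the futures built on the asynchronous
event-processing path can be `Send`, which is what allows the background-processor
tests above to replace `LocalSet::spawn_local` with plain `tokio::spawn`. A
minimal usage sketch along those lines (the `bp_future` parameter and its error
type are assumptions, not an exact LDK signature):

    use std::future::Future;

    // Hypothetical helper: drive a `Send` background-processor future on tokio's
    // multi-threaded runtime instead of pinning it to a `LocalSet`.
    async fn drive_background_processor<E: std::fmt::Debug + Send + 'static>(
        bp_future: impl Future<Output = Result<(), E>> + Send + 'static,
    ) {
        let handle = tokio::spawn(bp_future);
        // ... run the rest of the application here ...
        handle.await.expect("task panicked").expect("background processor errored");
    }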