Move PersistenceNotifier to a new util module
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 692d46eda90e8937ec289eed73f47fff6c8d5c8c..577c3d82cfe2ef82167ed97367e0fa2ffdab1cc3 100644 (file)
@@ -54,6 +54,8 @@ use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner, Rec
 use util::config::{UserConfig, ChannelConfig};
 use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 use util::{byte_utils, events};
+use util::crypto::sign;
+use util::wakers::PersistenceNotifier;
 use util::scid_utils::fake_scid;
 use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
 use util::logger::{Level, Logger};
@@ -64,15 +66,11 @@ use prelude::*;
 use core::{cmp, mem};
 use core::cell::RefCell;
 use io::Read;
-use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
+use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
-#[cfg(any(test, feature = "std"))]
-use std::time::Instant;
-use util::crypto::sign;
-
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
 //
 // Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should
@@ -5992,10 +5990,7 @@ where
 
        #[cfg(any(test, feature = "_test_utils"))]
        pub fn get_persistence_condvar_value(&self) -> bool {
-               let mutcond = &self.persistence_notifier.persistence_lock;
-               let &(ref mtx, _) = mutcond;
-               let guard = mtx.lock().unwrap();
-               *guard
+               self.persistence_notifier.needs_persist()
        }
 
        /// Gets the latest best block which was connected either via the [`chain::Listen`] or
@@ -6237,77 +6232,6 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
        }
 }
 
-/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
-/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
-struct PersistenceNotifier {
-       /// Users won't access the persistence_lock directly, but rather wait on its bool using
-       /// `wait_timeout` and `wait`.
-       persistence_lock: (Mutex<bool>, Condvar),
-}
-
-impl PersistenceNotifier {
-       fn new() -> Self {
-               Self {
-                       persistence_lock: (Mutex::new(false), Condvar::new()),
-               }
-       }
-
-       fn wait(&self) {
-               loop {
-                       let &(ref mtx, ref cvar) = &self.persistence_lock;
-                       let mut guard = mtx.lock().unwrap();
-                       if *guard {
-                               *guard = false;
-                               return;
-                       }
-                       guard = cvar.wait(guard).unwrap();
-                       let result = *guard;
-                       if result {
-                               *guard = false;
-                               return
-                       }
-               }
-       }
-
-       #[cfg(any(test, feature = "std"))]
-       fn wait_timeout(&self, max_wait: Duration) -> bool {
-               let current_time = Instant::now();
-               loop {
-                       let &(ref mtx, ref cvar) = &self.persistence_lock;
-                       let mut guard = mtx.lock().unwrap();
-                       if *guard {
-                               *guard = false;
-                               return true;
-                       }
-                       guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
-                       // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
-                       // desired wait time has actually passed, and if not then restart the loop with a reduced wait
-                       // time. Note that this logic can be highly simplified through the use of
-                       // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
-                       // 1.42.0.
-                       let elapsed = current_time.elapsed();
-                       let result = *guard;
-                       if result || elapsed >= max_wait {
-                               *guard = false;
-                               return result;
-                       }
-                       match max_wait.checked_sub(elapsed) {
-                               None => return result,
-                               Some(_) => continue
-                       }
-               }
-       }
-
-       // Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
-       fn notify(&self) {
-               let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
-               let mut persistence_lock = persist_mtx.lock().unwrap();
-               *persistence_lock = true;
-               mem::drop(persistence_lock);
-               cnd.notify_all();
-       }
-}
-
 const SERIALIZATION_VERSION: u8 = 1;
 const MIN_SERIALIZATION_VERSION: u8 = 1;
 
@@ -7355,54 +7279,6 @@ mod tests {
        use util::test_utils;
        use chain::keysinterface::KeysInterface;
 
-       #[cfg(feature = "std")]
-       #[test]
-       fn test_wait_timeout() {
-               use ln::channelmanager::PersistenceNotifier;
-               use sync::Arc;
-               use core::sync::atomic::AtomicBool;
-               use std::thread;
-
-               let persistence_notifier = Arc::new(PersistenceNotifier::new());
-               let thread_notifier = Arc::clone(&persistence_notifier);
-
-               let exit_thread = Arc::new(AtomicBool::new(false));
-               let exit_thread_clone = exit_thread.clone();
-               thread::spawn(move || {
-                       loop {
-                               let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
-                               let mut persistence_lock = persist_mtx.lock().unwrap();
-                               *persistence_lock = true;
-                               cnd.notify_all();
-
-                               if exit_thread_clone.load(Ordering::SeqCst) {
-                                       break
-                               }
-                       }
-               });
-
-               // Check that we can block indefinitely until updates are available.
-               let _ = persistence_notifier.wait();
-
-               // Check that the PersistenceNotifier will return after the given duration if updates are
-               // available.
-               loop {
-                       if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
-                               break
-                       }
-               }
-
-               exit_thread.store(true, Ordering::SeqCst);
-
-               // Check that the PersistenceNotifier will return after the given duration even if no updates
-               // are available.
-               loop {
-                       if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
-                               break
-                       }
-               }
-       }
-
        #[test]
        fn test_notify_limits() {
                // Check that a few cases which don't require the persistence of a new ChannelManager,