From 68b3d2e4536c8057389c3cb56636e332f5beccb2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Aug 2022 01:57:09 +0000 Subject: [PATCH] Move PersistenceNotifier to a new util module It was always somewhat strange to have a bunch of notification logic in `channelmanager`, and with the next commit adding a bunch more, its moved here first. --- lightning/src/ln/channelmanager.rs | 132 +------------------------ lightning/src/util/mod.rs | 1 + lightning/src/util/wakers.rs | 150 +++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 128 deletions(-) create mode 100644 lightning/src/util/wakers.rs diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 692d46ed..577c3d82 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -54,6 +54,8 @@ use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner, Rec use util::config::{UserConfig, ChannelConfig}; use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; use util::{byte_utils, events}; +use util::crypto::sign; +use util::wakers::PersistenceNotifier; use util::scid_utils::fake_scid; use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter}; use util::logger::{Level, Logger}; @@ -64,15 +66,11 @@ use prelude::*; use core::{cmp, mem}; use core::cell::RefCell; use io::Read; -use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; +use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; use core::ops::Deref; -#[cfg(any(test, feature = "std"))] -use std::time::Instant; -use util::crypto::sign; - // We hold various information about HTLC relay in the HTLC objects in Channel itself: // // Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should @@ -5992,10 +5990,7 @@ where #[cfg(any(test, feature = "_test_utils"))] pub fn get_persistence_condvar_value(&self) -> bool { - let mutcond = &self.persistence_notifier.persistence_lock; - let &(ref mtx, _) = mutcond; - let guard = mtx.lock().unwrap(); - *guard + self.persistence_notifier.needs_persist() } /// Gets the latest best block which was connected either via the [`chain::Listen`] or @@ -6237,77 +6232,6 @@ impl } } -/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to -/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`. -struct PersistenceNotifier { - /// Users won't access the persistence_lock directly, but rather wait on its bool using - /// `wait_timeout` and `wait`. - persistence_lock: (Mutex, Condvar), -} - -impl PersistenceNotifier { - fn new() -> Self { - Self { - persistence_lock: (Mutex::new(false), Condvar::new()), - } - } - - fn wait(&self) { - loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; - return; - } - guard = cvar.wait(guard).unwrap(); - let result = *guard; - if result { - *guard = false; - return - } - } - } - - #[cfg(any(test, feature = "std"))] - fn wait_timeout(&self, max_wait: Duration) -> bool { - let current_time = Instant::now(); - loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; - return true; - } - guard = cvar.wait_timeout(guard, max_wait).unwrap().0; - // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the - // desired wait time has actually passed, and if not then restart the loop with a reduced wait - // time. Note that this logic can be highly simplified through the use of - // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to - // 1.42.0. - let elapsed = current_time.elapsed(); - let result = *guard; - if result || elapsed >= max_wait { - *guard = false; - return result; - } - match max_wait.checked_sub(elapsed) { - None => return result, - Some(_) => continue - } - } - } - - // Signal to the ChannelManager persister that there are updates necessitating persisting to disk. - fn notify(&self) { - let &(ref persist_mtx, ref cnd) = &self.persistence_lock; - let mut persistence_lock = persist_mtx.lock().unwrap(); - *persistence_lock = true; - mem::drop(persistence_lock); - cnd.notify_all(); - } -} - const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; @@ -7355,54 +7279,6 @@ mod tests { use util::test_utils; use chain::keysinterface::KeysInterface; - #[cfg(feature = "std")] - #[test] - fn test_wait_timeout() { - use ln::channelmanager::PersistenceNotifier; - use sync::Arc; - use core::sync::atomic::AtomicBool; - use std::thread; - - let persistence_notifier = Arc::new(PersistenceNotifier::new()); - let thread_notifier = Arc::clone(&persistence_notifier); - - let exit_thread = Arc::new(AtomicBool::new(false)); - let exit_thread_clone = exit_thread.clone(); - thread::spawn(move || { - loop { - let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock; - let mut persistence_lock = persist_mtx.lock().unwrap(); - *persistence_lock = true; - cnd.notify_all(); - - if exit_thread_clone.load(Ordering::SeqCst) { - break - } - } - }); - - // Check that we can block indefinitely until updates are available. - let _ = persistence_notifier.wait(); - - // Check that the PersistenceNotifier will return after the given duration if updates are - // available. - loop { - if persistence_notifier.wait_timeout(Duration::from_millis(100)) { - break - } - } - - exit_thread.store(true, Ordering::SeqCst); - - // Check that the PersistenceNotifier will return after the given duration even if no updates - // are available. - loop { - if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { - break - } - } - } - #[test] fn test_notify_limits() { // Check that a few cases which don't require the persistence of a new ChannelManager, diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index c6181ab2..21976113 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -21,6 +21,7 @@ pub mod ser; pub mod message_signing; pub mod invoice; pub mod persist; +pub mod wakers; pub(crate) mod atomic_counter; pub(crate) mod byte_utils; diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs new file mode 100644 index 00000000..ba218e4e --- /dev/null +++ b/lightning/src/util/wakers.rs @@ -0,0 +1,150 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! Utilities which allow users to block on some future notification from LDK. These are +//! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to +//! be re-persisted. +//! +//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + +use core::mem; +use core::time::Duration; +use sync::{Condvar, Mutex}; + +#[cfg(any(test, feature = "std"))] +use std::time::Instant; + +/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to +/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`. +pub(crate) struct PersistenceNotifier { + /// Users won't access the persistence_lock directly, but rather wait on its bool using + /// `wait_timeout` and `wait`. + persistence_lock: (Mutex, Condvar), +} + +impl PersistenceNotifier { + pub(crate) fn new() -> Self { + Self { + persistence_lock: (Mutex::new(false), Condvar::new()), + } + } + + pub(crate) fn wait(&self) { + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + if *guard { + *guard = false; + return; + } + guard = cvar.wait(guard).unwrap(); + let result = *guard; + if result { + *guard = false; + return + } + } + } + + #[cfg(any(test, feature = "std"))] + pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool { + let current_time = Instant::now(); + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + if *guard { + *guard = false; + return true; + } + guard = cvar.wait_timeout(guard, max_wait).unwrap().0; + // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the + // desired wait time has actually passed, and if not then restart the loop with a reduced wait + // time. Note that this logic can be highly simplified through the use of + // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to + // 1.42.0. + let elapsed = current_time.elapsed(); + let result = *guard; + if result || elapsed >= max_wait { + *guard = false; + return result; + } + match max_wait.checked_sub(elapsed) { + None => return result, + Some(_) => continue + } + } + } + + /// Wake waiters, tracking that persistence needs to occur. + pub(crate) fn notify(&self) { + let &(ref persist_mtx, ref cnd) = &self.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + mem::drop(persistence_lock); + cnd.notify_all(); + } + + #[cfg(any(test, feature = "_test_utils"))] + pub fn needs_persist(&self) -> bool { + let &(ref mtx, _) = &self.persistence_lock; + let guard = mtx.lock().unwrap(); + *guard + } +} + +#[cfg(test)] +mod tests { + #[cfg(feature = "std")] + #[test] + fn test_wait_timeout() { + use super::*; + use sync::Arc; + use core::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + + let persistence_notifier = Arc::new(PersistenceNotifier::new()); + let thread_notifier = Arc::clone(&persistence_notifier); + + let exit_thread = Arc::new(AtomicBool::new(false)); + let exit_thread_clone = exit_thread.clone(); + thread::spawn(move || { + loop { + let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + cnd.notify_all(); + + if exit_thread_clone.load(Ordering::SeqCst) { + break + } + } + }); + + // Check that we can block indefinitely until updates are available. + let _ = persistence_notifier.wait(); + + // Check that the PersistenceNotifier will return after the given duration if updates are + // available. + loop { + if persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + + exit_thread.store(true, Ordering::SeqCst); + + // Check that the PersistenceNotifier will return after the given duration even if no updates + // are available. + loop { + if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + } +} -- 2.30.2