X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;fp=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=3b83726958b3a655b681f9a821d114b5a97e7c8a;hb=8c6cb9953a3b00ce3da25fbdfd8ada0ec48fc63f;hp=4545cff51b2c721f588c3d89e9d2afd3814daa8e;hpb=ee2f1a929ed427c5b2f963a3d13da2fec1a1cc24;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 4545cff5..3b837269 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -54,6 +54,8 @@ use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner, Rec use util::config::{UserConfig, ChannelConfig}; use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; use util::{byte_utils, events}; +use util::crypto::sign; +use util::wakers::{Future, Notifier}; use util::scid_utils::fake_scid; use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter}; use util::logger::{Level, Logger}; @@ -64,15 +66,11 @@ use prelude::*; use core::{cmp, mem}; use core::cell::RefCell; use io::Read; -use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; +use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; use core::ops::Deref; -#[cfg(any(test, feature = "std"))] -use std::time::Instant; -use util::crypto::sign; - // We hold various information about HTLC relay in the HTLC objects in Channel itself: // // Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should @@ -792,10 +790,10 @@ pub struct ChannelManager, - persistence_notifier: PersistenceNotifier, + persistence_notifier: Notifier, keys_manager: K, @@ -835,18 +833,18 @@ enum NotifyOption { /// notify or not based on whether relevant changes have been made, providing a closure to /// `optionally_notify` which returns a `NotifyOption`. struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { - persistence_notifier: &'a PersistenceNotifier, + persistence_notifier: &'a Notifier, should_persist: F, // We hold onto this result so the lock doesn't get released immediately. _read_guard: RwLockReadGuard<'a, ()>, } impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused - fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a Notifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist }) } - fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { + fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { let read_guard = lock.read().unwrap(); PersistenceNotifierGuard { @@ -1627,7 +1625,7 @@ impl ChannelMana pending_events: Mutex::new(Vec::new()), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), - persistence_notifier: PersistenceNotifier::new(), + persistence_notifier: Notifier::new(), keys_manager, @@ -5991,12 +5989,16 @@ where self.persistence_notifier.wait() } + /// Gets a [`Future`] that completes when a persistable update is available. Note that + /// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and + /// should instead register actions to be taken later. + pub fn get_persistable_update_future(&self) -> Future { + self.persistence_notifier.get_future() + } + #[cfg(any(test, feature = "_test_utils"))] pub fn get_persistence_condvar_value(&self) -> bool { - let mutcond = &self.persistence_notifier.persistence_lock; - let &(ref mtx, _) = mutcond; - let guard = mtx.lock().unwrap(); - *guard + self.persistence_notifier.notify_pending() } /// Gets the latest best block which was connected either via the [`chain::Listen`] or @@ -6238,77 +6240,6 @@ impl } } -/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to -/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`. -struct PersistenceNotifier { - /// Users won't access the persistence_lock directly, but rather wait on its bool using - /// `wait_timeout` and `wait`. - persistence_lock: (Mutex, Condvar), -} - -impl PersistenceNotifier { - fn new() -> Self { - Self { - persistence_lock: (Mutex::new(false), Condvar::new()), - } - } - - fn wait(&self) { - loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; - return; - } - guard = cvar.wait(guard).unwrap(); - let result = *guard; - if result { - *guard = false; - return - } - } - } - - #[cfg(any(test, feature = "std"))] - fn wait_timeout(&self, max_wait: Duration) -> bool { - let current_time = Instant::now(); - loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; - return true; - } - guard = cvar.wait_timeout(guard, max_wait).unwrap().0; - // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the - // desired wait time has actually passed, and if not then restart the loop with a reduced wait - // time. Note that this logic can be highly simplified through the use of - // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to - // 1.42.0. - let elapsed = current_time.elapsed(); - let result = *guard; - if result || elapsed >= max_wait { - *guard = false; - return result; - } - match max_wait.checked_sub(elapsed) { - None => return result, - Some(_) => continue - } - } - } - - // Signal to the ChannelManager persister that there are updates necessitating persisting to disk. - fn notify(&self) { - let &(ref persist_mtx, ref cnd) = &self.persistence_lock; - let mut persistence_lock = persist_mtx.lock().unwrap(); - *persistence_lock = true; - mem::drop(persistence_lock); - cnd.notify_all(); - } -} - const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; @@ -7317,7 +7248,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> pending_events: Mutex::new(pending_events_read), pending_background_events: Mutex::new(pending_background_events_read), total_consistency_lock: RwLock::new(()), - persistence_notifier: PersistenceNotifier::new(), + persistence_notifier: Notifier::new(), keys_manager: args.keys_manager, logger: args.logger, @@ -7356,54 +7287,6 @@ mod tests { use util::test_utils; use chain::keysinterface::KeysInterface; - #[cfg(feature = "std")] - #[test] - fn test_wait_timeout() { - use ln::channelmanager::PersistenceNotifier; - use sync::Arc; - use core::sync::atomic::AtomicBool; - use std::thread; - - let persistence_notifier = Arc::new(PersistenceNotifier::new()); - let thread_notifier = Arc::clone(&persistence_notifier); - - let exit_thread = Arc::new(AtomicBool::new(false)); - let exit_thread_clone = exit_thread.clone(); - thread::spawn(move || { - loop { - let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock; - let mut persistence_lock = persist_mtx.lock().unwrap(); - *persistence_lock = true; - cnd.notify_all(); - - if exit_thread_clone.load(Ordering::SeqCst) { - break - } - } - }); - - // Check that we can block indefinitely until updates are available. - let _ = persistence_notifier.wait(); - - // Check that the PersistenceNotifier will return after the given duration if updates are - // available. - loop { - if persistence_notifier.wait_timeout(Duration::from_millis(100)) { - break - } - } - - exit_thread.store(true, Ordering::SeqCst); - - // Check that the PersistenceNotifier will return after the given duration even if no updates - // are available. - loop { - if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { - break - } - } - } - #[test] fn test_notify_limits() { // Check that a few cases which don't require the persistence of a new ChannelManager,