X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Futil%2Fwakers.rs;h=f86fc376cee0202323b9924057157aaf8379f86a;hb=6ddf69c93b1c3e418251ed7a898efd943e47bc30;hp=166771948fad62bc2f10078b5360fb67207f50f5;hpb=1beb3bb4217d39df7ac1729eca2a188cae1ca958;p=rust-lightning diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 16677194..f86fc376 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -15,9 +15,9 @@ use alloc::sync::Arc; use core::mem; -use sync::{Condvar, Mutex}; +use crate::sync::{Condvar, Mutex, MutexGuard}; -use prelude::*; +use crate::prelude::*; #[cfg(any(test, feature = "std"))] use std::time::{Duration, Instant}; @@ -33,6 +33,20 @@ pub(crate) struct Notifier { condvar: Condvar, } +macro_rules! check_woken { + ($guard: expr, $retval: expr) => { { + if $guard.0 { + $guard.0 = false; + if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) { + // If we're about to return as woken, and the future state is marked complete, wipe + // the future state and let the next future wait until we get a new notify. + $guard.1.take(); + } + return $retval; + } + } } +} + impl Notifier { pub(crate) fn new() -> Self { Self { @@ -41,19 +55,25 @@ impl Notifier { } } + fn propagate_future_state_to_notify_flag(&self) -> MutexGuard<(bool, Option>>)> { + let mut lock = self.notify_pending.lock().unwrap(); + if let Some(existing_state) = &lock.1 { + if existing_state.lock().unwrap().callbacks_made { + // If the existing `FutureState` has completed and actually made callbacks, + // consider the notification flag to have been cleared and reset the future state. + lock.1.take(); + lock.0 = false; + } + } + lock + } + pub(crate) fn wait(&self) { loop { - let mut guard = self.notify_pending.lock().unwrap(); - if guard.0 { - guard.0 = false; - return; - } + let mut guard = self.propagate_future_state_to_notify_flag(); + check_woken!(guard, ()); guard = self.condvar.wait(guard).unwrap(); - let result = guard.0; - if result { - guard.0 = false; - return - } + check_woken!(guard, ()); } } @@ -61,25 +81,21 @@ impl Notifier { pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool { let current_time = Instant::now(); loop { - let mut guard = self.notify_pending.lock().unwrap(); - if guard.0 { - guard.0 = false; - return true; - } + let mut guard = self.propagate_future_state_to_notify_flag(); + check_woken!(guard, true); guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0; + check_woken!(guard, true); // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the // desired wait time has actually passed, and if not then restart the loop with a reduced wait // time. Note that this logic can be highly simplified through the use of // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to // 1.42.0. let elapsed = current_time.elapsed(); - let result = guard.0; - if result || elapsed >= max_wait { - guard.0 = false; - return result; + if elapsed >= max_wait { + return false; } match max_wait.checked_sub(elapsed) { - None => return result, + None => return false, Some(_) => continue } } @@ -88,30 +104,27 @@ impl Notifier { /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters. pub(crate) fn notify(&self) { let mut lock = self.notify_pending.lock().unwrap(); - lock.0 = true; - if let Some(future_state) = lock.1.take() { - future_state.lock().unwrap().complete(); + if let Some(future_state) = &lock.1 { + if future_state.lock().unwrap().complete() { + lock.1 = None; + return; + } } + lock.0 = true; mem::drop(lock); self.condvar.notify_all(); } /// Gets a [`Future`] that will get woken up with any waiters pub(crate) fn get_future(&self) -> Future { - let mut lock = self.notify_pending.lock().unwrap(); - if lock.0 { - Future { - state: Arc::new(Mutex::new(FutureState { - callbacks: Vec::new(), - complete: false, - })) - } - } else if let Some(existing_state) = &lock.1 { + let mut lock = self.propagate_future_state_to_notify_flag(); + if let Some(existing_state) = &lock.1 { Future { state: Arc::clone(&existing_state) } } else { let state = Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), - complete: false, + complete: lock.0, + callbacks_made: false, })); lock.1 = Some(Arc::clone(&state)); Future { state } @@ -142,16 +155,22 @@ impl FutureCallback for F { } pub(crate) struct FutureState { - callbacks: Vec>, + // When we're tracking whether a callback counts as having woken the user's code, we check the + // first bool - set to false if we're just calling a Waker, and true if we're calling an actual + // user-provided function. + callbacks: Vec<(bool, Box)>, complete: bool, + callbacks_made: bool, } impl FutureState { - fn complete(&mut self) { - for callback in self.callbacks.drain(..) { + fn complete(&mut self) -> bool { + for (counts_as_call, callback) in self.callbacks.drain(..) { callback.call(); + self.callbacks_made |= counts_as_call; } self.complete = true; + self.callbacks_made } } @@ -168,10 +187,11 @@ impl Future { pub fn register_callback(&self, callback: Box) { let mut state = self.state.lock().unwrap(); if state.complete { + state.callbacks_made = true; mem::drop(state); callback.call(); } else { - state.callbacks.push(callback); + state.callbacks.push((true, callback)); } } @@ -186,12 +206,10 @@ impl Future { } } -mod std_future { - use core::task::Waker; - pub struct StdWaker(pub Waker); - impl super::FutureCallback for StdWaker { - fn call(&self) { self.0.wake_by_ref() } - } +use core::task::Waker; +struct StdWaker(pub Waker); +impl FutureCallback for StdWaker { + fn call(&self) { self.0.wake_by_ref() } } /// (C-not exported) as Rust Futures aren't usable in language bindings. @@ -201,10 +219,11 @@ impl<'a> StdFuture for Future { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut state = self.state.lock().unwrap(); if state.complete { + state.callbacks_made = true; Poll::Ready(()) } else { let waker = cx.waker().clone(); - state.callbacks.push(Box::new(std_future::StdWaker(waker))); + state.callbacks.push((false, Box::new(StdWaker(waker)))); Poll::Pending } } @@ -217,10 +236,88 @@ mod tests { use core::future::Future as FutureTrait; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + #[test] + fn notifier_pre_notified_future() { + // Previously, if we generated a future after a `Notifier` had been notified, the future + // would never complete. This tests this behavior, ensuring the future instead completes + // immediately. + let notifier = Notifier::new(); + notifier.notify(); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(callback.load(Ordering::SeqCst)); + } + + #[test] + fn notifier_future_completes_wake() { + // Previously, if we were only using the `Future` interface to learn when a `Notifier` has + // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the + // `lightning-background-processor` to persist in a tight loop. + let notifier = Notifier::new(); + + // First check the simple case, ensuring if we get notified a new future isn't woken until + // a second `notify`. + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(!callback.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(callback.load(Ordering::SeqCst)); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(!callback.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(callback.load(Ordering::SeqCst)); + + // Then check the case where the future is fetched before the notification, but a callback + // is only registered after the `notify`, ensuring that it is still sufficient to ensure we + // don't get an instant-wake when we get a new future. + let future = notifier.get_future(); + notifier.notify(); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(callback.load(Ordering::SeqCst)); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(!callback.load(Ordering::SeqCst)); + } + + #[test] + fn new_future_wipes_notify_bit() { + // Previously, if we were only using the `Future` interface to learn when a `Notifier` has + // been notified, we'd never mark the notifier as not-awaiting-notify if a `Future` is + // fetched after the notify bit has been set. + let notifier = Notifier::new(); + notifier.notify(); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(callback.load(Ordering::SeqCst)); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(!callback.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(callback.load(Ordering::SeqCst)); + } + #[cfg(feature = "std")] #[test] fn test_wait_timeout() { - use sync::Arc; + use crate::sync::Arc; use std::thread; let persistence_notifier = Arc::new(Notifier::new()); @@ -268,6 +365,7 @@ mod tests { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), complete: false, + callbacks_made: false, })) }; let callback = Arc::new(AtomicBool::new(false)); @@ -286,6 +384,7 @@ mod tests { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), complete: false, + callbacks_made: false, })) }; future.state.lock().unwrap().complete(); @@ -323,6 +422,7 @@ mod tests { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), complete: false, + callbacks_made: false, })) }; let mut second_future = Future { state: Arc::clone(&future.state) }; @@ -341,4 +441,95 @@ mod tests { assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(())); } + + #[test] + fn test_dropped_future_doesnt_count() { + // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as + // having been woken, leaving the notify-required flag set. + let notifier = Notifier::new(); + notifier.notify(); + + // If we get a future and don't touch it we're definitely still notify-required. + notifier.get_future(); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + + // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + + // However, once we do poll `Ready` it should wipe the notify-required flag. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + } + + #[test] + fn test_poll_post_notify_completes() { + // Tests that if we have a future state that has completed, and we haven't yet requested a + // new future, if we get a notify prior to requesting that second future it is generated + // pre-completed. + let notifier = Notifier::new(); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + } + + #[test] + fn test_poll_post_notify_completes_initial_notified() { + // Identical to the previous test, but the first future completes via a wake rather than an + // immediate `Poll::Ready`. + let notifier = Notifier::new(); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + } }