use alloc::sync::Arc;
use core::mem;
-use sync::{Condvar, Mutex};
+use crate::sync::{Condvar, Mutex, MutexGuard};
-use prelude::*;
+use crate::prelude::*;
#[cfg(any(test, feature = "std"))]
use std::time::{Duration, Instant};
condvar: Condvar,
}
+macro_rules! check_woken {
+ ($guard: expr, $retval: expr) => { {
+ if $guard.0 {
+ $guard.0 = false;
+ if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) {
+ // If we're about to return as woken, and the future state is marked complete, wipe
+ // the future state and let the next future wait until we get a new notify.
+ $guard.1.take();
+ }
+ return $retval;
+ }
+ } }
+}
+
impl Notifier {
pub(crate) fn new() -> Self {
Self {
}
}
+ fn propagate_future_state_to_notify_flag(&self) -> MutexGuard<(bool, Option<Arc<Mutex<FutureState>>>)> {
+ let mut lock = self.notify_pending.lock().unwrap();
+ if let Some(existing_state) = &lock.1 {
+ if existing_state.lock().unwrap().callbacks_made {
+ // If the existing `FutureState` has completed and actually made callbacks,
+ // consider the notification flag to have been cleared and reset the future state.
+ lock.1.take();
+ lock.0 = false;
+ }
+ }
+ lock
+ }
+
pub(crate) fn wait(&self) {
loop {
- let mut guard = self.notify_pending.lock().unwrap();
- if guard.0 {
- guard.0 = false;
- return;
- }
+ let mut guard = self.propagate_future_state_to_notify_flag();
+ check_woken!(guard, ());
guard = self.condvar.wait(guard).unwrap();
- let result = guard.0;
- if result {
- guard.0 = false;
- return
- }
+ check_woken!(guard, ());
}
}
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
let current_time = Instant::now();
loop {
- let mut guard = self.notify_pending.lock().unwrap();
- if guard.0 {
- guard.0 = false;
- return true;
- }
+ let mut guard = self.propagate_future_state_to_notify_flag();
+ check_woken!(guard, true);
guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0;
+ check_woken!(guard, true);
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
// time. Note that this logic can be highly simplified through the use of
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
// 1.42.0.
let elapsed = current_time.elapsed();
- let result = guard.0;
- if result || elapsed >= max_wait {
- guard.0 = false;
- return result;
+ if elapsed >= max_wait {
+ return false;
}
match max_wait.checked_sub(elapsed) {
- None => return result,
+ None => return false,
Some(_) => continue
}
}
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
pub(crate) fn notify(&self) {
let mut lock = self.notify_pending.lock().unwrap();
- lock.0 = true;
- if let Some(future_state) = lock.1.take() {
+ if let Some(future_state) = &lock.1 {
future_state.lock().unwrap().complete();
}
+ lock.0 = true;
mem::drop(lock);
self.condvar.notify_all();
}
/// Gets a [`Future`] that will get woken up with any waiters
pub(crate) fn get_future(&self) -> Future {
- let mut lock = self.notify_pending.lock().unwrap();
- if lock.0 {
- Future {
- state: Arc::new(Mutex::new(FutureState {
- callbacks: Vec::new(),
- complete: false,
- }))
- }
- } else if let Some(existing_state) = &lock.1 {
+ let mut lock = self.propagate_future_state_to_notify_flag();
+ if let Some(existing_state) = &lock.1 {
Future { state: Arc::clone(&existing_state) }
} else {
let state = Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
- complete: false,
+ complete: lock.0,
+ callbacks_made: false,
}));
lock.1 = Some(Arc::clone(&state));
Future { state }
}
pub(crate) struct FutureState {
- callbacks: Vec<Box<dyn FutureCallback>>,
+ // When we're tracking whether a callback counts as having woken the user's code, we check the
+ // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
+ // user-provided function.
+ callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
complete: bool,
+ callbacks_made: bool,
}
impl FutureState {
fn complete(&mut self) {
- for callback in self.callbacks.drain(..) {
+ for (counts_as_call, callback) in self.callbacks.drain(..) {
callback.call();
+ self.callbacks_made |= counts_as_call;
}
self.complete = true;
}
pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
let mut state = self.state.lock().unwrap();
if state.complete {
+ state.callbacks_made = true;
mem::drop(state);
callback.call();
} else {
- state.callbacks.push(callback);
+ state.callbacks.push((true, callback));
}
}
}
}
-mod std_future {
- use core::task::Waker;
- pub struct StdWaker(pub Waker);
- impl super::FutureCallback for StdWaker {
- fn call(&self) { self.0.wake_by_ref() }
- }
+use core::task::Waker;
+struct StdWaker(pub Waker);
+impl FutureCallback for StdWaker {
+ fn call(&self) { self.0.wake_by_ref() }
}
/// (C-not exported) as Rust Futures aren't usable in language bindings.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.state.lock().unwrap();
if state.complete {
+ state.callbacks_made = true;
Poll::Ready(())
} else {
let waker = cx.waker().clone();
- state.callbacks.push(Box::new(std_future::StdWaker(waker)));
+ state.callbacks.push((false, Box::new(StdWaker(waker))));
Poll::Pending
}
}
use core::future::Future as FutureTrait;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+ #[test]
+ fn notifier_pre_notified_future() {
+ // Previously, if we generated a future after a `Notifier` had been notified, the future
+ // would never complete. This tests this behavior, ensuring the future instead completes
+ // immediately.
+ let notifier = Notifier::new();
+ notifier.notify();
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(callback.load(Ordering::SeqCst));
+ }
+
+ #[test]
+ fn notifier_future_completes_wake() {
+ // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
+ // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
+ // `lightning-background-processor` to persist in a tight loop.
+ let notifier = Notifier::new();
+
+ // First check the simple case, ensuring if we get notified a new future isn't woken until
+ // a second `notify`.
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(!callback.load(Ordering::SeqCst));
+
+ notifier.notify();
+ assert!(callback.load(Ordering::SeqCst));
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(!callback.load(Ordering::SeqCst));
+
+ notifier.notify();
+ assert!(callback.load(Ordering::SeqCst));
+
+ // Then check the case where the future is fetched before the notification, but a callback
+ // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
+ // don't get an instant-wake when we get a new future.
+ let future = notifier.get_future();
+ notifier.notify();
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(callback.load(Ordering::SeqCst));
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(!callback.load(Ordering::SeqCst));
+ }
+
+ #[test]
+ fn new_future_wipes_notify_bit() {
+ // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
+ // been notified, we'd never mark the notifier as not-awaiting-notify if a `Future` is
+ // fetched after the notify bit has been set.
+ let notifier = Notifier::new();
+ notifier.notify();
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(callback.load(Ordering::SeqCst));
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(!callback.load(Ordering::SeqCst));
+
+ notifier.notify();
+ assert!(callback.load(Ordering::SeqCst));
+ }
+
#[cfg(feature = "std")]
#[test]
fn test_wait_timeout() {
- use sync::Arc;
+ use crate::sync::Arc;
use std::thread;
let persistence_notifier = Arc::new(Notifier::new());
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
complete: false,
+ callbacks_made: false,
}))
};
let callback = Arc::new(AtomicBool::new(false));
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
complete: false,
+ callbacks_made: false,
}))
};
future.state.lock().unwrap().complete();
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
complete: false,
+ callbacks_made: false,
}))
};
let mut second_future = Future { state: Arc::clone(&future.state) };
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
}
+
+ #[test]
+ fn test_dropped_future_doesnt_count() {
+ // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as
+ // having been woken, leaving the notify-required flag set.
+ let notifier = Notifier::new();
+ notifier.notify();
+
+ // If we get a future and don't touch it we're definitely still notify-required.
+ notifier.get_future();
+ assert!(notifier.wait_timeout(Duration::from_millis(1)));
+ assert!(!notifier.wait_timeout(Duration::from_millis(1)));
+
+ // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required.
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+
+ notifier.notify();
+ assert!(woken.load(Ordering::SeqCst));
+ assert!(notifier.wait_timeout(Duration::from_millis(1)));
+
+ // However, once we do poll `Ready` it should wipe the notify-required flag.
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+
+ notifier.notify();
+ assert!(woken.load(Ordering::SeqCst));
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ assert!(!notifier.wait_timeout(Duration::from_millis(1)));
+ }
}