+
+ #[cfg(feature = "std")]
+ #[test]
+ fn test_state_drops() {
+ // Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
+ // but after having been slept-on. This tests for that leak.
+ use crate::sync::Arc;
+ use std::thread;
+
+ let notifier_a = Arc::new(Notifier::new());
+ let notifier_b = Arc::new(Notifier::new());
+
+ let thread_notifier_a = Arc::clone(¬ifier_a);
+
+ let future_a = notifier_a.get_future();
+ let future_state_a = Arc::downgrade(&future_a.state);
+
+ let future_b = notifier_b.get_future();
+ let future_state_b = Arc::downgrade(&future_b.state);
+
+ let join_handle = thread::spawn(move || {
+ // Let the other thread get to the wait point, then notify it.
+ std::thread::sleep(Duration::from_millis(50));
+ thread_notifier_a.notify();
+ });
+
+ // Wait on the other thread to finish its sleep, note that the leak only happened if we
+ // actually have to sleep here, not if we immediately return.
+ Sleeper::from_two_futures(future_a, future_b).wait();
+
+ join_handle.join().unwrap();
+
+ // then drop the notifiers and make sure the future states are gone.
+ mem::drop(notifier_a);
+ mem::drop(notifier_b);
+
+ assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
+ }
+
+ #[test]
+ fn test_future_callbacks() {
+ let future = Future {
+ state: Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ callbacks_with_state: Vec::new(),
+ complete: false,
+ callbacks_made: false,
+ }))
+ };
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+
+ assert!(!callback.load(Ordering::SeqCst));
+ complete_future(&future.state);
+ assert!(callback.load(Ordering::SeqCst));
+ complete_future(&future.state);
+ }
+
+ #[test]
+ fn test_pre_completed_future_callbacks() {
+ let future = Future {
+ state: Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ callbacks_with_state: Vec::new(),
+ complete: false,
+ callbacks_made: false,
+ }))
+ };
+ complete_future(&future.state);
+
+ let callback = Arc::new(AtomicBool::new(false));
+ let callback_ref = Arc::clone(&callback);
+ future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+
+ assert!(callback.load(Ordering::SeqCst));
+ assert!(future.state.lock().unwrap().callbacks.is_empty());
+ }
+
+ // Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
+ // totally possible to construct from a trait implementation (though somewhat less effecient
+ // compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
+ // waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
+ const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
+ unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }
+ unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; let _freed = Box::from_raw(p); }
+ unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }
+ unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
+ let p = ptr as *const Arc<AtomicBool>;
+ RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
+ }
+
+ fn create_waker() -> (Arc<AtomicBool>, Waker) {
+ let a = Arc::new(AtomicBool::new(false));
+ let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
+ (a, waker)
+ }
+
+ #[test]
+ fn test_future() {
+ let mut future = Future {
+ state: Arc::new(Mutex::new(FutureState {
+ callbacks: Vec::new(),
+ callbacks_with_state: Vec::new(),
+ complete: false,
+ callbacks_made: false,
+ }))
+ };
+ let mut second_future = Future { state: Arc::clone(&future.state) };
+
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+ assert!(!woken.load(Ordering::SeqCst));
+
+ let (second_woken, second_waker) = create_waker();
+ assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
+ assert!(!second_woken.load(Ordering::SeqCst));
+
+ complete_future(&future.state);
+ assert!(woken.load(Ordering::SeqCst));
+ assert!(second_woken.load(Ordering::SeqCst));
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
+ }
+
+ #[test]
+ #[cfg(feature = "std")]
+ fn test_dropped_future_doesnt_count() {
+ // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as
+ // having been woken, leaving the notify-required flag set.
+ let notifier = Notifier::new();
+ notifier.notify();
+
+ // If we get a future and don't touch it we're definitely still notify-required.
+ notifier.get_future();
+ assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
+ assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
+
+ // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required.
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+
+ notifier.notify();
+ assert!(woken.load(Ordering::SeqCst));
+ assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
+
+ // However, once we do poll `Ready` it should wipe the notify-required flag.
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+
+ notifier.notify();
+ assert!(woken.load(Ordering::SeqCst));
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
+ }
+
+ #[test]
+ fn test_poll_post_notify_completes() {
+ // Tests that if we have a future state that has completed, and we haven't yet requested a
+ // new future, if we get a notify prior to requesting that second future it is generated
+ // pre-completed.
+ let notifier = Notifier::new();
+
+ notifier.notify();
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ assert!(!woken.load(Ordering::SeqCst));
+
+ notifier.notify();
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ assert!(!woken.load(Ordering::SeqCst));
+
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+ assert!(!woken.load(Ordering::SeqCst));
+
+ notifier.notify();
+ assert!(woken.load(Ordering::SeqCst));
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ }
+
+ #[test]
+ fn test_poll_post_notify_completes_initial_notified() {
+ // Identical to the previous test, but the first future completes via a wake rather than an
+ // immediate `Poll::Ready`.
+ let notifier = Notifier::new();
+
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+
+ notifier.notify();
+ assert!(woken.load(Ordering::SeqCst));
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+
+ notifier.notify();
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ assert!(!woken.load(Ordering::SeqCst));
+
+ let mut future = notifier.get_future();
+ let (woken, waker) = create_waker();
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
+ assert!(!woken.load(Ordering::SeqCst));
+
+ notifier.notify();
+ assert!(woken.load(Ordering::SeqCst));
+ assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
+ }
+
+ #[test]
+ #[cfg(feature = "std")]
+ fn test_multi_future_sleep() {
+ // Tests the `Sleeper` with multiple futures.
+ let notifier_a = Notifier::new();
+ let notifier_b = Notifier::new();
+
+ // Set both notifiers as woken without sleeping yet.
+ notifier_a.notify();
+ notifier_b.notify();
+ Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+
+ // One future has woken us up, but the other should still have a pending notification.
+ Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+
+ // However once we've slept twice, we should no longer have any pending notifications
+ assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
+ .wait_timeout(Duration::from_millis(10)));
+
+ // Test ordering somewhat more.
+ notifier_a.notify();
+ Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+ }
+
+ #[test]
+ #[cfg(feature = "std")]
+ fn sleeper_with_pending_callbacks() {
+ // This is similar to the above `test_multi_future_sleep` test, but in addition registers
+ // "normal" callbacks which will cause the futures to assume notification has occurred,
+ // rather than waiting for a woken sleeper.
+ let notifier_a = Notifier::new();
+ let notifier_b = Notifier::new();
+
+ // Set both notifiers as woken without sleeping yet.
+ notifier_a.notify();
+ notifier_b.notify();
+
+ // After sleeping one future (not guaranteed which one, however) will have its notification
+ // bit cleared.
+ Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+
+ // By registering a callback on the futures for both notifiers, one will complete
+ // immediately, but one will remain tied to the notifier, and will complete once the
+ // notifier is next woken, which will be considered the completion of the notification.
+ let callback_a = Arc::new(AtomicBool::new(false));
+ let callback_b = Arc::new(AtomicBool::new(false));
+ let callback_a_ref = Arc::clone(&callback_a);
+ let callback_b_ref = Arc::clone(&callback_b);
+ notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
+ notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
+ assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));
+
+ // If we now notify both notifiers again, the other callback will fire, completing the
+ // notification, and we'll be back to one pending notification.
+ notifier_a.notify();
+ notifier_b.notify();
+
+ assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
+ Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
+ assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
+ .wait_timeout(Duration::from_millis(10)));
+ }