Implement the ability to block on multiple futures at once
[rust-lightning] / lightning / src / util / wakers.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! Utilities which allow users to block on some future notification from LDK. These are
11 //! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to
12 //! be re-persisted.
13 //!
14 //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
15
16 use alloc::sync::Arc;
17 use core::mem;
18 use crate::sync::{Condvar, Mutex};
19
20 use crate::prelude::*;
21
22 #[cfg(any(test, feature = "std"))]
23 use std::time::Duration;
24
25 use core::future::Future as StdFuture;
26 use core::task::{Context, Poll};
27 use core::pin::Pin;
28
29
30 /// Used to signal to one of many waiters that the condition they're waiting on has happened.
31 pub(crate) struct Notifier {
32         notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
33 }
34
35 impl Notifier {
36         pub(crate) fn new() -> Self {
37                 Self {
38                         notify_pending: Mutex::new((false, None)),
39                 }
40         }
41
42         pub(crate) fn wait(&self) {
43                 Sleeper::from_single_future(self.get_future()).wait();
44         }
45
46         #[cfg(any(test, feature = "std"))]
47         pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
48                 Sleeper::from_single_future(self.get_future()).wait_timeout(max_wait)
49         }
50
51         /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
52         pub(crate) fn notify(&self) {
53                 let mut lock = self.notify_pending.lock().unwrap();
54                 if let Some(future_state) = &lock.1 {
55                         if future_state.lock().unwrap().complete() {
56                                 lock.1 = None;
57                                 return;
58                         }
59                 }
60                 lock.0 = true;
61         }
62
63         /// Gets a [`Future`] that will get woken up with any waiters
64         pub(crate) fn get_future(&self) -> Future {
65                 let mut lock = self.notify_pending.lock().unwrap();
66                 if let Some(existing_state) = &lock.1 {
67                         if existing_state.lock().unwrap().callbacks_made {
68                                 // If the existing `FutureState` has completed and actually made callbacks,
69                                 // consider the notification flag to have been cleared and reset the future state.
70                                 lock.1.take();
71                                 lock.0 = false;
72                         }
73                 }
74                 if let Some(existing_state) = &lock.1 {
75                         Future { state: Arc::clone(&existing_state) }
76                 } else {
77                         let state = Arc::new(Mutex::new(FutureState {
78                                 callbacks: Vec::new(),
79                                 complete: lock.0,
80                                 callbacks_made: false,
81                         }));
82                         lock.1 = Some(Arc::clone(&state));
83                         Future { state }
84                 }
85         }
86
87         #[cfg(any(test, feature = "_test_utils"))]
88         pub fn notify_pending(&self) -> bool {
89                 self.notify_pending.lock().unwrap().0
90         }
91 }
92
93 macro_rules! define_callback { ($($bounds: path),*) => {
94 /// A callback which is called when a [`Future`] completes.
95 ///
96 /// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be
97 /// taken later. Rust users should use the [`std::future::Future`] implementation for [`Future`]
98 /// instead.
99 ///
100 /// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule
101 /// futures when they receive a wake, rather than immediately executing them.
102 pub trait FutureCallback : $($bounds +)* {
103         /// The method which is called.
104         fn call(&self);
105 }
106
107 impl<F: Fn() $(+ $bounds)*> FutureCallback for F {
108         fn call(&self) { (self)(); }
109 }
110 } }
111
112 #[cfg(feature = "std")]
113 define_callback!(Send);
114 #[cfg(not(feature = "std"))]
115 define_callback!();
116
117 pub(crate) struct FutureState {
118         // When we're tracking whether a callback counts as having woken the user's code, we check the
119         // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
120         // user-provided function.
121         callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
122         complete: bool,
123         callbacks_made: bool,
124 }
125
126 impl FutureState {
127         fn complete(&mut self) -> bool {
128                 for (counts_as_call, callback) in self.callbacks.drain(..) {
129                         callback.call();
130                         self.callbacks_made |= counts_as_call;
131                 }
132                 self.complete = true;
133                 self.callbacks_made
134         }
135 }
136
137 /// A simple future which can complete once, and calls some callback(s) when it does so.
138 ///
139 /// Clones can be made and all futures cloned from the same source will complete at the same time.
140 #[derive(Clone)]
141 pub struct Future {
142         state: Arc<Mutex<FutureState>>,
143 }
144
145 impl Future {
146         /// Registers a callback to be called upon completion of this future. If the future has already
147         /// completed, the callback will be called immediately.
148         ///
149         /// This is not exported to bindings users, use the bindings-only `register_callback_fn` instead
150         pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
151                 let mut state = self.state.lock().unwrap();
152                 if state.complete {
153                         state.callbacks_made = true;
154                         mem::drop(state);
155                         callback.call();
156                 } else {
157                         state.callbacks.push((true, callback));
158                 }
159         }
160
161         // C bindings don't (currently) know how to map `Box<dyn Trait>`, and while it could add the
162         // following wrapper, doing it in the bindings is currently much more work than simply doing it
163         // here.
164         /// Registers a callback to be called upon completion of this future. If the future has already
165         /// completed, the callback will be called immediately.
166         #[cfg(c_bindings)]
167         pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
168                 self.register_callback(Box::new(callback));
169         }
170 }
171
172 use core::task::Waker;
173 struct StdWaker(pub Waker);
174 impl FutureCallback for StdWaker {
175         fn call(&self) { self.0.wake_by_ref() }
176 }
177
178 /// This is not exported to bindings users as Rust Futures aren't usable in language bindings.
179 impl<'a> StdFuture for Future {
180         type Output = ();
181
182         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
183                 let mut state = self.state.lock().unwrap();
184                 if state.complete {
185                         state.callbacks_made = true;
186                         Poll::Ready(())
187                 } else {
188                         let waker = cx.waker().clone();
189                         state.callbacks.push((false, Box::new(StdWaker(waker))));
190                         Poll::Pending
191                 }
192         }
193 }
194
195 /// A struct which can be used to select across many [`Future`]s at once without relying on a full
196 /// async context.
197 pub struct Sleeper {
198         notifiers: Vec<Arc<Mutex<FutureState>>>,
199 }
200
201 impl Sleeper {
202         /// Constructs a new sleeper from one future, allowing blocking on it.
203         pub fn from_single_future(future: Future) -> Self {
204                 Self { notifiers: vec![future.state] }
205         }
206         /// Constructs a new sleeper from two futures, allowing blocking on both at once.
207         // Note that this is the common case - a ChannelManager and ChainMonitor.
208         pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
209                 Self { notifiers: vec![fut_a.state, fut_b.state] }
210         }
211         /// Constructs a new sleeper on many futures, allowing blocking on all at once.
212         pub fn new(futures: Vec<Future>) -> Self {
213                 Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
214         }
215         /// Prepares to go into a wait loop body, creating a condition variable which we can block on
216         /// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
217         /// condition variable being woken.
218         fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
219                 let cv = Arc::new(Condvar::new());
220                 let notified_fut_mtx = Arc::new(Mutex::new(None));
221                 {
222                         for notifier_mtx in self.notifiers.iter() {
223                                 let cv_ref = Arc::clone(&cv);
224                                 let notified_fut_ref = Arc::clone(&notified_fut_mtx);
225                                 let notifier_ref = Arc::clone(&notifier_mtx);
226                                 let mut notifier = notifier_mtx.lock().unwrap();
227                                 if notifier.complete {
228                                         *notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
229                                         break;
230                                 }
231                                 notifier.callbacks.push((false, Box::new(move || {
232                                         *notified_fut_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
233                                         cv_ref.notify_all();
234                                 })));
235                         }
236                 }
237                 (cv, notified_fut_mtx)
238         }
239
240         /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
241         pub fn wait(&self) {
242                 let (cv, notified_fut_mtx) = self.setup_wait();
243                 let notified_fut = cv.wait_while(notified_fut_mtx.lock().unwrap(), |fut_opt| fut_opt.is_none())
244                         .unwrap().take().expect("CV wait shouldn't have returned until the notifying future was set");
245                 notified_fut.lock().unwrap().callbacks_made = true;
246         }
247
248         /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed or the
249         /// given amount of time has elapsed. Returns true if a [`Future`] completed, false if the time
250         /// elapsed.
251         #[cfg(any(test, feature = "std"))]
252         pub fn wait_timeout(&self, max_wait: Duration) -> bool {
253                 let (cv, notified_fut_mtx) = self.setup_wait();
254                 let notified_fut =
255                         match cv.wait_timeout_while(notified_fut_mtx.lock().unwrap(), max_wait, |fut_opt| fut_opt.is_none()) {
256                                 Ok((_, e)) if e.timed_out() => return false,
257                                 Ok((mut notified_fut, _)) =>
258                                         notified_fut.take().expect("CV wait shouldn't have returned until the notifying future was set"),
259                                 Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
260                         };
261                 notified_fut.lock().unwrap().callbacks_made = true;
262                 true
263         }
264 }
265
266 #[cfg(test)]
267 mod tests {
268         use super::*;
269         use core::sync::atomic::{AtomicBool, Ordering};
270         use core::future::Future as FutureTrait;
271         use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
272
273         #[test]
274         fn notifier_pre_notified_future() {
275                 // Previously, if we generated a future after a `Notifier` had been notified, the future
276                 // would never complete. This tests this behavior, ensuring the future instead completes
277                 // immediately.
278                 let notifier = Notifier::new();
279                 notifier.notify();
280
281                 let callback = Arc::new(AtomicBool::new(false));
282                 let callback_ref = Arc::clone(&callback);
283                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
284                 assert!(callback.load(Ordering::SeqCst));
285         }
286
287         #[test]
288         fn notifier_future_completes_wake() {
289                 // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
290                 // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
291                 // `lightning-background-processor` to persist in a tight loop.
292                 let notifier = Notifier::new();
293
294                 // First check the simple case, ensuring if we get notified a new future isn't woken until
295                 // a second `notify`.
296                 let callback = Arc::new(AtomicBool::new(false));
297                 let callback_ref = Arc::clone(&callback);
298                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
299                 assert!(!callback.load(Ordering::SeqCst));
300
301                 notifier.notify();
302                 assert!(callback.load(Ordering::SeqCst));
303
304                 let callback = Arc::new(AtomicBool::new(false));
305                 let callback_ref = Arc::clone(&callback);
306                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
307                 assert!(!callback.load(Ordering::SeqCst));
308
309                 notifier.notify();
310                 assert!(callback.load(Ordering::SeqCst));
311
312                 // Then check the case where the future is fetched before the notification, but a callback
313                 // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
314                 // don't get an instant-wake when we get a new future.
315                 let future = notifier.get_future();
316                 notifier.notify();
317
318                 let callback = Arc::new(AtomicBool::new(false));
319                 let callback_ref = Arc::clone(&callback);
320                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
321                 assert!(callback.load(Ordering::SeqCst));
322
323                 let callback = Arc::new(AtomicBool::new(false));
324                 let callback_ref = Arc::clone(&callback);
325                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
326                 assert!(!callback.load(Ordering::SeqCst));
327         }
328
329         #[test]
330         fn new_future_wipes_notify_bit() {
331                 // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
332                 // been notified, we'd never mark the notifier as not-awaiting-notify if a `Future` is
333                 // fetched after the notify bit has been set.
334                 let notifier = Notifier::new();
335                 notifier.notify();
336
337                 let callback = Arc::new(AtomicBool::new(false));
338                 let callback_ref = Arc::clone(&callback);
339                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
340                 assert!(callback.load(Ordering::SeqCst));
341
342                 let callback = Arc::new(AtomicBool::new(false));
343                 let callback_ref = Arc::clone(&callback);
344                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
345                 assert!(!callback.load(Ordering::SeqCst));
346
347                 notifier.notify();
348                 assert!(callback.load(Ordering::SeqCst));
349         }
350
351         #[cfg(feature = "std")]
352         #[test]
353         fn test_wait_timeout() {
354                 use crate::sync::Arc;
355                 use std::thread;
356
357                 let persistence_notifier = Arc::new(Notifier::new());
358                 let thread_notifier = Arc::clone(&persistence_notifier);
359
360                 let exit_thread = Arc::new(AtomicBool::new(false));
361                 let exit_thread_clone = exit_thread.clone();
362                 thread::spawn(move || {
363                         loop {
364                                 thread_notifier.notify();
365                                 if exit_thread_clone.load(Ordering::SeqCst) {
366                                         break
367                                 }
368                         }
369                 });
370
371                 // Check that we can block indefinitely until updates are available.
372                 let _ = persistence_notifier.wait();
373
374                 // Check that the Notifier will return after the given duration if updates are
375                 // available.
376                 loop {
377                         if persistence_notifier.wait_timeout(Duration::from_millis(100)) {
378                                 break
379                         }
380                 }
381
382                 exit_thread.store(true, Ordering::SeqCst);
383
384                 // Check that the Notifier will return after the given duration even if no updates
385                 // are available.
386                 loop {
387                         if !persistence_notifier.wait_timeout(Duration::from_millis(100)) {
388                                 break
389                         }
390                 }
391         }
392
393         #[test]
394         fn test_future_callbacks() {
395                 let future = Future {
396                         state: Arc::new(Mutex::new(FutureState {
397                                 callbacks: Vec::new(),
398                                 complete: false,
399                                 callbacks_made: false,
400                         }))
401                 };
402                 let callback = Arc::new(AtomicBool::new(false));
403                 let callback_ref = Arc::clone(&callback);
404                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
405
406                 assert!(!callback.load(Ordering::SeqCst));
407                 future.state.lock().unwrap().complete();
408                 assert!(callback.load(Ordering::SeqCst));
409                 future.state.lock().unwrap().complete();
410         }
411
412         #[test]
413         fn test_pre_completed_future_callbacks() {
414                 let future = Future {
415                         state: Arc::new(Mutex::new(FutureState {
416                                 callbacks: Vec::new(),
417                                 complete: false,
418                                 callbacks_made: false,
419                         }))
420                 };
421                 future.state.lock().unwrap().complete();
422
423                 let callback = Arc::new(AtomicBool::new(false));
424                 let callback_ref = Arc::clone(&callback);
425                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
426
427                 assert!(callback.load(Ordering::SeqCst));
428                 assert!(future.state.lock().unwrap().callbacks.is_empty());
429         }
430
431         // Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
432         // totally possible to construct from a trait implementation (though somewhat less effecient
433         // compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
434         // waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
435         const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
436         unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }
437         unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; let _freed = Box::from_raw(p); }
438         unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }
439         unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
440                 let p = ptr as *const Arc<AtomicBool>;
441                 RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
442         }
443
444         fn create_waker() -> (Arc<AtomicBool>, Waker) {
445                 let a = Arc::new(AtomicBool::new(false));
446                 let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
447                 (a, waker)
448         }
449
450         #[test]
451         fn test_future() {
452                 let mut future = Future {
453                         state: Arc::new(Mutex::new(FutureState {
454                                 callbacks: Vec::new(),
455                                 complete: false,
456                                 callbacks_made: false,
457                         }))
458                 };
459                 let mut second_future = Future { state: Arc::clone(&future.state) };
460
461                 let (woken, waker) = create_waker();
462                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
463                 assert!(!woken.load(Ordering::SeqCst));
464
465                 let (second_woken, second_waker) = create_waker();
466                 assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
467                 assert!(!second_woken.load(Ordering::SeqCst));
468
469                 future.state.lock().unwrap().complete();
470                 assert!(woken.load(Ordering::SeqCst));
471                 assert!(second_woken.load(Ordering::SeqCst));
472                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
473                 assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
474         }
475
476         #[test]
477         fn test_dropped_future_doesnt_count() {
478                 // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as
479                 // having been woken, leaving the notify-required flag set.
480                 let notifier = Notifier::new();
481                 notifier.notify();
482
483                 // If we get a future and don't touch it we're definitely still notify-required.
484                 notifier.get_future();
485                 assert!(notifier.wait_timeout(Duration::from_millis(1)));
486                 assert!(!notifier.wait_timeout(Duration::from_millis(1)));
487
488                 // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required.
489                 let mut future = notifier.get_future();
490                 let (woken, waker) = create_waker();
491                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
492
493                 notifier.notify();
494                 assert!(woken.load(Ordering::SeqCst));
495                 assert!(notifier.wait_timeout(Duration::from_millis(1)));
496
497                 // However, once we do poll `Ready` it should wipe the notify-required flag.
498                 let mut future = notifier.get_future();
499                 let (woken, waker) = create_waker();
500                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
501
502                 notifier.notify();
503                 assert!(woken.load(Ordering::SeqCst));
504                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
505                 assert!(!notifier.wait_timeout(Duration::from_millis(1)));
506         }
507
508         #[test]
509         fn test_poll_post_notify_completes() {
510                 // Tests that if we have a future state that has completed, and we haven't yet requested a
511                 // new future, if we get a notify prior to requesting that second future it is generated
512                 // pre-completed.
513                 let notifier = Notifier::new();
514
515                 notifier.notify();
516                 let mut future = notifier.get_future();
517                 let (woken, waker) = create_waker();
518                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
519                 assert!(!woken.load(Ordering::SeqCst));
520
521                 notifier.notify();
522                 let mut future = notifier.get_future();
523                 let (woken, waker) = create_waker();
524                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
525                 assert!(!woken.load(Ordering::SeqCst));
526
527                 let mut future = notifier.get_future();
528                 let (woken, waker) = create_waker();
529                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
530                 assert!(!woken.load(Ordering::SeqCst));
531
532                 notifier.notify();
533                 assert!(woken.load(Ordering::SeqCst));
534                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
535         }
536
537         #[test]
538         fn test_poll_post_notify_completes_initial_notified() {
539                 // Identical to the previous test, but the first future completes via a wake rather than an
540                 // immediate `Poll::Ready`.
541                 let notifier = Notifier::new();
542
543                 let mut future = notifier.get_future();
544                 let (woken, waker) = create_waker();
545                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
546
547                 notifier.notify();
548                 assert!(woken.load(Ordering::SeqCst));
549                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
550
551                 notifier.notify();
552                 let mut future = notifier.get_future();
553                 let (woken, waker) = create_waker();
554                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
555                 assert!(!woken.load(Ordering::SeqCst));
556
557                 let mut future = notifier.get_future();
558                 let (woken, waker) = create_waker();
559                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
560                 assert!(!woken.load(Ordering::SeqCst));
561
562                 notifier.notify();
563                 assert!(woken.load(Ordering::SeqCst));
564                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
565         }
566
567         #[test]
568         fn test_multi_future_sleep() {
569                 // Tests the `Sleeper` with multiple futures.
570                 let notifier_a = Notifier::new();
571                 let notifier_b = Notifier::new();
572
573                 // Set both notifiers as woken without sleeping yet.
574                 notifier_a.notify();
575                 notifier_b.notify();
576                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
577
578                 // One future has woken us up, but the other should still have a pending notification.
579                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
580
581                 // However once we've slept twice, we should no longer have any pending notifications
582                 assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
583                         .wait_timeout(Duration::from_millis(10)));
584
585                 // Test ordering somewhat more.
586                 notifier_a.notify();
587                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
588         }
589
590         #[test]
591         #[cfg(feature = "std")]
592         fn sleeper_with_pending_callbacks() {
593                 // This is similar to the above `test_multi_future_sleep` test, but in addition registers
594                 // "normal" callbacks which will cause the futures to assume notification has occurred,
595                 // rather than waiting for a woken sleeper.
596                 let notifier_a = Notifier::new();
597                 let notifier_b = Notifier::new();
598
599                 // Set both notifiers as woken without sleeping yet.
600                 notifier_a.notify();
601                 notifier_b.notify();
602
603                 // After sleeping one future (not guaranteed which one, however) will have its notification
604                 // bit cleared.
605                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
606
607                 // By registering a callback on the futures for both notifiers, one will complete
608                 // immediately, but one will remain tied to the notifier, and will complete once the
609                 // notifier is next woken, which will be considered the completion of the notification.
610                 let callback_a = Arc::new(AtomicBool::new(false));
611                 let callback_b = Arc::new(AtomicBool::new(false));
612                 let callback_a_ref = Arc::clone(&callback_a);
613                 let callback_b_ref = Arc::clone(&callback_b);
614                 notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
615                 notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
616                 assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));
617
618                 // If we now notify both notifiers again, the other callback will fire, completing the
619                 // notification, and we'll be back to one pending notification.
620                 notifier_a.notify();
621                 notifier_b.notify();
622
623                 assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
624                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
625                 assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
626                         .wait_timeout(Duration::from_millis(10)));
627         }
628 }