Move the pub `wait` methods from `ChannelManager` to `Future`
[rust-lightning] / lightning / src / util / wakers.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! Utilities which allow users to block on some future notification from LDK. These are
11 //! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to
12 //! be re-persisted.
13 //!
14 //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
15
16 use alloc::sync::Arc;
17 use core::mem;
18 use crate::sync::{Condvar, Mutex};
19
20 use crate::prelude::*;
21
22 #[cfg(any(test, feature = "std"))]
23 use std::time::Duration;
24
25 use core::future::Future as StdFuture;
26 use core::task::{Context, Poll};
27 use core::pin::Pin;
28
29
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
pub(crate) struct Notifier {
        // The `bool` is set by `notify` when no user-visible callback consumed the notification, and
        // is used by `get_future` to create new futures pre-completed. The `Option` holds the state
        // shared with the `Future`s most recently handed out by `get_future`, if any.
        notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
}
34
35 impl Notifier {
36         pub(crate) fn new() -> Self {
37                 Self {
38                         notify_pending: Mutex::new((false, None)),
39                 }
40         }
41
42         /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
43         pub(crate) fn notify(&self) {
44                 let mut lock = self.notify_pending.lock().unwrap();
45                 if let Some(future_state) = &lock.1 {
46                         if future_state.lock().unwrap().complete() {
47                                 lock.1 = None;
48                                 return;
49                         }
50                 }
51                 lock.0 = true;
52         }
53
54         /// Gets a [`Future`] that will get woken up with any waiters
55         pub(crate) fn get_future(&self) -> Future {
56                 let mut lock = self.notify_pending.lock().unwrap();
57                 if let Some(existing_state) = &lock.1 {
58                         if existing_state.lock().unwrap().callbacks_made {
59                                 // If the existing `FutureState` has completed and actually made callbacks,
60                                 // consider the notification flag to have been cleared and reset the future state.
61                                 lock.1.take();
62                                 lock.0 = false;
63                         }
64                 }
65                 if let Some(existing_state) = &lock.1 {
66                         Future { state: Arc::clone(&existing_state) }
67                 } else {
68                         let state = Arc::new(Mutex::new(FutureState {
69                                 callbacks: Vec::new(),
70                                 complete: lock.0,
71                                 callbacks_made: false,
72                         }));
73                         lock.1 = Some(Arc::clone(&state));
74                         Future { state }
75                 }
76         }
77
78         #[cfg(any(test, feature = "_test_utils"))]
79         pub fn notify_pending(&self) -> bool {
80                 self.notify_pending.lock().unwrap().0
81         }
82 }
83
// Generates the `FutureCallback` trait plus a blanket implementation of it for closures. Every
// path passed as `$bounds` is added both as a supertrait of `FutureCallback` and as a bound on
// the blanket impl. The macro is invoked below with `Send` when the `std` feature is enabled and
// with no bounds otherwise.
macro_rules! define_callback { ($($bounds: path),*) => {
/// A callback which is called when a [`Future`] completes.
///
/// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be
/// taken later. Rust users should use the [`std::future::Future`] implementation for [`Future`]
/// instead.
///
/// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule
/// futures when they receive a wake, rather than immediately executing them.
pub trait FutureCallback : $($bounds +)* {
        /// The method which is called.
        fn call(&self);
}

impl<F: Fn() $(+ $bounds)*> FutureCallback for F {
        fn call(&self) { (self)(); }
}
} }

#[cfg(feature = "std")]
define_callback!(Send);
#[cfg(not(feature = "std"))]
define_callback!();
107
/// The state shared between every [`Future`] cloned from the same source, the `Notifier` which
/// created it and any `Sleeper` built from those futures.
pub(crate) struct FutureState {
        // When we're tracking whether a callback counts as having woken the user's code, we check the
        // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
        // user-provided function.
        callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
        // Whether the future has completed. Set by `complete()`, or at construction when a
        // notification was already pending.
        complete: bool,
        // Whether the completion has been observed by the user: set when `complete()` runs a
        // callback flagged `true`, when a callback is registered on an already-complete future, when
        // `poll` returns `Ready`, or when a `Sleeper` wait returns because of this state.
        callbacks_made: bool,
}
116
117 impl FutureState {
118         fn complete(&mut self) -> bool {
119                 for (counts_as_call, callback) in self.callbacks.drain(..) {
120                         callback.call();
121                         self.callbacks_made |= counts_as_call;
122                 }
123                 self.complete = true;
124                 self.callbacks_made
125         }
126 }
127
/// A simple future which can complete once, and calls some callback(s) when it does so.
///
/// Clones can be made and all futures cloned from the same source will complete at the same time.
#[derive(Clone)]
pub struct Future {
        // Shared completion state; cloning the `Future` clones this `Arc`, so every clone observes
        // the same state.
        state: Arc<Mutex<FutureState>>,
}
135
136 impl Future {
137         /// Registers a callback to be called upon completion of this future. If the future has already
138         /// completed, the callback will be called immediately.
139         ///
140         /// This is not exported to bindings users, use the bindings-only `register_callback_fn` instead
141         pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
142                 let mut state = self.state.lock().unwrap();
143                 if state.complete {
144                         state.callbacks_made = true;
145                         mem::drop(state);
146                         callback.call();
147                 } else {
148                         state.callbacks.push((true, callback));
149                 }
150         }
151
152         // C bindings don't (currently) know how to map `Box<dyn Trait>`, and while it could add the
153         // following wrapper, doing it in the bindings is currently much more work than simply doing it
154         // here.
155         /// Registers a callback to be called upon completion of this future. If the future has already
156         /// completed, the callback will be called immediately.
157         #[cfg(c_bindings)]
158         pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
159                 self.register_callback(Box::new(callback));
160         }
161
162         /// Waits until this [`Future`] completes.
163         pub fn wait(self) {
164                 Sleeper::from_single_future(self).wait();
165         }
166
167         /// Waits until this [`Future`] completes or the given amount of time has elapsed.
168         ///
169         /// Returns true if the [`Future`] completed, false if the time elapsed.
170         #[cfg(any(test, feature = "std"))]
171         pub fn wait_timeout(self, max_wait: Duration) -> bool {
172                 Sleeper::from_single_future(self).wait_timeout(max_wait)
173         }
174 }
175
176 use core::task::Waker;
177 struct StdWaker(pub Waker);
178 impl FutureCallback for StdWaker {
179         fn call(&self) { self.0.wake_by_ref() }
180 }
181
182 /// This is not exported to bindings users as Rust Futures aren't usable in language bindings.
183 impl<'a> StdFuture for Future {
184         type Output = ();
185
186         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
187                 let mut state = self.state.lock().unwrap();
188                 if state.complete {
189                         state.callbacks_made = true;
190                         Poll::Ready(())
191                 } else {
192                         let waker = cx.waker().clone();
193                         state.callbacks.push((false, Box::new(StdWaker(waker))));
194                         Poll::Pending
195                 }
196         }
197 }
198
/// A struct which can be used to select across many [`Future`]s at once without relying on a full
/// async context.
pub struct Sleeper {
        // The shared states of the futures being waited on, in the order they were passed to the
        // constructor.
        notifiers: Vec<Arc<Mutex<FutureState>>>,
}
204
205 impl Sleeper {
206         /// Constructs a new sleeper from one future, allowing blocking on it.
207         pub fn from_single_future(future: Future) -> Self {
208                 Self { notifiers: vec![future.state] }
209         }
210         /// Constructs a new sleeper from two futures, allowing blocking on both at once.
211         // Note that this is the common case - a ChannelManager and ChainMonitor.
212         pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
213                 Self { notifiers: vec![fut_a.state, fut_b.state] }
214         }
215         /// Constructs a new sleeper on many futures, allowing blocking on all at once.
216         pub fn new(futures: Vec<Future>) -> Self {
217                 Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
218         }
219         /// Prepares to go into a wait loop body, creating a condition variable which we can block on
220         /// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
221         /// condition variable being woken.
222         fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
223                 let cv = Arc::new(Condvar::new());
224                 let notified_fut_mtx = Arc::new(Mutex::new(None));
225                 {
226                         for notifier_mtx in self.notifiers.iter() {
227                                 let cv_ref = Arc::clone(&cv);
228                                 let notified_fut_ref = Arc::clone(&notified_fut_mtx);
229                                 let notifier_ref = Arc::clone(&notifier_mtx);
230                                 let mut notifier = notifier_mtx.lock().unwrap();
231                                 if notifier.complete {
232                                         *notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
233                                         break;
234                                 }
235                                 notifier.callbacks.push((false, Box::new(move || {
236                                         *notified_fut_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
237                                         cv_ref.notify_all();
238                                 })));
239                         }
240                 }
241                 (cv, notified_fut_mtx)
242         }
243
244         /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
245         pub fn wait(&self) {
246                 let (cv, notified_fut_mtx) = self.setup_wait();
247                 let notified_fut = cv.wait_while(notified_fut_mtx.lock().unwrap(), |fut_opt| fut_opt.is_none())
248                         .unwrap().take().expect("CV wait shouldn't have returned until the notifying future was set");
249                 notified_fut.lock().unwrap().callbacks_made = true;
250         }
251
252         /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed or the
253         /// given amount of time has elapsed. Returns true if a [`Future`] completed, false if the time
254         /// elapsed.
255         #[cfg(any(test, feature = "std"))]
256         pub fn wait_timeout(&self, max_wait: Duration) -> bool {
257                 let (cv, notified_fut_mtx) = self.setup_wait();
258                 let notified_fut =
259                         match cv.wait_timeout_while(notified_fut_mtx.lock().unwrap(), max_wait, |fut_opt| fut_opt.is_none()) {
260                                 Ok((_, e)) if e.timed_out() => return false,
261                                 Ok((mut notified_fut, _)) =>
262                                         notified_fut.take().expect("CV wait shouldn't have returned until the notifying future was set"),
263                                 Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
264                         };
265                 notified_fut.lock().unwrap().callbacks_made = true;
266                 true
267         }
268 }
269
270 #[cfg(test)]
271 mod tests {
272         use super::*;
273         use core::sync::atomic::{AtomicBool, Ordering};
274         use core::future::Future as FutureTrait;
275         use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
276
        // Checks that a future obtained after `notify()` runs a newly registered callback at once.
        #[test]
        fn notifier_pre_notified_future() {
                // Previously, if we generated a future after a `Notifier` had been notified, the future
                // would never complete. This tests this behavior, ensuring the future instead completes
                // immediately.
                let notifier = Notifier::new();
                // No future exists yet, so this notification can only be recorded as pending.
                notifier.notify();

                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
                // The callback asserts that it has not run before, guarding against double invocation.
                notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
                assert!(callback.load(Ordering::SeqCst));
        }
290
        // Checks that a callback firing counts as delivering the notification, so the next future
        // handed out stays pending until a further `notify()`.
        #[test]
        fn notifier_future_completes_wake() {
                // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
                // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
                // `lightning-background-processor` to persist in a tight loop.
                let notifier = Notifier::new();

                // First check the simple case, ensuring if we get notified a new future isn't woken until
                // a second `notify`.
                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
                notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
                assert!(!callback.load(Ordering::SeqCst));

                notifier.notify();
                assert!(callback.load(Ordering::SeqCst));

                // Repeat with a fresh future: it must not start out complete, and must fire on the next
                // `notify`.
                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
                notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
                assert!(!callback.load(Ordering::SeqCst));

                notifier.notify();
                assert!(callback.load(Ordering::SeqCst));

                // Then check the case where the future is fetched before the notification, but a callback
                // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
                // don't get an instant-wake when we get a new future.
                let future = notifier.get_future();
                notifier.notify();

                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
                future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
                assert!(callback.load(Ordering::SeqCst));

                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
                notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
                assert!(!callback.load(Ordering::SeqCst));
        }
332
        // Checks that consuming a pre-existing notification through a callback clears it for the next
        // future.
        #[test]
        fn new_future_wipes_notify_bit() {
                // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
                // been notified, we'd never mark the notifier as not-awaiting-notify if a `Future` is
                // fetched after the notify bit has been set.
                let notifier = Notifier::new();
                notifier.notify();

                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
                notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
                assert!(callback.load(Ordering::SeqCst));

                // The callback above counted as a wake, so this second future must start out pending.
                let callback = Arc::new(AtomicBool::new(false));
                let callback_ref = Arc::clone(&callback);
                notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
                assert!(!callback.load(Ordering::SeqCst));

                notifier.notify();
                assert!(callback.load(Ordering::SeqCst));
        }
354
        // Exercises the blocking `wait` and `wait_timeout` methods against a notifier which is being
        // notified from another thread.
        #[cfg(feature = "std")]
        #[test]
        fn test_wait_timeout() {
                use crate::sync::Arc;
                use std::thread;

                let persistence_notifier = Arc::new(Notifier::new());
                let thread_notifier = Arc::clone(&persistence_notifier);

                let exit_thread = Arc::new(AtomicBool::new(false));
                let exit_thread_clone = exit_thread.clone();
                // Background thread which notifies in a tight loop until `exit_thread` is set. It is
                // not joined; it exits on its own once the flag is observed.
                thread::spawn(move || {
                        loop {
                                thread_notifier.notify();
                                if exit_thread_clone.load(Ordering::SeqCst) {
                                        break
                                }
                        }
                });

                // Check that we can block indefinitely until updates are available.
                let _ = persistence_notifier.get_future().wait();

                // Check that the Notifier will return after the given duration if updates are
                // available.
                loop {
                        if persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
                                break
                        }
                }

                exit_thread.store(true, Ordering::SeqCst);

                // Check that the Notifier will return after the given duration even if no updates
                // are available.
                loop {
                        if !persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
                                break
                        }
                }
        }
396
397         #[test]
398         fn test_future_callbacks() {
399                 let future = Future {
400                         state: Arc::new(Mutex::new(FutureState {
401                                 callbacks: Vec::new(),
402                                 complete: false,
403                                 callbacks_made: false,
404                         }))
405                 };
406                 let callback = Arc::new(AtomicBool::new(false));
407                 let callback_ref = Arc::clone(&callback);
408                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
409
410                 assert!(!callback.load(Ordering::SeqCst));
411                 future.state.lock().unwrap().complete();
412                 assert!(callback.load(Ordering::SeqCst));
413                 future.state.lock().unwrap().complete();
414         }
415
416         #[test]
417         fn test_pre_completed_future_callbacks() {
418                 let future = Future {
419                         state: Arc::new(Mutex::new(FutureState {
420                                 callbacks: Vec::new(),
421                                 complete: false,
422                                 callbacks_made: false,
423                         }))
424                 };
425                 future.state.lock().unwrap().complete();
426
427                 let callback = Arc::new(AtomicBool::new(false));
428                 let callback_ref = Arc::clone(&callback);
429                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
430
431                 assert!(callback.load(Ordering::SeqCst));
432                 assert!(future.state.lock().unwrap().callbacks.is_empty());
433         }
434
        // Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
        // totally possible to construct from a trait implementation (though somewhat less efficient
        // compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
        // waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
        //
        // Every function below treats its `ptr` argument as a pointer to an `Arc<AtomicBool>`.
        const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
        // Sets the flag, asserting that it was not already set (i.e. the waker is woken only once).
        unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }
        // Takes back ownership of the boxed `Arc` behind `ptr` and frees it.
        unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; let _freed = Box::from_raw(p); }
        // Consuming wake: sets the flag and then frees the boxed `Arc`.
        unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }
        // Boxes a fresh clone of the `Arc` behind `ptr` and uses it as the data pointer of a new
        // `RawWaker`. `ptr` itself is only read, never freed.
        unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
                let p = ptr as *const Arc<AtomicBool>;
                RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
        }
447
448         fn create_waker() -> (Arc<AtomicBool>, Waker) {
449                 let a = Arc::new(AtomicBool::new(false));
450                 let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
451                 (a, waker)
452         }
453
        // Checks the `poll` implementation: two futures sharing one state both return `Pending`, are
        // both woken when the state completes, and both return `Ready` afterwards.
        #[test]
        fn test_future() {
                let mut future = Future {
                        state: Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
                                complete: false,
                                callbacks_made: false,
                        }))
                };
                // Shares the same state as `future`.
                let mut second_future = Future { state: Arc::clone(&future.state) };

                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
                assert!(!woken.load(Ordering::SeqCst));

                let (second_woken, second_waker) = create_waker();
                assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
                assert!(!second_woken.load(Ordering::SeqCst));

                // Completing the shared state must invoke both registered wakers.
                future.state.lock().unwrap().complete();
                assert!(woken.load(Ordering::SeqCst));
                assert!(second_woken.load(Ordering::SeqCst));
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
                assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
        }
479
        #[test]
        fn test_dropped_future_doesnt_count() {
                // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as
                // having been woken, leaving the notify-required flag set.
                let notifier = Notifier::new();
                notifier.notify();

                // If we get a future and don't touch it we're definitely still notify-required.
                notifier.get_future();
                // The first wait consumes the notification, so the second one must time out.
                assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
                assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));

                // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required.
                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

                notifier.notify();
                assert!(woken.load(Ordering::SeqCst));
                assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));

                // However, once we do poll `Ready` it should wipe the notify-required flag.
                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

                notifier.notify();
                assert!(woken.load(Ordering::SeqCst));
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
                assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
        }
511
        #[test]
        fn test_poll_post_notify_completes() {
                // Tests that if we have a future state that has completed, and we haven't yet requested a
                // new future, if we get a notify prior to requesting that second future it is generated
                // pre-completed.
                let notifier = Notifier::new();

                notifier.notify();
                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                // A poll which returns `Ready` straight away never registers, and so never invokes,
                // the waker.
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
                assert!(!woken.load(Ordering::SeqCst));

                notifier.notify();
                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
                assert!(!woken.load(Ordering::SeqCst));

                // With no notification outstanding, a new future is pending until the next `notify`.
                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
                assert!(!woken.load(Ordering::SeqCst));

                notifier.notify();
                assert!(woken.load(Ordering::SeqCst));
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        }
540
        #[test]
        fn test_poll_post_notify_completes_initial_notified() {
                // Identical to the previous test, but the first future completes via a wake rather than an
                // immediate `Poll::Ready`.
                let notifier = Notifier::new();

                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);

                notifier.notify();
                assert!(woken.load(Ordering::SeqCst));
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));

                // Notified again before a new future is requested: that future must be pre-completed.
                notifier.notify();
                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
                assert!(!woken.load(Ordering::SeqCst));

                // With no notification outstanding, a new future is pending until the next `notify`.
                let mut future = notifier.get_future();
                let (woken, waker) = create_waker();
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
                assert!(!woken.load(Ordering::SeqCst));

                notifier.notify();
                assert!(woken.load(Ordering::SeqCst));
                assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
        }
570
        #[test]
        fn test_multi_future_sleep() {
                // Tests the `Sleeper` with multiple futures.
                let notifier_a = Notifier::new();
                let notifier_b = Notifier::new();

                // Set both notifiers as woken without sleeping yet.
                notifier_a.notify();
                notifier_b.notify();
                Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();

                // One future has woken us up, but the other should still have a pending notification.
                Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();

                // However once we've slept twice, we should no longer have any pending notifications
                assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
                        .wait_timeout(Duration::from_millis(10)));

                // Test ordering somewhat more.
                // Only `notifier_a` is notified here; the sleeper must still return rather than block.
                notifier_a.notify();
                Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
        }
593
        #[test]
        #[cfg(feature = "std")]
        fn sleeper_with_pending_callbacks() {
                // This is similar to the above `test_multi_future_sleep` test, but in addition registers
                // "normal" callbacks which will cause the futures to assume notification has occurred,
                // rather than waiting for a woken sleeper.
                let notifier_a = Notifier::new();
                let notifier_b = Notifier::new();

                // Set both notifiers as woken without sleeping yet.
                notifier_a.notify();
                notifier_b.notify();

                // After sleeping one future (not guaranteed which one, however) will have its notification
                // bit cleared.
                Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();

                // By registering a callback on the futures for both notifiers, one will complete
                // immediately, but one will remain tied to the notifier, and will complete once the
                // notifier is next woken, which will be considered the completion of the notification.
                let callback_a = Arc::new(AtomicBool::new(false));
                let callback_b = Arc::new(AtomicBool::new(false));
                let callback_a_ref = Arc::clone(&callback_a);
                let callback_b_ref = Arc::clone(&callback_b);
                notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
                notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
                // Exactly one of the two callbacks must have fired so far.
                assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));

                // If we now notify both notifiers again, the other callback will fire, completing the
                // notification, and we'll be back to one pending notification.
                notifier_a.notify();
                notifier_b.notify();

                assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
                Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
                assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
                        .wait_timeout(Duration::from_millis(10)));
        }
632 }