git.bitcoin.ninja Git - rust-lightning/blob - lightning/src/util/wakers.rs
Persist scorer upon update based on event handling
[rust-lightning] / lightning / src / util / wakers.rs
1 // This file is Copyright its original authors, visible in version control
2 // history.
3 //
4 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7 // You may not use this file except in accordance with one or both of these
8 // licenses.
9
10 //! Utilities which allow users to block on some future notification from LDK. These are
11 //! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to
12 //! be re-persisted.
13 //!
14 //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
15
16 use alloc::sync::Arc;
17 use core::mem;
18 use crate::sync::Mutex;
19
20 use crate::prelude::*;
21
22 #[cfg(feature = "std")]
23 use crate::sync::Condvar;
24 #[cfg(feature = "std")]
25 use std::time::Duration;
26
27 use core::future::Future as StdFuture;
28 use core::task::{Context, Poll};
29 use core::pin::Pin;
30
31
32 /// Used to signal to one of many waiters that the condition they're waiting on has happened.
33 pub(crate) struct Notifier {
34         notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
35 }
36
37 impl Notifier {
38         pub(crate) fn new() -> Self {
39                 Self {
40                         notify_pending: Mutex::new((false, None)),
41                 }
42         }
43
44         /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
45         pub(crate) fn notify(&self) {
46                 let mut lock = self.notify_pending.lock().unwrap();
47                 if let Some(future_state) = &lock.1 {
48                         if future_state.lock().unwrap().complete() {
49                                 lock.1 = None;
50                                 return;
51                         }
52                 }
53                 lock.0 = true;
54         }
55
56         /// Gets a [`Future`] that will get woken up with any waiters
57         pub(crate) fn get_future(&self) -> Future {
58                 let mut lock = self.notify_pending.lock().unwrap();
59                 if let Some(existing_state) = &lock.1 {
60                         if existing_state.lock().unwrap().callbacks_made {
61                                 // If the existing `FutureState` has completed and actually made callbacks,
62                                 // consider the notification flag to have been cleared and reset the future state.
63                                 lock.1.take();
64                                 lock.0 = false;
65                         }
66                 }
67                 if let Some(existing_state) = &lock.1 {
68                         Future { state: Arc::clone(&existing_state) }
69                 } else {
70                         let state = Arc::new(Mutex::new(FutureState {
71                                 callbacks: Vec::new(),
72                                 complete: lock.0,
73                                 callbacks_made: false,
74                         }));
75                         lock.1 = Some(Arc::clone(&state));
76                         Future { state }
77                 }
78         }
79
80         #[cfg(any(test, feature = "_test_utils"))]
81         pub fn notify_pending(&self) -> bool {
82                 self.notify_pending.lock().unwrap().0
83         }
84 }
85
// Defines the `FutureCallback` trait with the given extra supertrait bounds, together with a
// blanket implementation for every `Fn()` which also satisfies those bounds.
macro_rules! define_callback { ($($bound: path),*) => {
/// A callback which is called when a [`Future`] completes.
///
/// Implementations MUST NOT call back into LDK directly; they must instead schedule actions to
/// be taken later. Rust users should use the [`std::future::Future`] implementation for
/// [`Future`] instead.
///
/// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule
/// futures when they receive a wake, rather than immediately executing them.
pub trait FutureCallback : $($bound +)* {
        /// The method which is called.
        fn call(&self);
}

impl<F: Fn() $(+ $bound)*> FutureCallback for F {
        fn call(&self) { (*self)() }
}
} }

// With `std`, callbacks are additionally required to be `Send`.
#[cfg(feature = "std")]
define_callback!(Send);
#[cfg(not(feature = "std"))]
define_callback!();
109
110 pub(crate) struct FutureState {
111         // When we're tracking whether a callback counts as having woken the user's code, we check the
112         // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
113         // user-provided function.
114         callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
115         complete: bool,
116         callbacks_made: bool,
117 }
118
119 impl FutureState {
120         fn complete(&mut self) -> bool {
121                 for (counts_as_call, callback) in self.callbacks.drain(..) {
122                         callback.call();
123                         self.callbacks_made |= counts_as_call;
124                 }
125                 self.complete = true;
126                 self.callbacks_made
127         }
128 }
129
130 /// A simple future which can complete once, and calls some callback(s) when it does so.
131 ///
132 /// Clones can be made and all futures cloned from the same source will complete at the same time.
133 #[derive(Clone)]
134 pub struct Future {
135         state: Arc<Mutex<FutureState>>,
136 }
137
138 impl Future {
139         /// Registers a callback to be called upon completion of this future. If the future has already
140         /// completed, the callback will be called immediately.
141         ///
142         /// This is not exported to bindings users, use the bindings-only `register_callback_fn` instead
143         pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
144                 let mut state = self.state.lock().unwrap();
145                 if state.complete {
146                         state.callbacks_made = true;
147                         mem::drop(state);
148                         callback.call();
149                 } else {
150                         state.callbacks.push((true, callback));
151                 }
152         }
153
154         // C bindings don't (currently) know how to map `Box<dyn Trait>`, and while it could add the
155         // following wrapper, doing it in the bindings is currently much more work than simply doing it
156         // here.
157         /// Registers a callback to be called upon completion of this future. If the future has already
158         /// completed, the callback will be called immediately.
159         #[cfg(c_bindings)]
160         pub fn register_callback_fn<F: 'static + FutureCallback>(&self, callback: F) {
161                 self.register_callback(Box::new(callback));
162         }
163
164         /// Waits until this [`Future`] completes.
165         #[cfg(feature = "std")]
166         pub fn wait(self) {
167                 Sleeper::from_single_future(self).wait();
168         }
169
170         /// Waits until this [`Future`] completes or the given amount of time has elapsed.
171         ///
172         /// Returns true if the [`Future`] completed, false if the time elapsed.
173         #[cfg(feature = "std")]
174         pub fn wait_timeout(self, max_wait: Duration) -> bool {
175                 Sleeper::from_single_future(self).wait_timeout(max_wait)
176         }
177
178         #[cfg(test)]
179         pub fn poll_is_complete(&self) -> bool {
180                 let mut state = self.state.lock().unwrap();
181                 if state.complete {
182                         state.callbacks_made = true;
183                         true
184                 } else { false }
185         }
186 }
187
188 use core::task::Waker;
189 struct StdWaker(pub Waker);
190 impl FutureCallback for StdWaker {
191         fn call(&self) { self.0.wake_by_ref() }
192 }
193
194 /// This is not exported to bindings users as Rust Futures aren't usable in language bindings.
195 impl<'a> StdFuture for Future {
196         type Output = ();
197
198         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199                 let mut state = self.state.lock().unwrap();
200                 if state.complete {
201                         state.callbacks_made = true;
202                         Poll::Ready(())
203                 } else {
204                         let waker = cx.waker().clone();
205                         state.callbacks.push((false, Box::new(StdWaker(waker))));
206                         Poll::Pending
207                 }
208         }
209 }
210
/// A struct which can be used to select across many [`Future`]s at once without relying on a full
/// async context.
#[cfg(feature = "std")]
pub struct Sleeper {
        notifiers: Vec<Arc<Mutex<FutureState>>>,
}

#[cfg(feature = "std")]
impl Sleeper {
        /// Constructs a new sleeper from one future, allowing blocking on it.
        pub fn from_single_future(future: Future) -> Self {
                let Future { state } = future;
                Self { notifiers: vec![state] }
        }
        /// Constructs a new sleeper from two futures, allowing blocking on both at once.
        // Note that this is the common case - a ChannelManager and ChainMonitor.
        pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
                let Future { state: state_a } = fut_a;
                let Future { state: state_b } = fut_b;
                Self { notifiers: vec![state_a, state_b] }
        }
        /// Constructs a new sleeper on many futures, allowing blocking on all at once.
        pub fn new(futures: Vec<Future>) -> Self {
                let mut notifiers = Vec::with_capacity(futures.len());
                for future in futures {
                        notifiers.push(future.state);
                }
                Self { notifiers }
        }
        /// Prepares to go into a wait loop body, creating a condition variable which we can block on
        /// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
        /// condition variable being woken.
        fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
                let condvar = Arc::new(Condvar::new());
                let woken_by = Arc::new(Mutex::new(None));
                for state_arc in self.notifiers.iter() {
                        let mut state = state_arc.lock().unwrap();
                        if state.complete {
                                // Already complete: record it as the waking future and stop registering
                                // callbacks on the remaining ones.
                                *woken_by.lock().unwrap() = Some(Arc::clone(state_arc));
                                break;
                        }
                        let condvar_for_cb = Arc::clone(&condvar);
                        let woken_by_for_cb = Arc::clone(&woken_by);
                        let state_for_cb = Arc::clone(state_arc);
                        // Registered with `false`: the callback firing does not by itself count as the
                        // user having been woken, the waiter marks that once it actually returns.
                        state.callbacks.push((false, Box::new(move || {
                                *woken_by_for_cb.lock().unwrap() = Some(Arc::clone(&state_for_cb));
                                condvar_for_cb.notify_all();
                        })));
                }
                (condvar, woken_by)
        }

        /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
        pub fn wait(&self) {
                let (condvar, woken_by) = self.setup_wait();
                // The guard on `woken_by` is scoped so that it is released before the completed
                // future's own state is locked below.
                let completed_state = {
                        let mut woken_slot = condvar
                                .wait_while(woken_by.lock().unwrap(), |slot| slot.is_none())
                                .unwrap();
                        woken_slot.take().expect("CV wait shouldn't have returned until the notifying future was set")
                };
                completed_state.lock().unwrap().callbacks_made = true;
        }

        /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed or the
        /// given amount of time has elapsed. Returns true if a [`Future`] completed, false if the time
        /// elapsed.
        pub fn wait_timeout(&self, max_wait: Duration) -> bool {
                let (condvar, woken_by) = self.setup_wait();
                let wait_result =
                        condvar.wait_timeout_while(woken_by.lock().unwrap(), max_wait, |slot| slot.is_none());
                let completed_state = match wait_result {
                        Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
                        Ok((_, timeout)) if timeout.timed_out() => return false,
                        // `woken_slot` (the guard) is dropped at the end of this arm, before the
                        // completed future's own state is locked below.
                        Ok((mut woken_slot, _)) =>
                                woken_slot.take().expect("CV wait shouldn't have returned until the notifying future was set"),
                };
                completed_state.lock().unwrap().callbacks_made = true;
                true
        }
}
282
283 #[cfg(test)]
284 mod tests {
285         use super::*;
286         use core::sync::atomic::{AtomicBool, Ordering};
287         use core::future::Future as FutureTrait;
288         use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
289
290         #[test]
291         fn notifier_pre_notified_future() {
292                 // Previously, if we generated a future after a `Notifier` had been notified, the future
293                 // would never complete. This tests this behavior, ensuring the future instead completes
294                 // immediately.
295                 let notifier = Notifier::new();
296                 notifier.notify();
297
298                 let callback = Arc::new(AtomicBool::new(false));
299                 let callback_ref = Arc::clone(&callback);
300                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
301                 assert!(callback.load(Ordering::SeqCst));
302         }
303
304         #[test]
305         fn notifier_future_completes_wake() {
306                 // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
307                 // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
308                 // `lightning-background-processor` to persist in a tight loop.
309                 let notifier = Notifier::new();
310
311                 // First check the simple case, ensuring if we get notified a new future isn't woken until
312                 // a second `notify`.
313                 let callback = Arc::new(AtomicBool::new(false));
314                 let callback_ref = Arc::clone(&callback);
315                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
316                 assert!(!callback.load(Ordering::SeqCst));
317
318                 notifier.notify();
319                 assert!(callback.load(Ordering::SeqCst));
320
321                 let callback = Arc::new(AtomicBool::new(false));
322                 let callback_ref = Arc::clone(&callback);
323                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
324                 assert!(!callback.load(Ordering::SeqCst));
325
326                 notifier.notify();
327                 assert!(callback.load(Ordering::SeqCst));
328
329                 // Then check the case where the future is fetched before the notification, but a callback
330                 // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
331                 // don't get an instant-wake when we get a new future.
332                 let future = notifier.get_future();
333                 notifier.notify();
334
335                 let callback = Arc::new(AtomicBool::new(false));
336                 let callback_ref = Arc::clone(&callback);
337                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
338                 assert!(callback.load(Ordering::SeqCst));
339
340                 let callback = Arc::new(AtomicBool::new(false));
341                 let callback_ref = Arc::clone(&callback);
342                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
343                 assert!(!callback.load(Ordering::SeqCst));
344         }
345
346         #[test]
347         fn new_future_wipes_notify_bit() {
348                 // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
349                 // been notified, we'd never mark the notifier as not-awaiting-notify if a `Future` is
350                 // fetched after the notify bit has been set.
351                 let notifier = Notifier::new();
352                 notifier.notify();
353
354                 let callback = Arc::new(AtomicBool::new(false));
355                 let callback_ref = Arc::clone(&callback);
356                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
357                 assert!(callback.load(Ordering::SeqCst));
358
359                 let callback = Arc::new(AtomicBool::new(false));
360                 let callback_ref = Arc::clone(&callback);
361                 notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
362                 assert!(!callback.load(Ordering::SeqCst));
363
364                 notifier.notify();
365                 assert!(callback.load(Ordering::SeqCst));
366         }
367
368         #[cfg(feature = "std")]
369         #[test]
370         fn test_wait_timeout() {
371                 use crate::sync::Arc;
372                 use std::thread;
373
374                 let persistence_notifier = Arc::new(Notifier::new());
375                 let thread_notifier = Arc::clone(&persistence_notifier);
376
377                 let exit_thread = Arc::new(AtomicBool::new(false));
378                 let exit_thread_clone = exit_thread.clone();
379                 thread::spawn(move || {
380                         loop {
381                                 thread_notifier.notify();
382                                 if exit_thread_clone.load(Ordering::SeqCst) {
383                                         break
384                                 }
385                         }
386                 });
387
388                 // Check that we can block indefinitely until updates are available.
389                 let _ = persistence_notifier.get_future().wait();
390
391                 // Check that the Notifier will return after the given duration if updates are
392                 // available.
393                 loop {
394                         if persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
395                                 break
396                         }
397                 }
398
399                 exit_thread.store(true, Ordering::SeqCst);
400
401                 // Check that the Notifier will return after the given duration even if no updates
402                 // are available.
403                 loop {
404                         if !persistence_notifier.get_future().wait_timeout(Duration::from_millis(100)) {
405                                 break
406                         }
407                 }
408         }
409
410         #[test]
411         fn test_future_callbacks() {
412                 let future = Future {
413                         state: Arc::new(Mutex::new(FutureState {
414                                 callbacks: Vec::new(),
415                                 complete: false,
416                                 callbacks_made: false,
417                         }))
418                 };
419                 let callback = Arc::new(AtomicBool::new(false));
420                 let callback_ref = Arc::clone(&callback);
421                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
422
423                 assert!(!callback.load(Ordering::SeqCst));
424                 future.state.lock().unwrap().complete();
425                 assert!(callback.load(Ordering::SeqCst));
426                 future.state.lock().unwrap().complete();
427         }
428
429         #[test]
430         fn test_pre_completed_future_callbacks() {
431                 let future = Future {
432                         state: Arc::new(Mutex::new(FutureState {
433                                 callbacks: Vec::new(),
434                                 complete: false,
435                                 callbacks_made: false,
436                         }))
437                 };
438                 future.state.lock().unwrap().complete();
439
440                 let callback = Arc::new(AtomicBool::new(false));
441                 let callback_ref = Arc::clone(&callback);
442                 future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
443
444                 assert!(callback.load(Ordering::SeqCst));
445                 assert!(future.state.lock().unwrap().callbacks.is_empty());
446         }
447
        // Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
        // totally possible to construct from a trait implementation (though somewhat less efficient
        // compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
        // waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
452         const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
453         unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }
454         unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; let _freed = Box::from_raw(p); }
455         unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }
456         unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
457                 let p = ptr as *const Arc<AtomicBool>;
458                 RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
459         }
460
461         fn create_waker() -> (Arc<AtomicBool>, Waker) {
462                 let a = Arc::new(AtomicBool::new(false));
463                 let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
464                 (a, waker)
465         }
466
467         #[test]
468         fn test_future() {
469                 let mut future = Future {
470                         state: Arc::new(Mutex::new(FutureState {
471                                 callbacks: Vec::new(),
472                                 complete: false,
473                                 callbacks_made: false,
474                         }))
475                 };
476                 let mut second_future = Future { state: Arc::clone(&future.state) };
477
478                 let (woken, waker) = create_waker();
479                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
480                 assert!(!woken.load(Ordering::SeqCst));
481
482                 let (second_woken, second_waker) = create_waker();
483                 assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
484                 assert!(!second_woken.load(Ordering::SeqCst));
485
486                 future.state.lock().unwrap().complete();
487                 assert!(woken.load(Ordering::SeqCst));
488                 assert!(second_woken.load(Ordering::SeqCst));
489                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
490                 assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
491         }
492
493         #[test]
494         #[cfg(feature = "std")]
495         fn test_dropped_future_doesnt_count() {
496                 // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as
497                 // having been woken, leaving the notify-required flag set.
498                 let notifier = Notifier::new();
499                 notifier.notify();
500
501                 // If we get a future and don't touch it we're definitely still notify-required.
502                 notifier.get_future();
503                 assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
504                 assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
505
506                 // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required.
507                 let mut future = notifier.get_future();
508                 let (woken, waker) = create_waker();
509                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
510
511                 notifier.notify();
512                 assert!(woken.load(Ordering::SeqCst));
513                 assert!(notifier.get_future().wait_timeout(Duration::from_millis(1)));
514
515                 // However, once we do poll `Ready` it should wipe the notify-required flag.
516                 let mut future = notifier.get_future();
517                 let (woken, waker) = create_waker();
518                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
519
520                 notifier.notify();
521                 assert!(woken.load(Ordering::SeqCst));
522                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
523                 assert!(!notifier.get_future().wait_timeout(Duration::from_millis(1)));
524         }
525
526         #[test]
527         fn test_poll_post_notify_completes() {
528                 // Tests that if we have a future state that has completed, and we haven't yet requested a
529                 // new future, if we get a notify prior to requesting that second future it is generated
530                 // pre-completed.
531                 let notifier = Notifier::new();
532
533                 notifier.notify();
534                 let mut future = notifier.get_future();
535                 let (woken, waker) = create_waker();
536                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
537                 assert!(!woken.load(Ordering::SeqCst));
538
539                 notifier.notify();
540                 let mut future = notifier.get_future();
541                 let (woken, waker) = create_waker();
542                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
543                 assert!(!woken.load(Ordering::SeqCst));
544
545                 let mut future = notifier.get_future();
546                 let (woken, waker) = create_waker();
547                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
548                 assert!(!woken.load(Ordering::SeqCst));
549
550                 notifier.notify();
551                 assert!(woken.load(Ordering::SeqCst));
552                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
553         }
554
555         #[test]
556         fn test_poll_post_notify_completes_initial_notified() {
557                 // Identical to the previous test, but the first future completes via a wake rather than an
558                 // immediate `Poll::Ready`.
559                 let notifier = Notifier::new();
560
561                 let mut future = notifier.get_future();
562                 let (woken, waker) = create_waker();
563                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
564
565                 notifier.notify();
566                 assert!(woken.load(Ordering::SeqCst));
567                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
568
569                 notifier.notify();
570                 let mut future = notifier.get_future();
571                 let (woken, waker) = create_waker();
572                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
573                 assert!(!woken.load(Ordering::SeqCst));
574
575                 let mut future = notifier.get_future();
576                 let (woken, waker) = create_waker();
577                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
578                 assert!(!woken.load(Ordering::SeqCst));
579
580                 notifier.notify();
581                 assert!(woken.load(Ordering::SeqCst));
582                 assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
583         }
584
585         #[test]
586         #[cfg(feature = "std")]
587         fn test_multi_future_sleep() {
588                 // Tests the `Sleeper` with multiple futures.
589                 let notifier_a = Notifier::new();
590                 let notifier_b = Notifier::new();
591
592                 // Set both notifiers as woken without sleeping yet.
593                 notifier_a.notify();
594                 notifier_b.notify();
595                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
596
597                 // One future has woken us up, but the other should still have a pending notification.
598                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
599
600                 // However once we've slept twice, we should no longer have any pending notifications
601                 assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
602                         .wait_timeout(Duration::from_millis(10)));
603
604                 // Test ordering somewhat more.
605                 notifier_a.notify();
606                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
607         }
608
609         #[test]
610         #[cfg(feature = "std")]
611         fn sleeper_with_pending_callbacks() {
612                 // This is similar to the above `test_multi_future_sleep` test, but in addition registers
613                 // "normal" callbacks which will cause the futures to assume notification has occurred,
614                 // rather than waiting for a woken sleeper.
615                 let notifier_a = Notifier::new();
616                 let notifier_b = Notifier::new();
617
618                 // Set both notifiers as woken without sleeping yet.
619                 notifier_a.notify();
620                 notifier_b.notify();
621
622                 // After sleeping one future (not guaranteed which one, however) will have its notification
623                 // bit cleared.
624                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
625
626                 // By registering a callback on the futures for both notifiers, one will complete
627                 // immediately, but one will remain tied to the notifier, and will complete once the
628                 // notifier is next woken, which will be considered the completion of the notification.
629                 let callback_a = Arc::new(AtomicBool::new(false));
630                 let callback_b = Arc::new(AtomicBool::new(false));
631                 let callback_a_ref = Arc::clone(&callback_a);
632                 let callback_b_ref = Arc::clone(&callback_b);
633                 notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
634                 notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
635                 assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));
636
637                 // If we now notify both notifiers again, the other callback will fire, completing the
638                 // notification, and we'll be back to one pending notification.
639                 notifier_a.notify();
640                 notifier_b.notify();
641
642                 assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
643                 Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
644                 assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
645                         .wait_timeout(Duration::from_millis(10)));
646         }
647 }