pub(crate) fn notify(&self) {
let mut lock = self.notify_pending.lock().unwrap();
if let Some(future_state) = &lock.1 {
- if future_state.lock().unwrap().complete() {
+ if complete_future(future_state) {
lock.1 = None;
return;
}
} else {
let state = Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
+ callbacks_with_state: Vec::new(),
complete: lock.0,
callbacks_made: false,
}));
// first bool - set to false if we're just calling a Waker, and true if we're calling an actual
// user-provided function.
callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
+ callbacks_with_state: Vec<(bool, Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>)>,
complete: bool,
callbacks_made: bool,
}
-impl FutureState {
- fn complete(&mut self) -> bool {
- for (counts_as_call, callback) in self.callbacks.drain(..) {
- callback.call();
- self.callbacks_made |= counts_as_call;
- }
- self.complete = true;
- self.callbacks_made
+fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
+ let mut state_lock = this.lock().unwrap();
+ let state = &mut *state_lock;
+ for (counts_as_call, callback) in state.callbacks.drain(..) {
+ callback.call();
+ state.callbacks_made |= counts_as_call;
+ }
+ for (counts_as_call, callback) in state.callbacks_with_state.drain(..) {
+ (callback)(this);
+ state.callbacks_made |= counts_as_call;
}
+ state.complete = true;
+ state.callbacks_made
}
/// A simple future which can complete once, and calls some callback(s) when it does so.
for notifier_mtx in self.notifiers.iter() {
let cv_ref = Arc::clone(&cv);
let notified_fut_ref = Arc::clone(¬ified_fut_mtx);
- let notifier_ref = Arc::clone(¬ifier_mtx);
let mut notifier = notifier_mtx.lock().unwrap();
if notifier.complete {
- *notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
+ *notified_fut_mtx.lock().unwrap() = Some(Arc::clone(¬ifier_mtx));
break;
}
- notifier.callbacks.push((false, Box::new(move || {
- *notified_fut_ref.lock().unwrap() = Some(Arc::clone(¬ifier_ref));
+ notifier.callbacks_with_state.push((false, Box::new(move |notifier_ref| {
+ *notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
cv_ref.notify_all();
})));
}
}
}
+ #[cfg(feature = "std")]
+ #[test]
+ fn test_state_drops() {
+ // Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
+ // but after having been slept-on. This tests for that leak.
+ use crate::sync::Arc;
+ use std::thread;
+
+ let notifier_a = Arc::new(Notifier::new());
+ let notifier_b = Arc::new(Notifier::new());
+
+ let thread_notifier_a = Arc::clone(¬ifier_a);
+
+ let future_a = notifier_a.get_future();
+ let future_state_a = Arc::downgrade(&future_a.state);
+
+ let future_b = notifier_b.get_future();
+ let future_state_b = Arc::downgrade(&future_b.state);
+
+ let join_handle = thread::spawn(move || {
+ // Let the other thread get to the wait point, then notify it.
+ std::thread::sleep(Duration::from_millis(50));
+ thread_notifier_a.notify();
+ });
+
+ // Wait on the other thread to finish its sleep, note that the leak only happened if we
+ // actually have to sleep here, not if we immediately return.
+ Sleeper::from_two_futures(future_a, future_b).wait();
+
+ join_handle.join().unwrap();
+
+ // then drop the notifiers and make sure the future states are gone.
+ mem::drop(notifier_a);
+ mem::drop(notifier_b);
+
+ assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
+ }
+
#[test]
fn test_future_callbacks() {
let future = Future {
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
+ callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
}))
future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
assert!(!callback.load(Ordering::SeqCst));
- future.state.lock().unwrap().complete();
+ complete_future(&future.state);
assert!(callback.load(Ordering::SeqCst));
- future.state.lock().unwrap().complete();
+ complete_future(&future.state);
}
#[test]
let future = Future {
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
+ callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
}))
};
- future.state.lock().unwrap().complete();
+ complete_future(&future.state);
let callback = Arc::new(AtomicBool::new(false));
let callback_ref = Arc::clone(&callback);
}
// Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
- // totally possible to construct from a trait implementation (though somewhat less effecient
+ // totally possible to construct from a trait implementation (though somewhat less efficient
// compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
// waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
let mut future = Future {
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
+ callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
}))
assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
assert!(!second_woken.load(Ordering::SeqCst));
- future.state.lock().unwrap().complete();
+ complete_future(&future.state);
assert!(woken.load(Ordering::SeqCst));
assert!(second_woken.load(Ordering::SeqCst));
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));