/// Gets a [`Future`] that will get woken up with any waiters
pub(crate) fn get_future(&self) -> Future {
let mut lock = self.notify_pending.lock().unwrap();
+ let mut self_idx = 0;
if let Some(existing_state) = &lock.1 {
- if existing_state.lock().unwrap().callbacks_made {
+ let mut locked = existing_state.lock().unwrap();
+ if locked.callbacks_made {
// If the existing `FutureState` has completed and actually made callbacks,
// consider the notification flag to have been cleared and reset the future state.
+ mem::drop(locked);
lock.1.take();
lock.0 = false;
+ } else {
+ self_idx = locked.next_idx;
+ locked.next_idx += 1;
}
}
if let Some(existing_state) = &lock.1 {
- Future { state: Arc::clone(&existing_state) }
+ Future { state: Arc::clone(&existing_state), self_idx }
} else {
let state = Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
callbacks_with_state: Vec::new(),
complete: lock.0,
callbacks_made: false,
+ next_idx: 1,
}));
lock.1 = Some(Arc::clone(&state));
- Future { state }
+ Future { state, self_idx: 0 }
}
}
// we only count it after another `poll()` and the second wakes a `Sleeper` which handles
// setting `callbacks_made` itself).
callbacks: Vec<Box<dyn FutureCallback>>,
- std_future_callbacks: Vec<StdWaker>,
+ std_future_callbacks: Vec<(usize, StdWaker)>,
callbacks_with_state: Vec<Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>>,
complete: bool,
callbacks_made: bool,
+ next_idx: usize,
}
fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
callback.call();
state.callbacks_made = true;
}
- for waker in state.std_future_callbacks.drain(..) {
+ for (_, waker) in state.std_future_callbacks.drain(..) {
waker.0.wake_by_ref();
}
for callback in state.callbacks_with_state.drain(..) {
}
/// A simple future which can complete once, and calls some callback(s) when it does so.
-///
-/// Clones can be made and all futures cloned from the same source will complete at the same time.
-#[derive(Clone)]
pub struct Future {
state: Arc<Mutex<FutureState>>,
+ self_idx: usize,
}
impl Future {
Poll::Ready(())
} else {
let waker = cx.waker().clone();
- state.std_future_callbacks.push(StdWaker(waker));
+ state.std_future_callbacks.push((self.self_idx, StdWaker(waker)));
Poll::Pending
}
}
callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
- }))
+ next_idx: 1,
+ })),
+ self_idx: 0,
};
let callback = Arc::new(AtomicBool::new(false));
let callback_ref = Arc::clone(&callback);
let future = Future {
state: Arc::new(Mutex::new(FutureState {
callbacks: Vec::new(),
+ std_future_callbacks: Vec::new(),
callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
- }))
+ next_idx: 1,
+ })),
+ self_idx: 0,
};
complete_future(&future.state);
callbacks_with_state: Vec::new(),
complete: false,
callbacks_made: false,
- }))
+ next_idx: 2,
+ })),
+ self_idx: 0,
};
- let mut second_future = Future { state: Arc::clone(&future.state) };
+ let mut second_future = Future { state: Arc::clone(&future.state), self_idx: 1 };
let (woken, waker) = create_waker();
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);