X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Futil%2Fwakers.rs;h=fdbc22f116600b7162bdbdcafa4a6f314af944e1;hb=626c60600ef3a4ec93d5f13df8d766bf3a7cd664;hp=6d8b03cbd7c4c66e9c686bad5afbec601df96ee5;hpb=7527e4b7df674fc2f5442514bb7b1d2294e59ce8;p=rust-lightning diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 6d8b03cb..fdbc22f1 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -33,6 +33,20 @@ pub(crate) struct Notifier { condvar: Condvar, } +macro_rules! check_woken { + ($guard: expr, $retval: expr) => { { + if $guard.0 { + $guard.0 = false; + if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) { + // If we're about to return as woken, and the future state is marked complete, wipe + // the future state and let the next future wait until we get a new notify. + $guard.1.take(); + } + return $retval; + } + } } +} + impl Notifier { pub(crate) fn new() -> Self { Self { @@ -57,16 +71,9 @@ impl Notifier { pub(crate) fn wait(&self) { loop { let mut guard = self.propagate_future_state_to_notify_flag(); - if guard.0 { - guard.0 = false; - return; - } + check_woken!(guard, ()); guard = self.condvar.wait(guard).unwrap(); - let result = guard.0; - if result { - guard.0 = false; - return - } + check_woken!(guard, ()); } } @@ -75,24 +82,20 @@ impl Notifier { let current_time = Instant::now(); loop { let mut guard = self.propagate_future_state_to_notify_flag(); - if guard.0 { - guard.0 = false; - return true; - } + check_woken!(guard, true); guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0; + check_woken!(guard, true); // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the // desired wait time has actually passed, and if not then restart the loop with a reduced wait // time. Note that this logic can be highly simplified through the use of // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to // 1.42.0. let elapsed = current_time.elapsed(); - let result = guard.0; - if result || elapsed >= max_wait { - guard.0 = false; - return result; + if elapsed >= max_wait { + return false; } match max_wait.checked_sub(elapsed) { - None => return result, + None => return false, Some(_) => continue } } @@ -149,16 +152,19 @@ impl FutureCallback for F { } pub(crate) struct FutureState { - callbacks: Vec>, + // When we're tracking whether a callback counts as having woken the user's code, we check the + // first bool - set to false if we're just calling a Waker, and true if we're calling an actual + // user-provided function. + callbacks: Vec<(bool, Box)>, complete: bool, callbacks_made: bool, } impl FutureState { fn complete(&mut self) { - for callback in self.callbacks.drain(..) { + for (counts_as_call, callback) in self.callbacks.drain(..) { callback.call(); - self.callbacks_made = true; + self.callbacks_made |= counts_as_call; } self.complete = true; } @@ -181,7 +187,7 @@ impl Future { mem::drop(state); callback.call(); } else { - state.callbacks.push(callback); + state.callbacks.push((true, callback)); } } @@ -209,10 +215,11 @@ impl<'a> StdFuture for Future { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut state = self.state.lock().unwrap(); if state.complete { + state.callbacks_made = true; Poll::Ready(()) } else { let waker = cx.waker().clone(); - state.callbacks.push(Box::new(StdWaker(waker))); + state.callbacks.push((false, Box::new(StdWaker(waker)))); Poll::Pending } } @@ -430,4 +437,36 @@ mod tests { assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(())); } + + #[test] + fn test_dropped_future_doesnt_count() { + // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as + // having been woken, leaving the notify-required flag set. + let notifier = Notifier::new(); + notifier.notify(); + + // If we get a future and don't touch it we're definitely still notify-required. + notifier.get_future(); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + + // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + + // However, once we do poll `Ready` it should wipe the notify-required flag. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + } }