X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Futil%2Fwakers.rs;h=f86fc376cee0202323b9924057157aaf8379f86a;hb=ea15f0f448137311d95f645606a3bff99bcfca0d;hp=976ce69f8859d3e533580a8767de08b040106680;hpb=5f053e43af4971d614574bb00c41a4e75fe8dbfd;p=rust-lightning diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 976ce69f..f86fc376 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -105,7 +105,10 @@ impl Notifier { pub(crate) fn notify(&self) { let mut lock = self.notify_pending.lock().unwrap(); if let Some(future_state) = &lock.1 { - future_state.lock().unwrap().complete(); + if future_state.lock().unwrap().complete() { + lock.1 = None; + return; + } } lock.0 = true; mem::drop(lock); @@ -152,18 +155,22 @@ impl FutureCallback for F { } pub(crate) struct FutureState { - callbacks: Vec>, + // When we're tracking whether a callback counts as having woken the user's code, we check the + // first bool - set to false if we're just calling a Waker, and true if we're calling an actual + // user-provided function. + callbacks: Vec<(bool, Box)>, complete: bool, callbacks_made: bool, } impl FutureState { - fn complete(&mut self) { - for callback in self.callbacks.drain(..) { + fn complete(&mut self) -> bool { + for (counts_as_call, callback) in self.callbacks.drain(..) { callback.call(); - self.callbacks_made = true; + self.callbacks_made |= counts_as_call; } self.complete = true; + self.callbacks_made } } @@ -184,7 +191,7 @@ impl Future { mem::drop(state); callback.call(); } else { - state.callbacks.push(callback); + state.callbacks.push((true, callback)); } } @@ -212,10 +219,11 @@ impl<'a> StdFuture for Future { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut state = self.state.lock().unwrap(); if state.complete { + state.callbacks_made = true; Poll::Ready(()) } else { let waker = cx.waker().clone(); - state.callbacks.push(Box::new(StdWaker(waker))); + state.callbacks.push((false, Box::new(StdWaker(waker)))); Poll::Pending } } @@ -433,4 +441,95 @@ mod tests { assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(())); } + + #[test] + fn test_dropped_future_doesnt_count() { + // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as + // having been woken, leaving the notify-required flag set. + let notifier = Notifier::new(); + notifier.notify(); + + // If we get a future and don't touch it we're definitely still notify-required. + notifier.get_future(); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + + // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + + // However, once we do poll `Ready` it should wipe the notify-required flag. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + } + + #[test] + fn test_poll_post_notify_completes() { + // Tests that if we have a future state that has completed, and we haven't yet requested a + // new future, if we get a notify prior to requesting that second future it is generated + // pre-completed. + let notifier = Notifier::new(); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + } + + #[test] + fn test_poll_post_notify_completes_initial_notified() { + // Identical to the previous test, but the first future completes via a wake rather than an + // immediate `Poll::Ready`. + let notifier = Notifier::new(); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + + notifier.notify(); + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!woken.load(Ordering::SeqCst)); + + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + assert!(!woken.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + } }