From: Matt Corallo Date: Wed, 26 Apr 2023 05:01:13 +0000 (+0000) Subject: Fix a leak in `FutureState` when a `Notifier` is dropped un-woken X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=357781504fc347470de096d87785911dde2f75ff;p=rust-lightning Fix a leak in `FutureState` when a `Notifier` is dropped un-woken If a `Notifier` has an internal `FutureState` which gathers some sleeper callbacks, but is never actaully woken, those callbacks will leak due to a circular `Arc` reference when the `Notifier` is `drop`'d. Because `Notifier`s are rarely `drop`'d in production this isn't a huge deal, but shows up materially in bindings tests as they spawn many nodes over the course of a short test. Fixes #2232 --- diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 2dbea705..37c036da 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -69,6 +69,7 @@ impl Notifier { } else { let state = Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), + callbacks_with_state: Vec::new(), complete: lock.0, callbacks_made: false, })); @@ -112,6 +113,7 @@ pub(crate) struct FutureState { // first bool - set to false if we're just calling a Waker, and true if we're calling an actual // user-provided function. callbacks: Vec<(bool, Box)>, + callbacks_with_state: Vec<(bool, Box>) -> () + Send>)>, complete: bool, callbacks_made: bool, } @@ -123,6 +125,10 @@ fn complete_future(this: &Arc>) -> bool { callback.call(); state.callbacks_made |= counts_as_call; } + for (counts_as_call, callback) in state.callbacks_with_state.drain(..) { + (callback)(this); + state.callbacks_made |= counts_as_call; + } state.complete = true; state.callbacks_made } @@ -240,14 +246,13 @@ impl Sleeper { for notifier_mtx in self.notifiers.iter() { let cv_ref = Arc::clone(&cv); let notified_fut_ref = Arc::clone(¬ified_fut_mtx); - let notifier_ref = Arc::clone(¬ifier_mtx); let mut notifier = notifier_mtx.lock().unwrap(); if notifier.complete { - *notified_fut_mtx.lock().unwrap() = Some(notifier_ref); + *notified_fut_mtx.lock().unwrap() = Some(Arc::clone(¬ifier_mtx)); break; } - notifier.callbacks.push((false, Box::new(move || { - *notified_fut_ref.lock().unwrap() = Some(Arc::clone(¬ifier_ref)); + notifier.callbacks_with_state.push((false, Box::new(move |notifier_ref| { + *notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref)); cv_ref.notify_all(); }))); } @@ -407,11 +412,50 @@ mod tests { } } + #[cfg(feature = "std")] + #[test] + fn test_state_drops() { + // Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified + // but after having been slept-on. This tests for that leak. + use crate::sync::Arc; + use std::thread; + + let notifier_a = Arc::new(Notifier::new()); + let notifier_b = Arc::new(Notifier::new()); + + let thread_notifier_a = Arc::clone(¬ifier_a); + + let future_a = notifier_a.get_future(); + let future_state_a = Arc::downgrade(&future_a.state); + + let future_b = notifier_b.get_future(); + let future_state_b = Arc::downgrade(&future_b.state); + + let join_handle = thread::spawn(move || { + // Let the other thread get to the wait point, then notify it. + std::thread::sleep(Duration::from_millis(50)); + thread_notifier_a.notify(); + }); + + // Wait on the other thread to finish its sleep, note that the leak only happened if we + // actually have to sleep here, not if we immediately return. + Sleeper::from_two_futures(future_a, future_b).wait(); + + join_handle.join().unwrap(); + + // then drop the notifiers and make sure the future states are gone. + mem::drop(notifier_a); + mem::drop(notifier_b); + + assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none()); + } + #[test] fn test_future_callbacks() { let future = Future { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), + callbacks_with_state: Vec::new(), complete: false, callbacks_made: false, })) @@ -431,6 +475,7 @@ mod tests { let future = Future { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), + callbacks_with_state: Vec::new(), complete: false, callbacks_made: false, })) @@ -469,6 +514,7 @@ mod tests { let mut future = Future { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), + callbacks_with_state: Vec::new(), complete: false, callbacks_made: false, }))