Fix a leak in `FutureState` when a `Notifier` is dropped un-woken
authorMatt Corallo <git@bluematt.me>
Wed, 26 Apr 2023 05:01:13 +0000 (05:01 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 26 Apr 2023 17:38:04 +0000 (17:38 +0000)
If a `Notifier` has an internal `FutureState` which gathers some
sleeper callbacks, but is never actaully woken, those callbacks
will leak due to a circular `Arc` reference when the `Notifier` is
`drop`'d.

Because `Notifier`s are rarely `drop`'d in production this isn't a
huge deal, but shows up materially in bindings tests as they spawn
many nodes over the course of a short test.

Fixes #2232

lightning/src/util/wakers.rs

index 2dbea705a3160b44cdaa59dd455b1ced9055e039..37c036da9594747ea81b142e151ff960c2892dad 100644 (file)
@@ -69,6 +69,7 @@ impl Notifier {
                } else {
                        let state = Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               callbacks_with_state: Vec::new(),
                                complete: lock.0,
                                callbacks_made: false,
                        }));
@@ -112,6 +113,7 @@ pub(crate) struct FutureState {
        // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
        // user-provided function.
        callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
+       callbacks_with_state: Vec<(bool, Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>)>,
        complete: bool,
        callbacks_made: bool,
 }
@@ -123,6 +125,10 @@ fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
                callback.call();
                state.callbacks_made |= counts_as_call;
        }
+       for (counts_as_call, callback) in state.callbacks_with_state.drain(..) {
+               (callback)(this);
+               state.callbacks_made |= counts_as_call;
+       }
        state.complete = true;
        state.callbacks_made
 }
@@ -240,14 +246,13 @@ impl Sleeper {
                        for notifier_mtx in self.notifiers.iter() {
                                let cv_ref = Arc::clone(&cv);
                                let notified_fut_ref = Arc::clone(&notified_fut_mtx);
-                               let notifier_ref = Arc::clone(&notifier_mtx);
                                let mut notifier = notifier_mtx.lock().unwrap();
                                if notifier.complete {
-                                       *notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
+                                       *notified_fut_mtx.lock().unwrap() = Some(Arc::clone(&notifier_mtx));
                                        break;
                                }
-                               notifier.callbacks.push((false, Box::new(move || {
-                                       *notified_fut_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
+                               notifier.callbacks_with_state.push((false, Box::new(move |notifier_ref| {
+                                       *notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
                                        cv_ref.notify_all();
                                })));
                        }
@@ -407,11 +412,50 @@ mod tests {
                }
        }
 
+       #[cfg(feature = "std")]
+       #[test]
+       fn test_state_drops() {
+               // Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
+               // but after having been slept-on. This tests for that leak.
+               use crate::sync::Arc;
+               use std::thread;
+
+               let notifier_a = Arc::new(Notifier::new());
+               let notifier_b = Arc::new(Notifier::new());
+
+               let thread_notifier_a = Arc::clone(&notifier_a);
+
+               let future_a = notifier_a.get_future();
+               let future_state_a = Arc::downgrade(&future_a.state);
+
+               let future_b = notifier_b.get_future();
+               let future_state_b = Arc::downgrade(&future_b.state);
+
+               let join_handle = thread::spawn(move || {
+                       // Let the other thread get to the wait point, then notify it.
+                       std::thread::sleep(Duration::from_millis(50));
+                       thread_notifier_a.notify();
+               });
+
+               // Wait on the other thread to finish its sleep, note that the leak only happened if we
+               // actually have to sleep here, not if we immediately return.
+               Sleeper::from_two_futures(future_a, future_b).wait();
+
+               join_handle.join().unwrap();
+
+               // then drop the notifiers and make sure the future states are gone.
+               mem::drop(notifier_a);
+               mem::drop(notifier_b);
+
+               assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
+       }
+
        #[test]
        fn test_future_callbacks() {
                let future = Future {
                        state: Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               callbacks_with_state: Vec::new(),
                                complete: false,
                                callbacks_made: false,
                        }))
@@ -431,6 +475,7 @@ mod tests {
                let future = Future {
                        state: Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               callbacks_with_state: Vec::new(),
                                complete: false,
                                callbacks_made: false,
                        }))
@@ -469,6 +514,7 @@ mod tests {
                let mut future = Future {
                        state: Arc::new(Mutex::new(FutureState {
                                callbacks: Vec::new(),
+                               callbacks_with_state: Vec::new(),
                                complete: false,
                                callbacks_made: false,
                        }))