Separate ChannelManager needing persistence from having events
authorMatt Corallo <git@bluematt.me>
Fri, 8 Sep 2023 20:26:29 +0000 (20:26 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 12 Sep 2023 19:06:34 +0000 (19:06 +0000)
Currently, when a ChannelManager generates a notification for the
background processor, any pending events are handled and the
ChannelManager is always re-persisted.

Many channel related messages don't actually change the channel
state in a way that changes the persisted channel. For example,
an `update_add_htlc` or `update_fail_htlc` message simply adds the
change to a queue, changing the channel state when we receive a
`commitment_signed` message.

In these cases we shouldn't be re-persisting the ChannelManager as
it hasn't changed (persisted) state at all. In anticipation of
doing so in the next few commits, here we make the public API
handle the two concepts (somewhat) separately. The notification
still goes out via a single waker, however whether or not to
persist is now handled via a separate atomic bool.

lightning-background-processor/src/lib.rs
lightning/src/ln/channelmanager.rs

index c17ab522eba8bcdf2133c8d36eac6b5b5b7932ec..6a36874a384bf9ca23a91d2cef6557853943aa2f 100644 (file)
@@ -315,7 +315,7 @@ macro_rules! define_run_body {
                        // see `await_start`'s use below.
                        let mut await_start = None;
                        if $check_slow_await { await_start = Some($get_timer(1)); }
-                       let updates_available = $await;
+                       $await;
                        let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
 
                        // Exit the loop if the background processor was requested to stop.
@@ -324,7 +324,7 @@ macro_rules! define_run_body {
                                break;
                        }
 
-                       if updates_available {
+                       if $channel_manager.get_and_clear_needs_persistence() {
                                log_trace!($logger, "Persisting ChannelManager...");
                                $persister.persist_manager(&*$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
@@ -660,11 +660,9 @@ where
                                c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
                        };
                        match fut.await {
-                               SelectorOutput::A => true,
-                               SelectorOutput::B => false,
+                               SelectorOutput::A|SelectorOutput::B => {},
                                SelectorOutput::C(exit) => {
                                        should_break = exit;
-                                       false
                                }
                        }
                }, |t| sleeper(Duration::from_secs(t)),
@@ -787,10 +785,10 @@ impl BackgroundProcessor {
                        define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
                                gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
-                               Sleeper::from_two_futures(
+                               Sleeper::from_two_futures(
                                        channel_manager.get_event_or_persistence_needed_future(),
                                        chain_monitor.get_update_future()
-                               ).wait_timeout(Duration::from_millis(100)),
+                               ).wait_timeout(Duration::from_millis(100)); },
                                |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
index f3dbd9c253fad5016c574671e39db238f75b131a..6f3b2e4577017714fcbcebadc24346afcf53d959 100644 (file)
@@ -1186,6 +1186,7 @@ where
        background_events_processed_since_startup: AtomicBool,
 
        event_persist_notifier: Notifier,
+       needs_persist_flag: AtomicBool,
 
        entropy_source: ES,
        node_signer: NS,
@@ -1229,6 +1230,7 @@ enum NotifyOption {
 /// `optionally_notify` which returns a `NotifyOption`.
 struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
        event_persist_notifier: &'a Notifier,
+       needs_persist_flag: &'a AtomicBool,
        should_persist: F,
        // We hold onto this result so the lock doesn't get released immediately.
        _read_guard: RwLockReadGuard<'a, ()>,
@@ -1246,6 +1248,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
 
                PersistenceNotifierGuard {
                        event_persist_notifier: &cm.get_cm().event_persist_notifier,
+                       needs_persist_flag: &cm.get_cm().needs_persist_flag,
                        should_persist: move || {
                                // Pick the "most" action between `persist_check` and the background events
                                // processing and return that.
@@ -1266,6 +1269,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
 
                PersistenceNotifierGuard {
                        event_persist_notifier: &cm.get_cm().event_persist_notifier,
+                       needs_persist_flag: &cm.get_cm().needs_persist_flag,
                        should_persist: persist_check,
                        _read_guard: read_guard,
                }
@@ -1275,6 +1279,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
 impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
        fn drop(&mut self) {
                if (self.should_persist)() == NotifyOption::DoPersist {
+                       self.needs_persist_flag.store(true, Ordering::Release);
                        self.event_persist_notifier.notify();
                }
        }
@@ -2137,6 +2142,7 @@ macro_rules! process_events_body {
                        }
 
                        if result == NotifyOption::DoPersist {
+                               $self.needs_persist_flag.store(true, Ordering::Release);
                                $self.event_persist_notifier.notify();
                        }
                }
@@ -2216,7 +2222,9 @@ where
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
+
                        event_persist_notifier: Notifier::new(),
+                       needs_persist_flag: AtomicBool::new(false),
 
                        entropy_source,
                        node_signer,
@@ -7381,15 +7389,23 @@ where
                }
        }
 
-       /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
+       /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or
+       /// may have events that need processing.
+       ///
+       /// In order to check if this [`ChannelManager`] needs persisting, call
+       /// [`Self::get_and_clear_needs_persistence`].
        ///
        /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
        /// [`ChannelManager`] and should instead register actions to be taken later.
-       ///
        pub fn get_event_or_persistence_needed_future(&self) -> Future {
                self.event_persist_notifier.get_future()
        }
 
+       /// Returns true if this [`ChannelManager`] needs to be persisted.
+       pub fn get_and_clear_needs_persistence(&self) -> bool {
+               self.needs_persist_flag.swap(false, Ordering::AcqRel)
+       }
+
        #[cfg(any(test, feature = "_test_utils"))]
        pub fn get_event_or_persist_condvar_value(&self) -> bool {
                self.event_persist_notifier.notify_pending()
@@ -9562,7 +9578,9 @@ where
                        pending_background_events: Mutex::new(pending_background_events),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
+
                        event_persist_notifier: Notifier::new(),
+                       needs_persist_flag: AtomicBool::new(false),
 
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,