From 7fa499c188294cf4179e57cc47050e9a261ed72b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 8 Sep 2023 20:26:29 +0000 Subject: [PATCH] Separate ChannelManager needing persistence from having events Currently, when a ChannelManager generates a notification for the background processor, any pending events are handled and the ChannelManager is always re-persisted. Many channel related messages don't actually change the channel state in a way that changes the persisted channel. For example, an `update_add_htlc` or `update_fail_htlc` message simply adds the change to a queue, changing the channel state when we receive a `commitment_signed` message. In these cases we shouldn't be re-persisting the ChannelManager as it hasn't changed (persisted) state at all. In anticipation of doing so in the next few commits, here we make the public API handle the two concepts (somewhat) separately. The notification still goes out via a single waker, however whether or not to persist is now handled via a separate atomic bool. --- lightning-background-processor/src/lib.rs | 12 +++++------- lightning/src/ln/channelmanager.rs | 22 ++++++++++++++++++++-- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c17ab522e..6a36874a3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -315,7 +315,7 @@ macro_rules! define_run_body { // see `await_start`'s use below. let mut await_start = None; if $check_slow_await { await_start = Some($get_timer(1)); } - let updates_available = $await; + $await; let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; // Exit the loop if the background processor was requested to stop. @@ -324,7 +324,7 @@ macro_rules! define_run_body { break; } - if updates_available { + if $channel_manager.get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); $persister.persist_manager(&*$channel_manager)?; log_trace!($logger, "Done persisting ChannelManager."); @@ -660,11 +660,9 @@ where c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; match fut.await { - SelectorOutput::A => true, - SelectorOutput::B => false, + SelectorOutput::A|SelectorOutput::B => {}, SelectorOutput::C(exit) => { should_break = exit; - false } } }, |t| sleeper(Duration::from_secs(t)), @@ -787,10 +785,10 @@ impl BackgroundProcessor { define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - Sleeper::from_two_futures( + { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() - ).wait_timeout(Duration::from_millis(100)), + ).wait_timeout(Duration::from_millis(100)); }, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f3dbd9c25..6f3b2e457 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1186,6 +1186,7 @@ where background_events_processed_since_startup: AtomicBool, event_persist_notifier: Notifier, + needs_persist_flag: AtomicBool, entropy_source: ES, node_signer: NS, @@ -1229,6 +1230,7 @@ enum NotifyOption { /// `optionally_notify` which returns a `NotifyOption`. struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { event_persist_notifier: &'a Notifier, + needs_persist_flag: &'a AtomicBool, should_persist: F, // We hold onto this result so the lock doesn't get released immediately. _read_guard: RwLockReadGuard<'a, ()>, @@ -1246,6 +1248,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w PersistenceNotifierGuard { event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, should_persist: move || { // Pick the "most" action between `persist_check` and the background events // processing and return that. @@ -1266,6 +1269,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w PersistenceNotifierGuard { event_persist_notifier: &cm.get_cm().event_persist_notifier, + needs_persist_flag: &cm.get_cm().needs_persist_flag, should_persist: persist_check, _read_guard: read_guard, } @@ -1275,6 +1279,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> { fn drop(&mut self) { if (self.should_persist)() == NotifyOption::DoPersist { + self.needs_persist_flag.store(true, Ordering::Release); self.event_persist_notifier.notify(); } } @@ -2137,6 +2142,7 @@ macro_rules! process_events_body { } if result == NotifyOption::DoPersist { + $self.needs_persist_flag.store(true, Ordering::Release); $self.event_persist_notifier.notify(); } } @@ -2216,7 +2222,9 @@ where pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source, node_signer, @@ -7381,15 +7389,23 @@ where } } - /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted. + /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or + /// may have events that need processing. + /// + /// In order to check if this [`ChannelManager`] needs persisting, call + /// [`Self::get_and_clear_needs_persistence`]. /// /// Note that callbacks registered on the [`Future`] MUST NOT call back into this /// [`ChannelManager`] and should instead register actions to be taken later. - /// pub fn get_event_or_persistence_needed_future(&self) -> Future { self.event_persist_notifier.get_future() } + /// Returns true if this [`ChannelManager`] needs to be persisted. + pub fn get_and_clear_needs_persistence(&self) -> bool { + self.needs_persist_flag.swap(false, Ordering::AcqRel) + } + #[cfg(any(test, feature = "_test_utils"))] pub fn get_event_or_persist_condvar_value(&self) -> bool { self.event_persist_notifier.notify_pending() @@ -9562,7 +9578,9 @@ where pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), + event_persist_notifier: Notifier::new(), + needs_persist_flag: AtomicBool::new(false), entropy_source: args.entropy_source, node_signer: args.node_signer, -- 2.39.5