Currently, when a ChannelManager generates a notification for the
background processor, any pending events are handled and the
ChannelManager is always re-persisted.
Many channel related messages don't actually change the channel
state in a way that changes the persisted channel. For example,
an `update_add_htlc` or `update_fail_htlc` message simply adds the
change to a queue, changing the channel state when we receive a
`commitment_signed` message.
In these cases we shouldn't be re-persisting the ChannelManager as
it hasn't changed (persisted) state at all. In anticipation of
doing so in the next few commits, here we make the public API
handle the two concepts (somewhat) separately. The notification
still goes out via a single waker, however whether or not to
persist is now handled via a separate atomic bool.
// see `await_start`'s use below.
let mut await_start = None;
if $check_slow_await { await_start = Some($get_timer(1)); }
// see `await_start`'s use below.
let mut await_start = None;
if $check_slow_await { await_start = Some($get_timer(1)); }
- let updates_available = $await;
let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
// Exit the loop if the background processor was requested to stop.
let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
// Exit the loop if the background processor was requested to stop.
+ if $channel_manager.get_and_clear_needs_persistence() {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
};
match fut.await {
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
};
match fut.await {
- SelectorOutput::A => true,
- SelectorOutput::B => false,
+ SelectorOutput::A|SelectorOutput::B => {},
SelectorOutput::C(exit) => {
should_break = exit;
SelectorOutput::C(exit) => {
should_break = exit;
}
}
}, |t| sleeper(Duration::from_secs(t)),
}
}
}, |t| sleeper(Duration::from_secs(t)),
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
- Sleeper::from_two_futures(
+ { Sleeper::from_two_futures(
channel_manager.get_event_or_persistence_needed_future(),
chain_monitor.get_update_future()
channel_manager.get_event_or_persistence_needed_future(),
chain_monitor.get_update_future()
- ).wait_timeout(Duration::from_millis(100)),
+ ).wait_timeout(Duration::from_millis(100)); },
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
background_events_processed_since_startup: AtomicBool,
event_persist_notifier: Notifier,
background_events_processed_since_startup: AtomicBool,
event_persist_notifier: Notifier,
+ needs_persist_flag: AtomicBool,
entropy_source: ES,
node_signer: NS,
entropy_source: ES,
node_signer: NS,
/// `optionally_notify` which returns a `NotifyOption`.
struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
event_persist_notifier: &'a Notifier,
/// `optionally_notify` which returns a `NotifyOption`.
struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
event_persist_notifier: &'a Notifier,
+ needs_persist_flag: &'a AtomicBool,
should_persist: F,
// We hold onto this result so the lock doesn't get released immediately.
_read_guard: RwLockReadGuard<'a, ()>,
should_persist: F,
// We hold onto this result so the lock doesn't get released immediately.
_read_guard: RwLockReadGuard<'a, ()>,
PersistenceNotifierGuard {
event_persist_notifier: &cm.get_cm().event_persist_notifier,
PersistenceNotifierGuard {
event_persist_notifier: &cm.get_cm().event_persist_notifier,
+ needs_persist_flag: &cm.get_cm().needs_persist_flag,
should_persist: move || {
// Pick the "most" action between `persist_check` and the background events
// processing and return that.
should_persist: move || {
// Pick the "most" action between `persist_check` and the background events
// processing and return that.
PersistenceNotifierGuard {
event_persist_notifier: &cm.get_cm().event_persist_notifier,
PersistenceNotifierGuard {
event_persist_notifier: &cm.get_cm().event_persist_notifier,
+ needs_persist_flag: &cm.get_cm().needs_persist_flag,
should_persist: persist_check,
_read_guard: read_guard,
}
should_persist: persist_check,
_read_guard: read_guard,
}
impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
fn drop(&mut self) {
if (self.should_persist)() == NotifyOption::DoPersist {
impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
fn drop(&mut self) {
if (self.should_persist)() == NotifyOption::DoPersist {
+ self.needs_persist_flag.store(true, Ordering::Release);
self.event_persist_notifier.notify();
}
}
self.event_persist_notifier.notify();
}
}
}
if result == NotifyOption::DoPersist {
}
if result == NotifyOption::DoPersist {
+ $self.needs_persist_flag.store(true, Ordering::Release);
$self.event_persist_notifier.notify();
}
}
$self.event_persist_notifier.notify();
}
}
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
event_persist_notifier: Notifier::new(),
event_persist_notifier: Notifier::new(),
+ needs_persist_flag: AtomicBool::new(false),
entropy_source,
node_signer,
entropy_source,
node_signer,
- /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
+ /// Gets a [`Future`] that completes when this [`ChannelManager`] may need to be persisted or
+ /// may have events that need processing.
+ ///
+ /// In order to check if this [`ChannelManager`] needs persisting, call
+ /// [`Self::get_and_clear_needs_persistence`].
///
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
/// [`ChannelManager`] and should instead register actions to be taken later.
///
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
/// [`ChannelManager`] and should instead register actions to be taken later.
pub fn get_event_or_persistence_needed_future(&self) -> Future {
self.event_persist_notifier.get_future()
}
pub fn get_event_or_persistence_needed_future(&self) -> Future {
self.event_persist_notifier.get_future()
}
+ /// Returns true if this [`ChannelManager`] needs to be persisted.
+ pub fn get_and_clear_needs_persistence(&self) -> bool {
+ self.needs_persist_flag.swap(false, Ordering::AcqRel)
+ }
+
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_event_or_persist_condvar_value(&self) -> bool {
self.event_persist_notifier.notify_pending()
#[cfg(any(test, feature = "_test_utils"))]
pub fn get_event_or_persist_condvar_value(&self) -> bool {
self.event_persist_notifier.notify_pending()
pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
event_persist_notifier: Notifier::new(),
event_persist_notifier: Notifier::new(),
+ needs_persist_flag: AtomicBool::new(false),
entropy_source: args.entropy_source,
node_signer: args.node_signer,
entropy_source: args.entropy_source,
node_signer: args.node_signer,