From: Wilmer Paulino Date: Wed, 2 Nov 2022 19:39:07 +0000 (-0700) Subject: Implement async versions of process_pending_events X-Git-Tag: v0.0.113~45^2~3 X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=55b714c01d059b637bd9d913dd4f5d8c0dc24c3e;p=rust-lightning Implement async versions of process_pending_events --- diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index c70eed1a1..17fe69182 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; use crate::util::events; -use crate::util::events::EventHandler; +use crate::util::events::{Event, EventHandler}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -496,6 +496,24 @@ where C::Target: chain::Filter, self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::util::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } } impl diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 7f8c82b41..06ef2aa0d 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -53,7 +53,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA use crate::ln::wire::Encode; use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient}; use crate::util::config::{UserConfig, ChannelConfig}; -use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; +use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; use crate::util::{byte_utils, events}; use crate::util::wakers::{Future, Notifier}; use crate::util::scid_utils::fake_scid; @@ -5728,6 +5728,39 @@ impl ChannelManager Future>( + &self, handler: H + ) { + // We'll acquire our total consistency lock until the returned future completes so that + // we can be sure no other persists happen while processing events. + let _read_guard = self.total_consistency_lock.read().unwrap(); + + let mut result = NotifyOption::SkipPersist; + + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + + let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } + + for event in pending_events { + handler(event).await; + } + + if result == NotifyOption::DoPersist { + self.persistence_notifier.notify(); + } + } } impl MessageSendEventsProvider for ChannelManager