Implement async versions of process_pending_events
authorWilmer Paulino <wilmer.paulino@gmail.com>
Wed, 2 Nov 2022 19:39:07 +0000 (12:39 -0700)
committerWilmer Paulino <wilmer.paulino@gmail.com>
Thu, 10 Nov 2022 18:57:12 +0000 (10:57 -0800)
lightning/src/chain/chainmonitor.rs
lightning/src/ln/channelmanager.rs

index c70eed1a1c82a5a72751789e68f0d57e7c844a67..17fe69182ee81d060bc5c7c8c092608770104c84 100644 (file)
@@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter;
 use crate::util::logger::Logger;
 use crate::util::errors::APIError;
 use crate::util::events;
-use crate::util::events::EventHandler;
+use crate::util::events::{Event, EventHandler};
 use crate::ln::channelmanager::ChannelDetails;
 
 use crate::prelude::*;
@@ -496,6 +496,24 @@ where C::Target: chain::Filter,
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
+
+       /// Processes any events asynchronously in the order they were generated since the last call
+       /// using the given event handler.
+       ///
+       /// See the trait-level documentation of [`EventsProvider`] for requirements.
+       ///
+       /// [`EventsProvider`]: crate::util::events::EventsProvider
+       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+               &self, handler: H
+       ) {
+               let mut pending_events = Vec::new();
+               for monitor_state in self.monitors.read().unwrap().values() {
+                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+               }
+               for event in pending_events {
+                       handler(event).await;
+               }
+       }
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
index 7f8c82b41bdb81e846e9f22c3ab871ace41b0260..06ef2aa0dc3422fc457ee05504c395e96f00eedc 100644 (file)
@@ -53,7 +53,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA
 use crate::ln::wire::Encode;
 use crate::chain::keysinterface::{Sign, KeysInterface, KeysManager, Recipient};
 use crate::util::config::{UserConfig, ChannelConfig};
-use crate::util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
+use crate::util::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 use crate::util::{byte_utils, events};
 use crate::util::wakers::{Future, Notifier};
 use crate::util::scid_utils::fake_scid;
@@ -5728,6 +5728,39 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
        pub fn clear_pending_payments(&self) {
                self.pending_outbound_payments.lock().unwrap().clear()
        }
+
+       /// Processes any events asynchronously in the order they were generated since the last call
+       /// using the given event handler.
+       ///
+       /// See the trait-level documentation of [`EventsProvider`] for requirements.
+       pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+               &self, handler: H
+       ) {
+               // We'll acquire our total consistency lock until the returned future completes so that
+               // we can be sure no other persists happen while processing events.
+               let _read_guard = self.total_consistency_lock.read().unwrap();
+
+               let mut result = NotifyOption::SkipPersist;
+
+               // TODO: This behavior should be documented. It's unintuitive that we query
+               // ChannelMonitors when clearing other events.
+               if self.process_pending_monitor_events() {
+                       result = NotifyOption::DoPersist;
+               }
+
+               let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
+               if !pending_events.is_empty() {
+                       result = NotifyOption::DoPersist;
+               }
+
+               for event in pending_events {
+                       handler(event).await;
+               }
+
+               if result == NotifyOption::DoPersist {
+                       self.persistence_notifier.notify();
+               }
+       }
 }
 
 impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<M, T, K, F, L>