X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=8dba235f2127fd98faf400fa3b83978047b11bdd;hb=b79ff71fe7587b03ab0b7fe8b6229eb4bc2d7f08;hp=c70eed1a1c82a5a72751789e68f0d57e7c844a67;hpb=05cb46723489a91cf14e6e1ada27589a30bd9040;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index c70eed1a..8dba235f 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; use crate::util::events; -use crate::util::events::EventHandler; +use crate::util::events::{Event, EventHandler}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -496,6 +496,24 @@ where C::Target: chain::Filter, self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::util::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } } impl @@ -797,6 +815,7 @@ mod tests { chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear(); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); @@ -805,8 +824,6 @@ mod tests { check_added_monitors!(nodes[1], 1); expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); - chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); - let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone(); assert_eq!(persistences.len(), 1); let (funding_txo, updates) = persistences.iter().next().unwrap();