X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=17fe69182ee81d060bc5c7c8c092608770104c84;hb=49c9f1885dd7a564c0c78ad5f73ea4792c0171a8;hp=74e520a2a376cc7296fc2d9a1ab9efd56e3f0f3e;hpb=a10223d1ff874c293622a433eb2b23568435330d;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 74e520a2..17fe6918 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -24,7 +24,7 @@ //! servicing [`ChannelMonitor`] updates from the client. use bitcoin::blockdata::block::BlockHeader; -use bitcoin::hash_types::Txid; +use bitcoin::hash_types::{Txid, BlockHash}; use crate::chain; use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; @@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; use crate::util::events; -use crate::util::events::EventHandler; +use crate::util::events::{Event, EventHandler}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -395,6 +395,23 @@ where C::Target: chain::Filter, self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() } + #[cfg(not(c_bindings))] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> HashMap> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(c_bindings)] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec)> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(test)] pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor @@ -475,10 +492,28 @@ where C::Target: chain::Filter, pub fn get_and_clear_pending_events(&self) -> Vec { use crate::util::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::util::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } } impl @@ -544,7 +579,7 @@ where }); } - fn get_relevant_txids(&self) -> Vec { + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { let mut txids = Vec::new(); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { @@ -719,8 +754,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } #[cfg(anchors)] @@ -742,8 +777,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } } @@ -798,7 +833,22 @@ mod tests { // Note that updates is a HashMap so the ordering here is actually random. This shouldn't // fail either way but if it fails intermittently it's depending on the ordering of updates. let mut update_iter = updates.iter(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let next_update = update_iter.next().unwrap().clone(); + // Should contain next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap(); + // Should not contain the previously pending next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();