X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=17fe69182ee81d060bc5c7c8c092608770104c84;hb=49c9f1885dd7a564c0c78ad5f73ea4792c0171a8;hp=3949810bf3ef33da01c3bbbd9b1cce0a1cfe65fc;hpb=843b8263d5af413d687ea7de36db364251b6f352;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 3949810b..17fe6918 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -24,23 +24,23 @@ //! servicing [`ChannelMonitor`] updates from the client. use bitcoin::blockdata::block::BlockHeader; -use bitcoin::hash_types::Txid; - -use chain; -use chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; -use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; -use chain::transaction::{OutPoint, TransactionData}; -use chain::keysinterface::Sign; -use util::atomic_counter::AtomicCounter; -use util::logger::Logger; -use util::errors::APIError; -use util::events; -use util::events::EventHandler; -use ln::channelmanager::ChannelDetails; - -use prelude::*; -use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; +use bitcoin::hash_types::{Txid, BlockHash}; + +use crate::chain; +use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; +use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; +use crate::chain::transaction::{OutPoint, TransactionData}; +use crate::chain::keysinterface::Sign; +use crate::util::atomic_counter::AtomicCounter; +use crate::util::logger::Logger; +use crate::util::errors::APIError; +use crate::util::events; +use crate::util::events::{Event, EventHandler}; +use crate::ln::channelmanager::ChannelDetails; + +use crate::prelude::*; +use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use bitcoin::secp256k1::PublicKey; @@ -395,6 +395,23 @@ where C::Target: chain::Filter, self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() } + #[cfg(not(c_bindings))] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> HashMap> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(c_bindings)] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec)> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(test)] pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor @@ -473,12 +490,30 @@ where C::Target: chain::Filter, #[cfg(any(test, fuzzing, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { - use util::events::EventsProvider; + use crate::util::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::util::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } } impl @@ -544,7 +579,7 @@ where }); } - fn get_relevant_txids(&self) -> Vec { + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { let mut txids = Vec::new(); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { @@ -719,8 +754,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } #[cfg(anchors)] @@ -742,8 +777,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } } @@ -752,16 +787,16 @@ impl even mod tests { use bitcoin::{BlockHeader, TxMerkleNode}; use bitcoin::hashes::Hash; - use ::{check_added_monitors, check_closed_broadcast, check_closed_event}; - use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg}; - use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; - use chain::{ChannelMonitorUpdateStatus, Confirm, Watch}; - use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; - use ln::channelmanager::{self, PaymentSendFailure}; - use ln::functional_test_utils::*; - use ln::msgs::ChannelMessageHandler; - use util::errors::APIError; - use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; + use crate::{check_added_monitors, check_closed_broadcast, check_closed_event}; + use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg}; + use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; + use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch}; + use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; + use crate::ln::channelmanager::{self, PaymentSendFailure, PaymentId}; + use crate::ln::functional_test_utils::*; + use crate::ln::msgs::ChannelMessageHandler; + use crate::util::errors::APIError; + use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; #[test] fn test_async_ooo_offchain_updates() { @@ -798,7 +833,22 @@ mod tests { // Note that updates is a HashMap so the ordering here is actually random. This shouldn't // fail either way but if it fails intermittently it's depending on the ordering of updates. let mut update_iter = updates.iter(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let next_update = update_iter.next().unwrap().clone(); + // Should contain next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap(); + // Should not contain the previously pending next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); @@ -883,7 +933,7 @@ mod tests { // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass // the update through to the ChannelMonitor which will refuse it (as the channel is closed). chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); - unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)), + unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)), true, APIError::ChannelUnavailable { ref err }, assert!(err.contains("ChannelMonitor storage failure"))); check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update