X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=430f6bbac1d2244b7d49c37d10a51153484a5c28;hb=98417a16dff86c35aa5f51303b0f05177b828ccd;hp=f5aa47d18d76c722d221c90ca6782a5cf1cf51dc;hpb=fc9a4c22d195a75ad5942eed271757f285452214;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index f5aa47d1..430f6bba 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -24,7 +24,7 @@ //! servicing [`ChannelMonitor`] updates from the client. use bitcoin::blockdata::block::BlockHeader; -use bitcoin::hash_types::Txid; +use bitcoin::hash_types::{Txid, BlockHash}; use crate::chain; use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; @@ -36,7 +36,7 @@ use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; use crate::util::errors::APIError; use crate::util::events; -use crate::util::events::EventHandler; +use crate::util::events::{Event, EventHandler}; use crate::ln::channelmanager::ChannelDetails; use crate::prelude::*; @@ -144,7 +144,7 @@ pub trait Persist { /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; + fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; } struct MonitorHolder { @@ -294,7 +294,7 @@ where C::Target: chain::Filter, } log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); - match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { + match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { ChannelMonitorUpdateStatus::Completed => log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), ChannelMonitorUpdateStatus::PermanentFailure => { @@ -395,6 +395,23 @@ where C::Target: chain::Filter, self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() } + #[cfg(not(c_bindings))] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> HashMap> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(c_bindings)] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec)> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(test)] pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor @@ -475,10 +492,28 @@ where C::Target: chain::Filter, pub fn get_and_clear_pending_events(&self) -> Vec { use crate::util::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::util::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } } impl @@ -544,7 +579,7 @@ where }); } - fn get_relevant_txids(&self) -> Vec { + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { let mut txids = Vec::new(); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { @@ -611,7 +646,7 @@ where C::Target: chain::Filter, /// Note that we persist the given `ChannelMonitor` update while holding the /// `ChainMonitor` monitors lock. - fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { + fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); match monitors.get(&funding_txo) { @@ -629,15 +664,15 @@ where C::Target: chain::Filter, Some(monitor_state) => { let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); - let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger); + let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger); if update_res.is_err() { log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor)); } // Even if updating the monitor returns an error, the monitor's state will // still be changed. So, persist the updated monitor despite the error. - let update_id = MonitorUpdateId::from_monitor_update(&update); + let update_id = MonitorUpdateId::from_monitor_update(update); let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id); + let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id); match persist_res { ChannelMonitorUpdateStatus::InProgress => { pending_monitor_updates.push(update_id); @@ -719,8 +754,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } #[cfg(anchors)] @@ -742,8 +777,8 @@ impl even for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } } @@ -757,7 +792,7 @@ mod tests { use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch}; use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; - use crate::ln::channelmanager::{self, PaymentSendFailure}; + use crate::ln::channelmanager::{PaymentSendFailure, PaymentId}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::ChannelMessageHandler; use crate::util::errors::APIError; @@ -772,7 +807,7 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features()); + create_announced_chan_between_nodes(&nodes, 0, 1); // Route two payments to be claimed at the same time. let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); @@ -780,6 +815,7 @@ mod tests { chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear(); chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); @@ -788,8 +824,6 @@ mod tests { check_added_monitors!(nodes[1], 1); expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); - chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); - let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone(); assert_eq!(persistences.len(), 1); let (funding_txo, updates) = persistences.iter().next().unwrap(); @@ -798,7 +832,22 @@ mod tests { // Note that updates is a HashMap so the ordering here is actually random. This shouldn't // fail either way but if it fails intermittently it's depending on the ordering of updates. let mut update_iter = updates.iter(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let next_update = update_iter.next().unwrap().clone(); + // Should contain next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap(); + // Should not contain the previously pending next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); @@ -848,8 +897,7 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let channel = create_announced_chan_between_nodes( - &nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features()); + let channel = create_announced_chan_between_nodes(&nodes, 0, 1); // Get a route for later and rebalance the channel somewhat send_payment(&nodes[0], &[&nodes[1]], 10_000_000); @@ -883,7 +931,7 @@ mod tests { // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass // the update through to the ChannelMonitor which will refuse it (as the channel is closed). chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); - unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)), + unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)), true, APIError::ChannelUnavailable { ref err }, assert!(err.contains("ChannelMonitor storage failure"))); check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update @@ -925,7 +973,7 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features()); + create_announced_chan_between_nodes(&nodes, 0, 1); chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear(); chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);