X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=430f6bbac1d2244b7d49c37d10a51153484a5c28;hb=98417a16dff86c35aa5f51303b0f05177b828ccd;hp=503e6bdee0669551d1853932447a5db08fc92c17;hpb=b20aea1cb0fb9f19c646e96518ef9845e6593264;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 503e6bde..430f6bba 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -24,25 +24,26 @@ //! servicing [`ChannelMonitor`] updates from the client. use bitcoin::blockdata::block::BlockHeader; -use bitcoin::hash_types::Txid; - -use chain; -use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput}; -use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; -use chain::transaction::{OutPoint, TransactionData}; -use chain::keysinterface::Sign; -use util::atomic_counter::AtomicCounter; -use util::logger::Logger; -use util::errors::APIError; -use util::events; -use util::events::EventHandler; -use ln::channelmanager::ChannelDetails; - -use prelude::*; -use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; +use bitcoin::hash_types::{Txid, BlockHash}; + +use crate::chain; +use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; +use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; +use crate::chain::transaction::{OutPoint, TransactionData}; +use crate::chain::keysinterface::Sign; +use crate::util::atomic_counter::AtomicCounter; +use crate::util::logger::Logger; +use crate::util::errors::APIError; +use crate::util::events; +use crate::util::events::{Event, EventHandler}; +use crate::ln::channelmanager::ChannelDetails; + +use crate::prelude::*; +use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use bitcoin::secp256k1::PublicKey; #[derive(Clone, Copy, Hash, PartialEq, Eq)] /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents @@ -77,20 +78,21 @@ impl MonitorUpdateId { /// /// Each method can return three possible values: /// * If persistence (including any relevant `fsync()` calls) happens immediately, the -/// implementation should return `Ok(())`, indicating normal channel operation should continue. +/// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal +/// channel operation should continue. /// * If persistence happens asynchronously, implementations should first ensure the /// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return -/// `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the -/// background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be -/// called with the corresponding [`MonitorUpdateId`]. +/// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background. +/// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with +/// the corresponding [`MonitorUpdateId`]. /// /// Note that unlike the direct [`chain::Watch`] interface, /// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs. /// /// * If persistence fails for some reason, implementations should return -/// `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be +/// [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be /// closed without broadcasting the latest state. See -/// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details. +/// [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details. pub trait Persist { /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup. @@ -100,14 +102,14 @@ pub trait Persist { /// and the stored channel data). Note that you **must** persist every new monitor to disk. /// /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`], - /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`]. + /// if you return [`ChannelMonitorUpdateStatus::InProgress`]. /// /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor` - /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors. + /// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; + fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; /// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given /// update. @@ -135,14 +137,14 @@ pub trait Persist { /// whereas updates are small and `O(1)`. /// /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`], - /// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`]. + /// if you return [`ChannelMonitorUpdateStatus::InProgress`]. /// /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`, /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and - /// [`ChannelMonitorUpdateErr`] for requirements when returning errors. + /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors. /// /// [`Writeable::write`]: crate::util::ser::Writeable::write - fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option, data: &ChannelMonitor, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>; + fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus; } struct MonitorHolder { @@ -150,9 +152,9 @@ struct MonitorHolder { /// The full set of pending monitor updates for this Channel. /// /// Note that this lock must be held during updates to prevent a race where we call - /// update_persisted_channel, the user returns a TemporaryFailure, and then calls - /// channel_monitor_updated immediately, racing our insertion of the pending update into the - /// contained Vec. + /// update_persisted_channel, the user returns a + /// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated + /// immediately, racing our insertion of the pending update into the contained Vec. /// /// Beyond the synchronization of updates themselves, we cannot handle user events until after /// any chain updates have been stored on disk. Thus, we scan this list when returning updates @@ -235,7 +237,7 @@ pub struct ChainMonitor)>>, + pending_monitor_events: Mutex, Option)>>, /// The best block height seen, used as a proxy for the passage of time. highest_chain_height: AtomicUsize, } @@ -262,82 +264,67 @@ where C::Target: chain::Filter, where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { - let mut dependent_txdata = Vec::new(); - { - let monitor_states = self.monitors.write().unwrap(); - if let Some(height) = best_height { - // If the best block height is being updated, update highest_chain_height under the - // monitors write lock. - let old_height = self.highest_chain_height.load(Ordering::Acquire); - let new_height = height as usize; - if new_height > old_height { - self.highest_chain_height.store(new_height, Ordering::Release); - } + let monitor_states = self.monitors.write().unwrap(); + if let Some(height) = best_height { + // If the best block height is being updated, update highest_chain_height under the + // monitors write lock. + let old_height = self.highest_chain_height.load(Ordering::Acquire); + let new_height = height as usize; + if new_height > old_height { + self.highest_chain_height.store(new_height, Ordering::Release); } + } - for (funding_outpoint, monitor_state) in monitor_states.iter() { - let monitor = &monitor_state.monitor; - let mut txn_outputs; - { - txn_outputs = process(monitor, txdata); - let update_id = MonitorUpdateId { - contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), - }; - let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - if let Some(height) = best_height { - if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { - // If there are not ChainSync persists awaiting completion, go ahead and - // set last_chain_persist_height here - we wouldn't want the first - // TemporaryFailure to always immediately be considered "overly delayed". - monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); - } + for (funding_outpoint, monitor_state) in monitor_states.iter() { + let monitor = &monitor_state.monitor; + let mut txn_outputs; + { + txn_outputs = process(monitor, txdata); + let update_id = MonitorUpdateId { + contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), + }; + let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + if let Some(height) = best_height { + if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { + // If there are not ChainSync persists awaiting completion, go ahead and + // set last_chain_persist_height here - we wouldn't want the first + // InProgress to always immediately be considered "overly delayed". + monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); } + } - log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); - match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { - Ok(()) => - log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), - Err(ChannelMonitorUpdateErr::PermanentFailure) => { - monitor_state.channel_perm_failed.store(true, Ordering::Release); - self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)])); - }, - Err(ChannelMonitorUpdateErr::TemporaryFailure) => { - log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); - pending_monitor_updates.push(update_id); - }, - } + log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); + match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) { + ChannelMonitorUpdateStatus::Completed => + log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)), + ChannelMonitorUpdateStatus::PermanentFailure => { + monitor_state.channel_perm_failed.store(true, Ordering::Release); + self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id())); + }, + ChannelMonitorUpdateStatus::InProgress => { + log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); + }, } + } - // Register any new outputs with the chain source for filtering, storing any dependent - // transactions from within the block that previously had not been included in txdata. - if let Some(ref chain_source) = self.chain_source { - let block_hash = header.block_hash(); - for (txid, mut outputs) in txn_outputs.drain(..) { - for (idx, output) in outputs.drain(..) { - // Register any new outputs with the chain source for filtering and recurse - // if it indicates that there are dependent transactions within the block - // that had not been previously included in txdata. - let output = WatchedOutput { - block_hash: Some(block_hash), - outpoint: OutPoint { txid, index: idx as u16 }, - script_pubkey: output.script_pubkey, - }; - if let Some(tx) = chain_source.register_output(output) { - dependent_txdata.push(tx); - } - } + // Register any new outputs with the chain source for filtering, storing any dependent + // transactions from within the block that previously had not been included in txdata. + if let Some(ref chain_source) = self.chain_source { + let block_hash = header.block_hash(); + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { + // Register any new outputs with the chain source for filtering + let output = WatchedOutput { + block_hash: Some(block_hash), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, + }; + chain_source.register_output(output) } } } } - - // Recursively call for any dependent transactions that were identified by the chain source. - if !dependent_txdata.is_empty() { - dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index); - dependent_txdata.dedup_by_key(|(index, _tx)| *index); - let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect(); - self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around - } } /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. @@ -408,18 +395,35 @@ where C::Target: chain::Filter, self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() } + #[cfg(not(c_bindings))] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> HashMap> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(c_bindings)] + /// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored). + pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec)> { + self.monitors.read().unwrap().iter().map(|(outpoint, holder)| { + (*outpoint, holder.pending_monitor_updates.lock().unwrap().clone()) + }).collect() + } + + #[cfg(test)] pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor } /// Indicates the persistence of a [`ChannelMonitor`] has completed after - /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation. + /// [`ChannelMonitorUpdateStatus::InProgress`] was returned from an update operation. /// /// Thus, the anticipated use is, at a high level: /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the /// update to disk and begins updating any remote (e.g. watchtower/backup) copies, - /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`], + /// returning [`ChannelMonitorUpdateStatus::InProgress`], /// 2) once all remote copies are updated, you call this function with the /// `completed_update_id` that completed, and once all pending updates have completed the /// channel will be re-enabled. @@ -452,13 +456,13 @@ where C::Target: chain::Filter, if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) { // If there are still monitor updates pending (or an old monitor update // finished after a later one perm-failed), we cannot yet construct an - // UpdateCompleted event. + // Completed event. return Ok(()); } - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted { + self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { funding_txo, monitor_update_id: monitor_data.monitor.get_latest_update_id(), - }])); + }], monitor_data.monitor.get_counterparty_node_id())); }, MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => { if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) { @@ -476,20 +480,40 @@ where C::Target: chain::Filter, /// channel_monitor_updated once with the highest ID. #[cfg(any(test, fuzzing))] pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) { - self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted { + let monitors = self.monitors.read().unwrap(); + let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id()); + self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed { funding_txo, monitor_update_id, - }])); + }], counterparty_node_id)); } #[cfg(any(test, fuzzing, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { - use util::events::EventsProvider; + use crate::util::events::EventsProvider; let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + let event_handler = |event: events::Event| events.borrow_mut().push(event); self.process_pending_events(&event_handler); events.into_inner() } + + /// Processes any events asynchronously in the order they were generated since the last call + /// using the given event handler. + /// + /// See the trait-level documentation of [`EventsProvider`] for requirements. + /// + /// [`EventsProvider`]: crate::util::events::EventsProvider + pub async fn process_pending_events_async Future>( + &self, handler: H + ) { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler(event).await; + } + } } impl @@ -555,7 +579,7 @@ where }); } - fn get_relevant_txids(&self) -> Vec { + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { let mut txids = Vec::new(); let monitor_states = self.monitors.read().unwrap(); for monitor_state in monitor_states.values() { @@ -582,27 +606,31 @@ where C::Target: chain::Filter, /// /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor` /// monitors lock. - fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { + fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> ChannelMonitorUpdateStatus { let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { hash_map::Entry::Occupied(_) => { log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); - return Err(ChannelMonitorUpdateErr::PermanentFailure)}, + return ChannelMonitorUpdateStatus::PermanentFailure + }, hash_map::Entry::Vacant(e) => e, }; log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor)); let update_id = MonitorUpdateId::from_new_monitor(&monitor); let mut pending_monitor_updates = Vec::new(); let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id); - if persist_res.is_err() { - log_error!(self.logger, "Failed to persist new ChannelMonitor for channel {}: {:?}", log_funding_info!(monitor), persist_res); - } else { - log_trace!(self.logger, "Finished persisting new ChannelMonitor for channel {}", log_funding_info!(monitor)); - } - if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) { - return persist_res; - } else if persist_res.is_err() { - pending_monitor_updates.push(update_id); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor)); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::PermanentFailure => { + log_error!(self.logger, "Persistence of new ChannelMonitor for channel {} failed", log_funding_info!(monitor)); + return persist_res; + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor)); + } } if let Some(ref chain_source) = self.chain_source { monitor.load_outputs_to_watch(chain_source); @@ -618,7 +646,7 @@ where C::Target: chain::Filter, /// Note that we persist the given `ChannelMonitor` update while holding the /// `ChainMonitor` monitors lock. - fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> { + fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus { // Update the monitor that watches the channel referred to by the given outpoint. let monitors = self.monitors.read().unwrap(); match monitors.get(&funding_txo) { @@ -631,34 +659,37 @@ where C::Target: chain::Filter, #[cfg(any(test, fuzzing))] panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); #[cfg(not(any(test, fuzzing)))] - Err(ChannelMonitorUpdateErr::PermanentFailure) + ChannelMonitorUpdateStatus::PermanentFailure }, Some(monitor_state) => { let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor)); - let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger); + let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger); if update_res.is_err() { log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor)); } // Even if updating the monitor returns an error, the monitor's state will // still be changed. So, persist the updated monitor despite the error. - let update_id = MonitorUpdateId::from_monitor_update(&update); + let update_id = MonitorUpdateId::from_monitor_update(update); let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); - let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id); - if let Err(e) = persist_res { - if e == ChannelMonitorUpdateErr::TemporaryFailure { + let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { pending_monitor_updates.push(update_id); - } else { + log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor)); + }, + ChannelMonitorUpdateStatus::PermanentFailure => { monitor_state.channel_perm_failed.store(true, Ordering::Release); - } - log_error!(self.logger, "Failed to persist ChannelMonitor update for channel {}: {:?}", log_funding_info!(monitor), e); - } else { - log_trace!(self.logger, "Finished persisting ChannelMonitor update for channel {}", log_funding_info!(monitor)); + log_error!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} failed", log_funding_info!(monitor)); + }, + ChannelMonitorUpdateStatus::Completed => { + log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor)); + }, } if update_res.is_err() { - Err(ChannelMonitorUpdateErr::PermanentFailure) + ChannelMonitorUpdateStatus::PermanentFailure } else if monitor_state.channel_perm_failed.load(Ordering::Acquire) { - Err(ChannelMonitorUpdateErr::PermanentFailure) + ChannelMonitorUpdateStatus::PermanentFailure } else { persist_res } @@ -666,7 +697,7 @@ where C::Target: chain::Filter, } } - fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec)> { + fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec, Option)> { let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap()); @@ -695,7 +726,8 @@ where C::Target: chain::Filter, let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); if monitor_events.len() > 0 { let monitor_outpoint = monitor_state.monitor.get_funding_txo().0; - pending_monitor_events.push((monitor_outpoint, monitor_events)); + let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id(); + pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id)); } } } @@ -710,81 +742,61 @@ impl even L::Target: Logger, P::Target: Persist, { + #[cfg(not(anchors))] + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. + /// + /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in + /// order to handle these events. + /// + /// [`SpendableOutputs`]: events::Event::SpendableOutputs + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { + let mut pending_events = Vec::new(); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); + } + for event in pending_events { + handler.handle_event(event); + } + } + #[cfg(anchors)] /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// + /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`] + /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain + /// within each channel. As the confirmation of a commitment transaction may be critical to the + /// safety of funds, this method must be invoked frequently, ideally once for every chain tip + /// update (block connected or disconnected). + /// /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in /// order to handle these events. /// /// [`SpendableOutputs`]: events::Event::SpendableOutputs + /// [`BumpTransaction`]: events::Event::BumpTransaction fn process_pending_events(&self, handler: H) where H::Target: EventHandler { let mut pending_events = Vec::new(); for monitor_state in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - for event in pending_events.drain(..) { - handler.handle_event(&event); + for event in pending_events { + handler.handle_event(event); } } } #[cfg(test)] mod tests { - use bitcoin::BlockHeader; - use ::{check_added_monitors, check_closed_broadcast, check_closed_event}; - use ::{expect_payment_sent, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg}; - use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; - use chain::{ChannelMonitorUpdateErr, Confirm, Watch}; - use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; - use ln::channelmanager::PaymentSendFailure; - use ln::features::InitFeatures; - use ln::functional_test_utils::*; - use ln::msgs::ChannelMessageHandler; - use util::errors::APIError; - use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; - use util::test_utils::{OnRegisterOutput, TxOutReference}; - - /// Tests that in-block dependent transactions are processed by `block_connected` when not - /// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance, - /// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the - /// commitment transaction itself. An Electrum client may filter the commitment transaction but - /// needs to return the HTLC transaction so it can be processed. - #[test] - fn connect_block_checks_dependent_transactions() { - let chanmon_cfgs = create_chanmon_cfgs(2); - let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); - let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); - let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let channel = create_announced_chan_between_nodes( - &nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); - - // Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions. - let (commitment_tx, htlc_tx) = { - let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0; - let mut txn = get_local_commitment_txn!(nodes[0], channel.2); - claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage); - - assert_eq!(txn.len(), 2); - (txn.remove(0), txn.remove(0)) - }; - - // Set expectations on nodes[1]'s chain source to return dependent transactions. - let htlc_output = TxOutReference(commitment_tx.clone(), 0); - let to_local_output = TxOutReference(commitment_tx.clone(), 1); - let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0); - nodes[1].chain_source - .expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) }) - .expect(OnRegisterOutput { with: to_local_output, returns: None }) - .expect(OnRegisterOutput { with: htlc_timeout_output, returns: None }); - - // Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain - // source should return the dependent HTLC transaction when the HTLC output is registered. - mine_transaction(&nodes[1], &commitment_tx); - - // Clean up so uninteresting assertions don't fail. - check_added_monitors!(nodes[1], 1); - nodes[1].node.get_and_clear_pending_msg_events(); - nodes[1].node.get_and_clear_pending_events(); - } + use bitcoin::{BlockHeader, TxMerkleNode}; + use bitcoin::hashes::Hash; + use crate::{check_added_monitors, check_closed_broadcast, check_closed_event}; + use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg}; + use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err}; + use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch}; + use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS; + use crate::ln::channelmanager::{PaymentSendFailure, PaymentId}; + use crate::ln::functional_test_utils::*; + use crate::ln::msgs::ChannelMessageHandler; + use crate::util::errors::APIError; + use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider}; #[test] fn test_async_ooo_offchain_updates() { @@ -795,21 +807,22 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); + create_announced_chan_between_nodes(&nodes, 0, 1); // Route two payments to be claimed at the same time. - let payment_preimage_1 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0; - let payment_preimage_2 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0; + let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); + let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear(); - chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure)); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); nodes[1].node.claim_funds(payment_preimage_1); check_added_monitors!(nodes[1], 1); + expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000); nodes[1].node.claim_funds(payment_preimage_2); check_added_monitors!(nodes[1], 1); - - chanmon_cfgs[1].persister.set_update_ret(Ok(())); + expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000); let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone(); assert_eq!(persistences.len(), 1); @@ -819,7 +832,22 @@ mod tests { // Note that updates is a HashMap so the ordering here is actually random. This shouldn't // fail either way but if it fails intermittently it's depending on the ordering of updates. let mut update_iter = updates.iter(); - nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); + let next_update = update_iter.next().unwrap().clone(); + // Should contain next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); + nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, next_update.clone()).unwrap(); + // Should not contain the previously pending next_update when pending updates listed. + #[cfg(not(c_bindings))] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().get(funding_txo) + .unwrap().contains(&next_update)); + #[cfg(c_bindings)] + assert!(!nodes[1].chain_monitor.chain_monitor.list_pending_monitor_updates().iter() + .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update)); assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty()); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap(); @@ -869,16 +897,16 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let channel = create_announced_chan_between_nodes( - &nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); + let channel = create_announced_chan_between_nodes(&nodes, 0, 1); // Get a route for later and rebalance the channel somewhat send_payment(&nodes[0], &[&nodes[1]], 10_000_000); let (route, second_payment_hash, _, second_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000); // First route a payment that we will claim on chain and give the recipient the preimage. - let payment_preimage = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0; + let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000); nodes[1].node.claim_funds(payment_preimage); + expect_payment_claimed!(nodes[1], payment_hash, 1_000_000); nodes[1].node.get_and_clear_pending_msg_events(); check_added_monitors!(nodes[1], 1); let remote_txn = get_local_commitment_txn!(nodes[1], channel.2); @@ -886,14 +914,14 @@ mod tests { // Temp-fail the block connection which will hold the channel-closed event chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear(); - chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure)); + chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); // Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The // channel is now closed, but the ChannelManager doesn't know that yet. let new_header = BlockHeader { version: 2, time: 0, bits: 0, nonce: 0, prev_blockhash: nodes[0].best_block_info().0, - merkle_root: Default::default() }; + merkle_root: TxMerkleNode::all_zeros() }; nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header, &[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1); assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty()); @@ -902,8 +930,8 @@ mod tests { // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass // the update through to the ChannelMonitor which will refuse it (as the channel is closed). - chanmon_cfgs[0].persister.set_update_ret(Ok(())); - unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret)), + chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)), true, APIError::ChannelUnavailable { ref err }, assert!(err.contains("ChannelMonitor storage failure"))); check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update @@ -919,7 +947,7 @@ mod tests { let latest_header = BlockHeader { version: 2, time: 0, bits: 0, nonce: 0, prev_blockhash: nodes[0].best_block_info().0, - merkle_root: Default::default() }; + merkle_root: TxMerkleNode::all_zeros() }; nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS); } else { let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone(); @@ -945,10 +973,10 @@ mod tests { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); + create_announced_chan_between_nodes(&nodes, 0, 1); chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear(); - chanmon_cfgs[0].persister.set_update_ret(Err(ChannelMonitorUpdateErr::PermanentFailure)); + chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure); connect_blocks(&nodes[0], 1); // Before processing events, the ChannelManager will still think the Channel is open and