X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=f1ce0f79ae9cd99860283ec72c00df49430c87a3;hb=1b6a7c1314a3840f57af4457977a5c98a864edd1;hp=0cf2d56a14eb5a79bb489b7fb7c27e80c7305597;hpb=6b707c6f23dfa84bbb362bcdd37ba53c71ec4f53;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 0cf2d56a..f1ce0f79 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -13,38 +13,104 @@ //! update [`ChannelMonitor`]s accordingly. If any on-chain events need further processing, it will //! make those available as [`MonitorEvent`]s to be consumed. //! -//! `ChainMonitor` is parameterized by an optional chain source, which must implement the +//! [`ChainMonitor`] is parameterized by an optional chain source, which must implement the //! [`chain::Filter`] trait. This provides a mechanism to signal new relevant outputs back to light //! clients, such that transactions spending those outputs are included in block data. //! -//! `ChainMonitor` may be used directly to monitor channels locally or as a part of a distributed -//! setup to monitor channels remotely. In the latter case, a custom `chain::Watch` implementation +//! [`ChainMonitor`] may be used directly to monitor channels locally or as a part of a distributed +//! setup to monitor channels remotely. In the latter case, a custom [`chain::Watch`] implementation //! would be responsible for routing each update to a remote server and for retrieving monitor -//! events. The remote server would make use of `ChainMonitor` for block processing and for -//! servicing `ChannelMonitor` updates from the client. -//! -//! [`ChainMonitor`]: struct.ChainMonitor.html -//! [`chain::Filter`]: ../trait.Filter.html -//! [`chain::Watch`]: ../trait.Watch.html -//! [`ChannelMonitor`]: ../channelmonitor/struct.ChannelMonitor.html -//! [`MonitorEvent`]: ../channelmonitor/enum.MonitorEvent.html +//! events. The remote server would make use of [`ChainMonitor`] for block processing and for +//! servicing [`ChannelMonitor`] updates from the client. use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::hash_types::Txid; use chain; -use chain::Filter; +use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput}; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use chain::channelmonitor; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist}; +use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::logger::Logger; use util::events; -use util::events::Event; +use util::events::EventHandler; +use ln::channelmanager::ChannelDetails; + +use prelude::*; +use sync::{RwLock, RwLockReadGuard}; +use core::ops::Deref; + +/// `Persist` defines behavior for persisting channel monitors: this could mean +/// writing once to disk, and/or uploading to one or more backup services. +/// +/// Note that for every new monitor, you **must** persist the new `ChannelMonitor` +/// to disk/backups. And, on every update, you **must** persist either the +/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk +/// of situations such as revoking a transaction, then crashing before this +/// revocation can be persisted, then unintentionally broadcasting a revoked +/// transaction and losing money. This is a risk because previous channel states +/// are toxic, so it's important that whatever channel state is persisted is +/// kept up-to-date. +pub trait Persist { + /// Persist a new channel's data. The data can be stored any way you want, but + /// the identifier provided by Rust-Lightning is the channel's outpoint (and + /// it is up to you to maintain a correct mapping between the outpoint and the + /// stored channel data). Note that you **must** persist every new monitor to + /// disk. See the `Persist` trait documentation for more details. + /// + /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor` + /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors. + /// + /// [`Writeable::write`]: crate::util::ser::Writeable::write + fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + + /// Update one channel's data. The provided `ChannelMonitor` has already + /// applied the given update. + /// + /// Note that on every update, you **must** persist either the + /// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See + /// the `Persist` trait documentation for more details. + /// + /// If an implementer chooses to persist the updates only, they need to make + /// sure that all the updates are applied to the `ChannelMonitors` *before* + /// the set of channel monitors is given to the `ChannelManager` + /// deserialization routine. See [`ChannelMonitor::update_monitor`] for + /// applying a monitor update to a monitor. If full `ChannelMonitors` are + /// persisted, then there is no need to persist individual updates. + /// + /// Note that there could be a performance tradeoff between persisting complete + /// channel monitors on every update vs. persisting only updates and applying + /// them in batches. The size of each monitor grows `O(number of state updates)` + /// whereas updates are small and `O(1)`. + /// + /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`, + /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and + /// [`ChannelMonitorUpdateErr`] for requirements when returning errors. + /// + /// [`Writeable::write`]: crate::util::ser::Writeable::write + fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; +} + +struct MonitorHolder { + monitor: ChannelMonitor, +} -use std::collections::{HashMap, hash_map}; -use std::sync::RwLock; -use std::ops::Deref; +/// A read-only reference to a current ChannelMonitor. +/// +/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is +/// released. +pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> { + lock: RwLockReadGuard<'a, HashMap>>, + funding_txo: OutPoint, +} + +impl Deref for LockedChannelMonitor<'_, ChannelSigner> { + type Target = ChannelMonitor; + fn deref(&self) -> &ChannelMonitor { + &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor + } +} /// An implementation of [`chain::Watch`] for monitoring channels. /// @@ -53,18 +119,16 @@ use std::ops::Deref; /// or used independently to monitor channels remotely. See the [module-level documentation] for /// details. /// -/// [`chain::Watch`]: ../trait.Watch.html -/// [`ChannelManager`]: ../../ln/channelmanager/struct.ChannelManager.html -/// [module-level documentation]: index.html +/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager +/// [module-level documentation]: crate::chain::chainmonitor pub struct ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { - /// The monitors - pub monitors: RwLock>>, + monitors: RwLock>>, chain_source: Option, broadcaster: T, logger: L, @@ -77,10 +141,10 @@ where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view - /// of a channel and reacting accordingly based on transactions in the connected block. See + /// of a channel and reacting accordingly based on transactions in the given chain data. See /// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will /// be returned by [`chain::Watch::release_pending_monitor_events`]. /// @@ -88,34 +152,43 @@ where C::Target: chain::Filter, /// calls must not exclude any transactions matching the new outputs nor any in-block /// descendants of such transactions. It is not necessary to re-fetch the block to obtain /// updated `txdata`. - /// - /// [`ChannelMonitor::block_connected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_connected - /// [`chain::Watch::release_pending_monitor_events`]: ../trait.Watch.html#tymethod.release_pending_monitor_events - /// [`chain::Filter`]: ../trait.Filter.html - pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + fn process_chain_data(&self, header: &BlockHeader, txdata: &TransactionData, process: FN) + where + FN: Fn(&ChannelMonitor, &TransactionData) -> Vec + { + let mut dependent_txdata = Vec::new(); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + let mut txn_outputs = process(&monitor_state.monitor, txdata); + // Register any new outputs with the chain source for filtering, storing any dependent + // transactions from within the block that previously had not been included in txdata. if let Some(ref chain_source) = self.chain_source { - for (txid, outputs) in txn_outputs.drain(..) { - for (idx, output) in outputs.iter() { - chain_source.register_output(&OutPoint { txid, index: *idx as u16 }, &output.script_pubkey); + let block_hash = header.block_hash(); + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { + // Register any new outputs with the chain source for filtering and recurse + // if it indicates that there are dependent transactions within the block + // that had not been previously included in txdata. + let output = WatchedOutput { + block_hash: Some(block_hash), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, + }; + if let Some(tx) = chain_source.register_output(output) { + dependent_txdata.push(tx); + } } } } } - } - /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view - /// of a channel based on the disconnected block. See [`ChannelMonitor::block_disconnected`] for - /// details. - /// - /// [`ChannelMonitor::block_disconnected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_disconnected - pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) { - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - monitor.block_disconnected(header, disconnected_height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + // Recursively call for any dependent transactions that were identified by the chain source. + if !dependent_txdata.is_empty() { + dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index); + dependent_txdata.dedup_by_key(|(index, _tx)| *index); + let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect(); + self.process_chain_data(header, &txdata, process); } } @@ -126,8 +199,6 @@ where C::Target: chain::Filter, /// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may /// always need to fetch full blocks absent another means for determining which blocks contain /// transactions relevant to the watched channels. - /// - /// [`chain::Filter`]: ../trait.Filter.html pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P) -> Self { Self { monitors: RwLock::new(HashMap::new()), @@ -138,35 +209,154 @@ where C::Target: chain::Filter, persister, } } + + /// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain or + /// claims which are awaiting confirmation. + /// + /// Includes the balances from each [`ChannelMonitor`] *except* those included in + /// `ignored_channels`, allowing you to filter out balances from channels which are still open + /// (and whose balance should likely be pulled from the [`ChannelDetails`]). + /// + /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for + /// inclusion in the return value. + pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec { + let mut ret = Vec::new(); + let monitor_states = self.monitors.read().unwrap(); + for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| { + for chan in ignored_channels { + if chan.funding_txo.as_ref() == Some(funding_outpoint) { + return false; + } + } + true + }) { + ret.append(&mut monitor_state.monitor.get_claimable_balances()); + } + ret + } + + /// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no + /// such [`ChannelMonitor`] is currently being monitored for. + /// + /// Note that the result holds a mutex over our monitor set, and should not be held + /// indefinitely. + pub fn get_monitor(&self, funding_txo: OutPoint) -> Result, ()> { + let lock = self.monitors.read().unwrap(); + if lock.get(&funding_txo).is_some() { + Ok(LockedChannelMonitor { lock, funding_txo }) + } else { + Err(()) + } + } + + /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored. + /// + /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always + /// monitoring for on-chain state resolutions. + pub fn list_monitors(&self) -> Vec { + self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() + } + + #[cfg(test)] + pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { + self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor + } + + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] + pub fn get_and_clear_pending_events(&self) -> Vec { + use util::events::EventsProvider; + let events = core::cell::RefCell::new(Vec::new()); + let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone()); + self.process_pending_events(&event_handler); + events.into_inner() + } } -impl +impl chain::Listen for ChainMonitor where - ChannelSigner: Sign, C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { fn block_connected(&self, block: &Block, height: u32) { + let header = &block.header; let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); - ChainMonitor::block_connected(self, &block.header, &txdata, height); + log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height); + self.process_chain_data(header, &txdata, |monitor, txdata| { + monitor.block_connected( + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + }); } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - ChainMonitor::block_disconnected(self, header, height); + let monitor_states = self.monitors.read().unwrap(); + log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height); + for monitor_state in monitor_states.values() { + monitor_state.monitor.block_disconnected( + header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + } } } -impl +impl +chain::Confirm for ChainMonitor +where + C::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + P::Target: Persist, +{ + fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash()); + self.process_chain_data(header, txdata, |monitor, txdata| { + monitor.transactions_confirmed( + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + }); + } + + fn transaction_unconfirmed(&self, txid: &Txid) { + log_debug!(self.logger, "Transaction {} reorganized out of chain", txid); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + } + } + + fn best_block_updated(&self, header: &BlockHeader, height: u32) { + log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height); + self.process_chain_data(header, &[], |monitor, txdata| { + // While in practice there shouldn't be any recursive calls when given empty txdata, + // it's still possible if a chain::Filter implementation returns a transaction. + debug_assert!(txdata.is_empty()); + monitor.best_block_updated( + header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + }); + } + + fn get_relevant_txids(&self) -> Vec { + let mut txids = Vec::new(); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + txids.append(&mut monitor_state.monitor.get_relevant_txids()); + } + + txids.sort_unstable(); + txids.dedup(); + txids + } +} + +impl chain::Watch for ChainMonitor where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { /// Adds the monitor that watches the channel referred to by the given outpoint. /// @@ -174,8 +364,6 @@ where C::Target: chain::Filter, /// /// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor` /// monitors lock. - /// - /// [`chain::Filter`]: ../trait.Filter.html fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { @@ -184,9 +372,12 @@ where C::Target: chain::Filter, return Err(ChannelMonitorUpdateErr::PermanentFailure)}, hash_map::Entry::Vacant(e) => e, }; - if let Err(e) = self.persister.persist_new_channel(funding_outpoint, &monitor) { - log_error!(self.logger, "Failed to persist new channel data"); - return Err(e); + let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor); + if persist_res.is_err() { + log_error!(self.logger, "Failed to persist new channel data: {:?}", persist_res); + } + if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) { + return persist_res; } { let funding_txo = monitor.get_funding_txo(); @@ -196,8 +387,8 @@ where C::Target: chain::Filter, monitor.load_outputs_to_watch(chain_source); } } - entry.insert(monitor); - Ok(()) + entry.insert(MonitorHolder { monitor }); + persist_res } /// Note that we persist the given `ChannelMonitor` update while holding the @@ -217,7 +408,8 @@ where C::Target: chain::Filter, #[cfg(not(any(test, feature = "fuzztarget")))] Err(ChannelMonitorUpdateErr::PermanentFailure) }, - Some(monitor) => { + Some(monitor_state) => { + let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor)); let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger); if let Err(e) = &update_res { @@ -240,8 +432,8 @@ where C::Target: chain::Filter, fn release_pending_monitor_events(&self) -> Vec { let mut pending_monitor_events = Vec::new(); - for monitor in self.monitors.read().unwrap().values() { - pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events()); + for monitor_state in self.monitors.read().unwrap().values() { + pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); } pending_monitor_events } @@ -252,13 +444,73 @@ impl even T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { - fn get_and_clear_pending_events(&self) -> Vec { + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. + /// + /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in + /// order to handle these events. + /// + /// [`SpendableOutputs`]: events::Event::SpendableOutputs + fn process_pending_events(&self, handler: H) where H::Target: EventHandler { let mut pending_events = Vec::new(); - for monitor in self.monitors.read().unwrap().values() { - pending_events.append(&mut monitor.get_and_clear_pending_events()); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } - pending_events + for event in pending_events.drain(..) { + handler.handle_event(&event); + } + } +} + +#[cfg(test)] +mod tests { + use ::{check_added_monitors, get_local_commitment_txn}; + use ln::features::InitFeatures; + use ln::functional_test_utils::*; + use util::events::MessageSendEventsProvider; + use util::test_utils::{OnRegisterOutput, TxOutReference}; + + /// Tests that in-block dependent transactions are processed by `block_connected` when not + /// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance, + /// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the + /// commitment transaction itself. An Electrum client may filter the commitment transaction but + /// needs to return the HTLC transaction so it can be processed. + #[test] + fn connect_block_checks_dependent_transactions() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let channel = create_announced_chan_between_nodes( + &nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); + + // Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions. + let (commitment_tx, htlc_tx) = { + let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0; + let mut txn = get_local_commitment_txn!(nodes[0], channel.2); + claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage); + + assert_eq!(txn.len(), 2); + (txn.remove(0), txn.remove(0)) + }; + + // Set expectations on nodes[1]'s chain source to return dependent transactions. + let htlc_output = TxOutReference(commitment_tx.clone(), 0); + let to_local_output = TxOutReference(commitment_tx.clone(), 1); + let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0); + nodes[1].chain_source + .expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) }) + .expect(OnRegisterOutput { with: to_local_output, returns: None }) + .expect(OnRegisterOutput { with: htlc_timeout_output, returns: None }); + + // Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain + // source should return the dependent HTLC transaction when the HTLC output is registered. + mine_transaction(&nodes[1], &commitment_tx); + + // Clean up so uninteresting assertions don't fail. + check_added_monitors!(nodes[1], 1); + nodes[1].node.get_and_clear_pending_msg_events(); + nodes[1].node.get_and_clear_pending_events(); } }