X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;fp=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=f1ce0f79ae9cd99860283ec72c00df49430c87a3;hb=dda86a0cf2df1590f3f987b1f5df44792c7215c7;hp=0d3f87645ce4c4c1250d2d42a3a085f419ac13a6;hpb=da498d7974bb1e8de5ccec090a1819e63f882350;p=rust-lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 0d3f8764..f1ce0f79 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -27,10 +27,9 @@ use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::Txid; use chain; -use chain::{Filter, WatchedOutput}; +use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput}; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use chain::channelmonitor; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, Balance, MonitorEvent, Persist, TransactionOutputs}; +use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::logger::Logger; @@ -39,9 +38,80 @@ use util::events::EventHandler; use ln::channelmanager::ChannelDetails; use prelude::*; -use sync::RwLock; +use sync::{RwLock, RwLockReadGuard}; use core::ops::Deref; +/// `Persist` defines behavior for persisting channel monitors: this could mean +/// writing once to disk, and/or uploading to one or more backup services. +/// +/// Note that for every new monitor, you **must** persist the new `ChannelMonitor` +/// to disk/backups. And, on every update, you **must** persist either the +/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk +/// of situations such as revoking a transaction, then crashing before this +/// revocation can be persisted, then unintentionally broadcasting a revoked +/// transaction and losing money. This is a risk because previous channel states +/// are toxic, so it's important that whatever channel state is persisted is +/// kept up-to-date. +pub trait Persist { + /// Persist a new channel's data. The data can be stored any way you want, but + /// the identifier provided by Rust-Lightning is the channel's outpoint (and + /// it is up to you to maintain a correct mapping between the outpoint and the + /// stored channel data). Note that you **must** persist every new monitor to + /// disk. See the `Persist` trait documentation for more details. + /// + /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor` + /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors. + /// + /// [`Writeable::write`]: crate::util::ser::Writeable::write + fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + + /// Update one channel's data. The provided `ChannelMonitor` has already + /// applied the given update. + /// + /// Note that on every update, you **must** persist either the + /// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See + /// the `Persist` trait documentation for more details. + /// + /// If an implementer chooses to persist the updates only, they need to make + /// sure that all the updates are applied to the `ChannelMonitors` *before* + /// the set of channel monitors is given to the `ChannelManager` + /// deserialization routine. See [`ChannelMonitor::update_monitor`] for + /// applying a monitor update to a monitor. If full `ChannelMonitors` are + /// persisted, then there is no need to persist individual updates. + /// + /// Note that there could be a performance tradeoff between persisting complete + /// channel monitors on every update vs. persisting only updates and applying + /// them in batches. The size of each monitor grows `O(number of state updates)` + /// whereas updates are small and `O(1)`. + /// + /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`, + /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and + /// [`ChannelMonitorUpdateErr`] for requirements when returning errors. + /// + /// [`Writeable::write`]: crate::util::ser::Writeable::write + fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; +} + +struct MonitorHolder { + monitor: ChannelMonitor, +} + +/// A read-only reference to a current ChannelMonitor. +/// +/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is +/// released. +pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> { + lock: RwLockReadGuard<'a, HashMap>>, + funding_txo: OutPoint, +} + +impl Deref for LockedChannelMonitor<'_, ChannelSigner> { + type Target = ChannelMonitor; + fn deref(&self) -> &ChannelMonitor { + &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor + } +} + /// An implementation of [`chain::Watch`] for monitoring channels. /// /// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by @@ -56,10 +126,9 @@ pub struct ChainMonitor, + P::Target: Persist, { - /// The monitors - pub monitors: RwLock>>, + monitors: RwLock>>, chain_source: Option, broadcaster: T, logger: L, @@ -72,7 +141,7 @@ where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view /// of a channel and reacting accordingly based on transactions in the given chain data. See @@ -88,9 +157,9 @@ where C::Target: chain::Filter, FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let mut dependent_txdata = Vec::new(); - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - let mut txn_outputs = process(monitor, txdata); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + let mut txn_outputs = process(&monitor_state.monitor, txdata); // Register any new outputs with the chain source for filtering, storing any dependent // transactions from within the block that previously had not been included in txdata. @@ -152,8 +221,8 @@ where C::Target: chain::Filter, /// inclusion in the return value. pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec { let mut ret = Vec::new(); - let monitors = self.monitors.read().unwrap(); - for (_, monitor) in monitors.iter().filter(|(funding_outpoint, _)| { + let monitor_states = self.monitors.read().unwrap(); + for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| { for chan in ignored_channels { if chan.funding_txo.as_ref() == Some(funding_outpoint) { return false; @@ -161,11 +230,38 @@ where C::Target: chain::Filter, } true }) { - ret.append(&mut monitor.get_claimable_balances()); + ret.append(&mut monitor_state.monitor.get_claimable_balances()); } ret } + /// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no + /// such [`ChannelMonitor`] is currently being monitored for. + /// + /// Note that the result holds a mutex over our monitor set, and should not be held + /// indefinitely. + pub fn get_monitor(&self, funding_txo: OutPoint) -> Result, ()> { + let lock = self.monitors.read().unwrap(); + if lock.get(&funding_txo).is_some() { + Ok(LockedChannelMonitor { lock, funding_txo }) + } else { + Err(()) + } + } + + /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored. + /// + /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always + /// monitoring for on-chain state resolutions. + pub fn list_monitors(&self) -> Vec { + self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() + } + + #[cfg(test)] + pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { + self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor + } + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { use util::events::EventsProvider; @@ -183,7 +279,7 @@ where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { fn block_connected(&self, block: &Block, height: u32) { let header = &block.header; @@ -196,10 +292,10 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let monitors = self.monitors.read().unwrap(); + let monitor_states = self.monitors.read().unwrap(); log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height); - for monitor in monitors.values() { - monitor.block_disconnected( + for monitor_state in monitor_states.values() { + monitor_state.monitor.block_disconnected( header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); } } @@ -212,7 +308,7 @@ where T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash()); @@ -224,9 +320,9 @@ where fn transaction_unconfirmed(&self, txid: &Txid) { log_debug!(self.logger, "Transaction {} reorganized out of chain", txid); - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); } } @@ -243,9 +339,9 @@ where fn get_relevant_txids(&self) -> Vec { let mut txids = Vec::new(); - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - txids.append(&mut monitor.get_relevant_txids()); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + txids.append(&mut monitor_state.monitor.get_relevant_txids()); } txids.sort_unstable(); @@ -260,7 +356,7 @@ where C::Target: chain::Filter, T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { /// Adds the monitor that watches the channel referred to by the given outpoint. /// @@ -276,9 +372,12 @@ where C::Target: chain::Filter, return Err(ChannelMonitorUpdateErr::PermanentFailure)}, hash_map::Entry::Vacant(e) => e, }; - if let Err(e) = self.persister.persist_new_channel(funding_outpoint, &monitor) { - log_error!(self.logger, "Failed to persist new channel data"); - return Err(e); + let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor); + if persist_res.is_err() { + log_error!(self.logger, "Failed to persist new channel data: {:?}", persist_res); + } + if persist_res == Err(ChannelMonitorUpdateErr::PermanentFailure) { + return persist_res; } { let funding_txo = monitor.get_funding_txo(); @@ -288,8 +387,8 @@ where C::Target: chain::Filter, monitor.load_outputs_to_watch(chain_source); } } - entry.insert(monitor); - Ok(()) + entry.insert(MonitorHolder { monitor }); + persist_res } /// Note that we persist the given `ChannelMonitor` update while holding the @@ -309,7 +408,8 @@ where C::Target: chain::Filter, #[cfg(not(any(test, feature = "fuzztarget")))] Err(ChannelMonitorUpdateErr::PermanentFailure) }, - Some(monitor) => { + Some(monitor_state) => { + let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor)); let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger); if let Err(e) = &update_res { @@ -332,8 +432,8 @@ where C::Target: chain::Filter, fn release_pending_monitor_events(&self) -> Vec { let mut pending_monitor_events = Vec::new(); - for monitor in self.monitors.read().unwrap().values() { - pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events()); + for monitor_state in self.monitors.read().unwrap().values() { + pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); } pending_monitor_events } @@ -344,7 +444,7 @@ impl even T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, - P::Target: channelmonitor::Persist, + P::Target: Persist, { /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. /// @@ -354,8 +454,8 @@ impl even /// [`SpendableOutputs`]: events::Event::SpendableOutputs fn process_pending_events(&self, handler: H) where H::Target: EventHandler { let mut pending_events = Vec::new(); - for monitor in self.monitors.read().unwrap().values() { - pending_events.append(&mut monitor.get_and_clear_pending_events()); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } for event in pending_events.drain(..) { handler.handle_event(&event);