X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=eb17b469a0365df2de4464ef313cc243441b6767;hb=47ad3d6bd87affc14281ac8dbf62d69b6c066072;hp=d858c12673daf1e24365bd20d79558f540a9ff86;hpb=8fb4a3ddc2b1b70ac7032a5904ad79114a77b8dc;p=rust-lightning

diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index d858c126..eb17b469 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -13,36 +13,31 @@
 //! update [`ChannelMonitor`]s accordingly. If any on-chain events need further processing, it will
 //! make those available as [`MonitorEvent`]s to be consumed.
 //!
-//! `ChainMonitor` is parameterized by an optional chain source, which must implement the
+//! [`ChainMonitor`] is parameterized by an optional chain source, which must implement the
 //! [`chain::Filter`] trait. This provides a mechanism to signal new relevant outputs back to light
 //! clients, such that transactions spending those outputs are included in block data.
 //!
-//! `ChainMonitor` may be used directly to monitor channels locally or as a part of a distributed
-//! setup to monitor channels remotely. In the latter case, a custom `chain::Watch` implementation
+//! [`ChainMonitor`] may be used directly to monitor channels locally or as a part of a distributed
+//! setup to monitor channels remotely. In the latter case, a custom [`chain::Watch`] implementation
 //! would be responsible for routing each update to a remote server and for retrieving monitor
-//! events. The remote server would make use of `ChainMonitor` for block processing and for
-//! servicing `ChannelMonitor` updates from the client.
-//!
-//! [`ChainMonitor`]: struct.ChainMonitor.html
-//! [`chain::Filter`]: ../trait.Filter.html
-//! [`chain::Watch`]: ../trait.Watch.html
-//! [`ChannelMonitor`]: ../channelmonitor/struct.ChannelMonitor.html
-//! [`MonitorEvent`]: ../channelmonitor/enum.MonitorEvent.html
+//! events. The remote server would make use of [`ChainMonitor`] for block processing and for
+//! servicing [`ChannelMonitor`] updates from the client.
 
-use bitcoin::blockdata::block::BlockHeader;
+use bitcoin::blockdata::block::{Block, BlockHeader};
 
 use chain;
 use chain::Filter;
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, MonitorUpdateError};
+use chain::channelmonitor;
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist};
 use chain::transaction::{OutPoint, TransactionData};
-use chain::keysinterface::ChannelKeys;
+use chain::keysinterface::Sign;
 use util::logger::Logger;
 use util::events;
 use util::events::Event;
 
 use std::collections::{HashMap, hash_map};
-use std::sync::Mutex;
+use std::sync::RwLock;
 use std::ops::Deref;
 
 /// An implementation of [`chain::Watch`] for monitoring channels.
@@ -52,70 +47,61 @@ use std::ops::Deref;
 /// or used independently to monitor channels remotely. See the [module-level documentation] for
 /// details.
 ///
-/// [`chain::Watch`]: ../trait.Watch.html
-/// [`ChannelManager`]: ../../ln/channelmanager/struct.ChannelManager.html
-/// [module-level documentation]: index.html
-pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref>
+/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
+/// [module-level documentation]: crate::chain::chainmonitor
+pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 	where C::Target: chain::Filter,
         T::Target: BroadcasterInterface,
         F::Target: FeeEstimator,
         L::Target: Logger,
+        P::Target: channelmonitor::Persist<ChannelSigner>,
 {
 	/// The monitors
-	pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
+	pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
 	chain_source: Option<C>,
 	broadcaster: T,
 	logger: L,
-	fee_estimator: F
+	fee_estimator: F,
+	persister: P,
 }
 
-impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
-	where C::Target: chain::Filter,
-	      T::Target: BroadcasterInterface,
-	      F::Target: FeeEstimator,
-	      L::Target: Logger,
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
+where C::Target: chain::Filter,
+      T::Target: BroadcasterInterface,
+      F::Target: FeeEstimator,
+      L::Target: Logger,
+      P::Target: channelmonitor::Persist<ChannelSigner>,
 {
 	/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
 	/// of a channel and reacting accordingly based on transactions in the connected block. See
 	/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
 	/// be returned by [`chain::Watch::release_pending_monitor_events`].
 	///
-	/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch, returning
-	/// `true` if so. Subsequent calls must not exclude any transactions matching the new outputs
-	/// nor any in-block descendants of such transactions. It is not necessary to re-fetch the block
-	/// to obtain updated `txdata`.
-	///
-	/// [`ChannelMonitor::block_connected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_connected
-	/// [`chain::Watch::release_pending_monitor_events`]: ../trait.Watch.html#tymethod.release_pending_monitor_events
-	/// [`chain::Filter`]: ../trait.Filter.html
-	pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) -> bool {
-		let mut has_new_outputs_to_watch = false;
-		{
-			let mut monitors = self.monitors.lock().unwrap();
-			for monitor in monitors.values_mut() {
-				let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
-				has_new_outputs_to_watch |= !txn_outputs.is_empty();
-
-				if let Some(ref chain_source) = self.chain_source {
-					for (txid, outputs) in txn_outputs.drain(..) {
-						for (idx, output) in outputs.iter().enumerate() {
-							chain_source.register_output(&OutPoint { txid, index: idx as u16 }, &output.script_pubkey);
-						}
+	/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
+	/// calls must not exclude any transactions matching the new outputs nor any in-block
+	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
+	/// updated `txdata`.
+	pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+		let monitors = self.monitors.read().unwrap();
+		for monitor in monitors.values() {
+			let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+
+			if let Some(ref chain_source) = self.chain_source {
+				for (txid, outputs) in txn_outputs.drain(..) {
+					for (idx, output) in outputs.iter() {
+						chain_source.register_output(&OutPoint { txid, index: *idx as u16 }, &output.script_pubkey);
 					}
 				}
 			}
 		}
-		has_new_outputs_to_watch
 	}
 
 	/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
 	/// of a channel based on the disconnected block. See [`ChannelMonitor::block_disconnected`] for
 	/// details.
-	///
-	/// [`ChannelMonitor::block_disconnected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_disconnected
 	pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
-		let mut monitors = self.monitors.lock().unwrap();
-		for monitor in monitors.values_mut() {
+		let monitors = self.monitors.read().unwrap();
+		for monitor in monitors.values() {
 			monitor.block_disconnected(header, disconnected_height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
 		}
 	}
@@ -127,100 +113,134 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
 	/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
 	/// always need to fetch full blocks absent another means for determining which blocks contain
 	/// transactions relevant to the watched channels.
-	///
-	/// [`chain::Filter`]: ../trait.Filter.html
-	pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F) -> Self {
+	pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
 		Self {
-			monitors: Mutex::new(HashMap::new()),
+			monitors: RwLock::new(HashMap::new()),
 			chain_source,
 			broadcaster,
 			logger,
 			fee_estimator: feeest,
+			persister,
 		}
 	}
+}
+
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
+where
+	ChannelSigner: Sign,
+	C::Target: chain::Filter,
+	T::Target: BroadcasterInterface,
+	F::Target: FeeEstimator,
+	L::Target: Logger,
+	P::Target: channelmonitor::Persist<ChannelSigner>,
+{
+	fn block_connected(&self, block: &Block, height: u32) {
+		let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+		ChainMonitor::block_connected(self, &block.header, &txdata, height);
+	}
+
+	fn block_disconnected(&self, header: &BlockHeader, height: u32) {
+		ChainMonitor::block_disconnected(self, header, height);
+	}
+}
 
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
+where C::Target: chain::Filter,
+      T::Target: BroadcasterInterface,
+      F::Target: FeeEstimator,
+      L::Target: Logger,
+      P::Target: channelmonitor::Persist<ChannelSigner>,
+{
 	/// Adds the monitor that watches the channel referred to by the given outpoint.
 	///
 	/// Calls back to [`chain::Filter`] with the funding transaction and outputs to watch.
 	///
-	/// [`chain::Filter`]: ../trait.Filter.html
-	fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
-		let mut monitors = self.monitors.lock().unwrap();
-		let entry = match monitors.entry(outpoint) {
-			hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given outpoint is already present")),
+	/// Note that we persist the given `ChannelMonitor` while holding the `ChainMonitor`
+	/// monitors lock.
+	fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
+		let mut monitors = self.monitors.write().unwrap();
+		let entry = match monitors.entry(funding_outpoint) {
+			hash_map::Entry::Occupied(_) => {
+				log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
+				return Err(ChannelMonitorUpdateErr::PermanentFailure)},
 			hash_map::Entry::Vacant(e) => e,
 		};
+		if let Err(e) = self.persister.persist_new_channel(funding_outpoint, &monitor) {
+			log_error!(self.logger, "Failed to persist new channel data");
+			return Err(e);
+		}
 		{
 			let funding_txo = monitor.get_funding_txo();
 			log_trace!(self.logger, "Got new Channel Monitor for channel {}", log_bytes!(funding_txo.0.to_channel_id()[..]));
 
 			if let Some(ref chain_source) = self.chain_source {
-				chain_source.register_tx(&funding_txo.0.txid, &funding_txo.1);
-				for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
-					for (idx, script_pubkey) in outputs.iter().enumerate() {
-						chain_source.register_output(&OutPoint { txid: *txid, index: idx as u16 }, &script_pubkey);
-					}
-				}
+				monitor.load_outputs_to_watch(chain_source);
 			}
 		}
 		entry.insert(monitor);
 		Ok(())
 	}
 
-	/// Updates the monitor that watches the channel referred to by the given outpoint.
-	fn update_monitor(&self, outpoint: OutPoint, update: ChannelMonitorUpdate) -> Result<(), MonitorUpdateError> {
-		let mut monitors = self.monitors.lock().unwrap();
-		match monitors.get_mut(&outpoint) {
-			Some(orig_monitor) => {
-				log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor));
-				orig_monitor.update_monitor(update, &self.broadcaster, &self.logger)
-			},
-			None => Err(MonitorUpdateError("No such monitor registered"))
-		}
-	}
-}
-
-impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> chain::Watch for ChainMonitor<ChanSigner, C, T, F, L>
-	where C::Target: chain::Filter,
-	      T::Target: BroadcasterInterface,
-	      F::Target: FeeEstimator,
-	      L::Target: Logger,
-{
-	type Keys = ChanSigner;
-
-	fn watch_channel(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr> {
-		match self.add_monitor(funding_txo, monitor) {
-			Ok(_) => Ok(()),
-			Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure),
-		}
-	}
-
+	/// Note that we persist the given `ChannelMonitor` update while holding the
+	/// `ChainMonitor` monitors lock.
 	fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
-		match self.update_monitor(funding_txo, update) {
-			Ok(_) => Ok(()),
-			Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure),
+		// Update the monitor that watches the channel referred to by the given outpoint.
+		let monitors = self.monitors.read().unwrap();
+		match monitors.get(&funding_txo) {
+			None => {
+				log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
+
+				// We should never ever trigger this from within ChannelManager. Technically a
+				// user could use this object with some proxying in between which makes this
+				// possible, but in tests and fuzzing, this should be a panic.
+				#[cfg(any(test, feature = "fuzztarget"))]
+				panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
+				#[cfg(not(any(test, feature = "fuzztarget")))]
+				Err(ChannelMonitorUpdateErr::PermanentFailure)
+			},
+			Some(monitor) => {
+				log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
+				let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
+				if let Err(e) = &update_res {
+					log_error!(self.logger, "Failed to update channel monitor: {:?}", e);
+				}
+				// Even if updating the monitor returns an error, the monitor's state will
+				// still be changed. So, persist the updated monitor despite the error.
+				let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor);
+				if let Err(ref e) = persist_res {
+					log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
+				}
+				if update_res.is_err() {
+					Err(ChannelMonitorUpdateErr::PermanentFailure)
+				} else {
+					persist_res
+				}
+			}
 		}
 	}
 
 	fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
 		let mut pending_monitor_events = Vec::new();
-		for chan in self.monitors.lock().unwrap().values_mut() {
-			pending_monitor_events.append(&mut chan.get_and_clear_pending_monitor_events());
+		for monitor in self.monitors.read().unwrap().values() {
+			pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
 		}
 		pending_monitor_events
 	}
 }
 
-impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, C, T, F, L>
+impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
 	where C::Target: chain::Filter,
 	      T::Target: BroadcasterInterface,
 	      F::Target: FeeEstimator,
 	      L::Target: Logger,
+	      P::Target: channelmonitor::Persist<ChannelSigner>,
 {
 	fn get_and_clear_pending_events(&self) -> Vec<Event> {
 		let mut pending_events = Vec::new();
-		for chan in self.monitors.lock().unwrap().values_mut() {
-			pending_events.append(&mut chan.get_and_clear_pending_events());
+		for monitor in self.monitors.read().unwrap().values() {
+			pending_events.append(&mut monitor.get_and_clear_pending_events());
 		}
 		pending_events
	}
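
Usage note (not part of the patch above): after this change, ChainMonitor::new() takes a fifth argument, `persister`, whose `Deref` target must implement `channelmonitor::Persist<ChannelSigner>`. The patch calls exactly two methods on it, `persist_new_channel` from `watch_channel` and `update_persisted_channel` from `update_channel`, both while the monitors lock is held and both returning `Result<(), ChannelMonitorUpdateErr>`. The sketch below is a minimal, hypothetical no-op persister written against those two calls; the trait paths and exact method signatures are assumptions inferred from the imports and call sites in the diff and should be checked against the tree at commit 47ad3d6. A real persister must durably write the monitor before returning Ok(()).

use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, Persist};
use lightning::chain::keysinterface::Sign;
use lightning::chain::transaction::OutPoint;

/// A do-nothing persister for illustration/tests only: it accepts every monitor
/// and update without writing anything to disk, so it provides no crash safety.
struct NoopPersister;

impl<ChannelSigner: Sign> Persist<ChannelSigner> for NoopPersister {
	// Called by ChainMonitor::watch_channel before the new monitor is inserted,
	// while the monitors write lock is held.
	fn persist_new_channel(&self, _funding_txo: OutPoint, _monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
		// A real implementation would serialize `_monitor` and write it to durable
		// storage, returning TemporaryFailure or PermanentFailure on I/O errors.
		Ok(())
	}

	// Called by ChainMonitor::update_channel for every monitor update, even when
	// applying the update failed, since the in-memory monitor may have changed anyway.
	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &ChannelMonitorUpdate, _monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
		// A real implementation would either apply `_update` to its stored copy or
		// re-serialize the full `_monitor`.
		Ok(())
	}
}

Such a value would then be passed as the new last constructor argument, e.g. `ChainMonitor::new(chain_source, broadcaster, logger, fee_estimator, NoopPersister)` (argument order as in the patched `new`).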