use bitcoin::hash_types::Txid;
use chain;
-use chain::{Filter, WatchedOutput};
+use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor;
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, Balance, MonitorEvent, Persist, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::Sign;
use util::logger::Logger;
use ln::channelmanager::ChannelDetails;
use prelude::*;
-use sync::RwLock;
+use sync::{RwLock, RwLockReadGuard};
use core::ops::Deref;
+/// `Persist` defines behavior for persisting channel monitors: this could mean
+/// writing once to disk, and/or uploading to one or more backup services.
+///
+/// Note that for every new monitor, you **must** persist the new `ChannelMonitor`
+/// to disk/backups. And, on every update, you **must** persist either the
+/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk
+/// of situations such as revoking a transaction, then crashing before this
+/// revocation can be persisted, then unintentionally broadcasting a revoked
+/// transaction and losing money. This is a risk because previous channel states
+/// are toxic, so it's important that whatever channel state is persisted is
+/// kept up-to-date.
+pub trait Persist<ChannelSigner: Sign> {
+ /// Persist a new channel's data. The data can be stored any way you want, but
+ /// the identifier provided by Rust-Lightning is the channel's outpoint (and
+ /// it is up to you to maintain a correct mapping between the outpoint and the
+ /// stored channel data). Note that you **must** persist every new monitor to
+ /// disk. See the `Persist` trait documentation for more details.
+ ///
+ /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
+ /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+ ///
+ /// [`Writeable::write`]: crate::util::ser::Writeable::write
+ fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
+
+ /// Update one channel's data. The provided `ChannelMonitor` has already
+ /// applied the given update.
+ ///
+ /// Note that on every update, you **must** persist either the
+ /// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See
+ /// the `Persist` trait documentation for more details.
+ ///
+ /// If an implementer chooses to persist the updates only, they need to make
+ /// sure that all the updates are applied to the `ChannelMonitors` *before*
+ /// the set of channel monitors is given to the `ChannelManager`
+ /// deserialization routine. See [`ChannelMonitor::update_monitor`] for
+ /// applying a monitor update to a monitor. If full `ChannelMonitors` are
+ /// persisted, then there is no need to persist individual updates.
+ ///
+ /// Note that there could be a performance tradeoff between persisting complete
+ /// channel monitors on every update vs. persisting only updates and applying
+ /// them in batches. The size of each monitor grows `O(number of state updates)`
+ /// whereas updates are small and `O(1)`.
+ ///
+ /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
+ /// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
+ /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
+ ///
+ /// [`Writeable::write`]: crate::util::ser::Writeable::write
+ fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
+}
+
+struct MonitorHolder<ChannelSigner: Sign> {
+ monitor: ChannelMonitor<ChannelSigner>,
+}
+
+/// A read-only reference to a current ChannelMonitor.
+///
+/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
+/// released.
+pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> {
+ lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
+ funding_txo: OutPoint,
+}
+
+impl<ChannelSigner: Sign> Deref for LockedChannelMonitor<'_, ChannelSigner> {
+ type Target = ChannelMonitor<ChannelSigner>;
+ fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
+ &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor
+ }
+}
+
/// An implementation of [`chain::Watch`] for monitoring channels.
///
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
- P::Target: channelmonitor::Persist<ChannelSigner>,
+ P::Target: Persist<ChannelSigner>,
{
- /// The monitors
- pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
+ monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
chain_source: Option<C>,
broadcaster: T,
logger: L,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
- P::Target: channelmonitor::Persist<ChannelSigner>,
+ P::Target: Persist<ChannelSigner>,
{
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
/// of a channel and reacting accordingly based on transactions in the given chain data. See
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let mut dependent_txdata = Vec::new();
- let monitors = self.monitors.read().unwrap();
- for monitor in monitors.values() {
- let mut txn_outputs = process(monitor, txdata);
+ let monitor_states = self.monitors.read().unwrap();
+ for monitor_state in monitor_states.values() {
+ let mut txn_outputs = process(&monitor_state.monitor, txdata);
// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
/// inclusion in the return value.
pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
let mut ret = Vec::new();
- let monitors = self.monitors.read().unwrap();
- for (_, monitor) in monitors.iter().filter(|(funding_outpoint, _)| {
+ let monitor_states = self.monitors.read().unwrap();
+ for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| {
for chan in ignored_channels {
if chan.funding_txo.as_ref() == Some(funding_outpoint) {
return false;
}
true
}) {
- ret.append(&mut monitor.get_claimable_balances());
+ ret.append(&mut monitor_state.monitor.get_claimable_balances());
}
ret
}
+ /// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no
+ /// such [`ChannelMonitor`] is currently being monitored for.
+ ///
+ /// Note that the result holds a mutex over our monitor set, and should not be held
+ /// indefinitely.
+ pub fn get_monitor(&self, funding_txo: OutPoint) -> Result<LockedChannelMonitor<'_, ChannelSigner>, ()> {
+ let lock = self.monitors.read().unwrap();
+ if lock.get(&funding_txo).is_some() {
+ Ok(LockedChannelMonitor { lock, funding_txo })
+ } else {
+ Err(())
+ }
+ }
+
+ /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored.
+ ///
+ /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
+ /// monitoring for on-chain state resolutions.
+ pub fn list_monitors(&self) -> Vec<OutPoint> {
+ self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
+ }
+
+ #[cfg(test)]
+ pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor<ChannelSigner> {
+ self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
+ }
+
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use util::events::EventsProvider;
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
- P::Target: channelmonitor::Persist<ChannelSigner>,
+ P::Target: Persist<ChannelSigner>,
{
fn block_connected(&self, block: &Block, height: u32) {
let header = &block.header;
}
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
- let monitors = self.monitors.read().unwrap();
+ let monitor_states = self.monitors.read().unwrap();
log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
- for monitor in monitors.values() {
- monitor.block_disconnected(
+ for monitor_state in monitor_states.values() {
+ monitor_state.monitor.block_disconnected(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
}
}
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
- P::Target: channelmonitor::Persist<ChannelSigner>,
+ P::Target: Persist<ChannelSigner>,
{
fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
fn transaction_unconfirmed(&self, txid: &Txid) {
log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
- let monitors = self.monitors.read().unwrap();
- for monitor in monitors.values() {
- monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+ let monitor_states = self.monitors.read().unwrap();
+ for monitor_state in monitor_states.values() {
+ monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
}
}
fn get_relevant_txids(&self) -> Vec<Txid> {
let mut txids = Vec::new();
- let monitors = self.monitors.read().unwrap();
- for monitor in monitors.values() {
- txids.append(&mut monitor.get_relevant_txids());
+ let monitor_states = self.monitors.read().unwrap();
+ for monitor_state in monitor_states.values() {
+ txids.append(&mut monitor_state.monitor.get_relevant_txids());
}
txids.sort_unstable();
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
- P::Target: channelmonitor::Persist<ChannelSigner>,
+ P::Target: Persist<ChannelSigner>,
{
/// Adds the monitor that watches the channel referred to by the given outpoint.
///
monitor.load_outputs_to_watch(chain_source);
}
}
- entry.insert(monitor);
+ entry.insert(MonitorHolder { monitor });
Ok(())
}
#[cfg(not(any(test, feature = "fuzztarget")))]
Err(ChannelMonitorUpdateErr::PermanentFailure)
},
- Some(monitor) => {
+ Some(monitor_state) => {
+ let monitor = &monitor_state.monitor;
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
if let Err(e) = &update_res {
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
let mut pending_monitor_events = Vec::new();
- for monitor in self.monitors.read().unwrap().values() {
- pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
+ for monitor_state in self.monitors.read().unwrap().values() {
+ pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
}
pending_monitor_events
}
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
- P::Target: channelmonitor::Persist<ChannelSigner>,
+ P::Target: Persist<ChannelSigner>,
{
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
///
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
let mut pending_events = Vec::new();
- for monitor in self.monitors.read().unwrap().values() {
- pending_events.append(&mut monitor.get_and_clear_pending_events());
+ for monitor_state in self.monitors.read().unwrap().values() {
+ pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
}
for event in pending_events.drain(..) {
handler.handle_event(&event);