From bd39b20f642e042981e4fdd5f3600a357be51931 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 8 Jul 2020 23:37:13 -0700 Subject: [PATCH] Replace use of ChainWatchInterface with WatchEvent SimpleManyChannelMonitor is parameterized by ChainWatchInterface to signal what transactions and outputs to watch for on chain. The interface has grown to cover chain access (via get_chain_utxo) and block block filtering (via filter_block and reentered), which has added complexity for implementations and user (see ChainWatchInterfaceUtil). Pull the watch functionality out as a first step to eliminating ChainWatchInterface entirely. --- lightning/src/chain/mod.rs | 35 ++++++ lightning/src/ln/channelmonitor.rs | 135 +++++++++++++++------- lightning/src/ln/functional_test_utils.rs | 23 +++- lightning/src/ln/functional_tests.rs | 15 +-- 4 files changed, 158 insertions(+), 50 deletions(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 9968bc6c..bbdca213 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -9,6 +9,41 @@ //! Structs and traits which allow other parts of rust-lightning to interact with the blockchain. +use bitcoin::blockdata::script::Script; +use bitcoin::hash_types::Txid; + +use chain::transaction::OutPoint; + pub mod chaininterface; pub mod transaction; pub mod keysinterface; + +/// An interface for providing [`WatchEvent`]s. +/// +/// [`WatchEvent`]: enum.WatchEvent.html +pub trait WatchEventProvider { + /// Releases events produced since the last call. Subsequent calls must only return new events. + fn release_pending_watch_events(&self) -> Vec; +} + +/// An event indicating on-chain activity to watch for pertaining to a channel. +pub enum WatchEvent { + /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending + /// condition. + WatchTransaction { + /// Identifier of the transaction. + txid: Txid, + + /// Spending condition for an output of the transaction. + script_pubkey: Script, + }, + /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as + /// the spending condition. + WatchOutput { + /// Identifier for the output. + outpoint: OutPoint, + + /// Spending condition for the output. + script_pubkey: Script, + } +} diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index 9fb24027..fd724c1c 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -40,7 +40,8 @@ use ln::chan_utils; use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HolderCommitmentTransaction, HTLCType}; use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; -use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator}; +use chain; +use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator}; use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; use util::logger::Logger; @@ -48,7 +49,7 @@ use util::ser::{Readable, MaybeReadable, Writer, Writeable, U48}; use util::{byte_utils, events}; use util::events::Event; -use std::collections::{HashMap, hash_map}; +use std::collections::{HashMap, HashSet, hash_map}; use std::sync::Mutex; use std::{hash,cmp, mem}; use std::ops::Deref; @@ -198,12 +199,74 @@ pub struct SimpleManyChannelMonitor>>, + watch_events: Mutex, chain_monitor: C, broadcaster: T, logger: L, fee_estimator: F } +struct WatchEventQueue { + watched: ChainWatchedUtil, + events: Vec, +} + +impl WatchEventQueue { + fn new() -> Self { + Self { + watched: ChainWatchedUtil::new(), + events: Vec::new(), + } + } + + fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) { + if self.watched.register_tx(txid, script_pubkey) { + self.events.push(chain::WatchEvent::WatchTransaction { + txid: *txid, + script_pubkey: script_pubkey.clone() + }); + } + } + + fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) { + let (txid, index) = outpoint; + if self.watched.register_outpoint((*txid, index as u32), script_pubkey) { + self.events.push(chain::WatchEvent::WatchOutput { + outpoint: OutPoint { + txid: *txid, + index: index as u16, + }, + script_pubkey: script_pubkey.clone(), + }); + } + } + + fn dequeue_events(&mut self) -> Vec { + let mut pending_events = Vec::with_capacity(self.events.len()); + pending_events.append(&mut self.events); + pending_events + } + + fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> { + let mut matched_txids = HashSet::new(); + txdata.iter().filter(|&&(_, tx)| { + // A tx matches the filter if it either matches the filter directly (via does_match_tx) + // or if it is a descendant of another matched transaction within the same block. + let mut matched = self.watched.does_match_tx(tx); + for input in tx.input.iter() { + if matched || matched_txids.contains(&input.previous_output.txid) { + matched = true; + break; + } + } + if matched { + matched_txids.insert(tx.txid()); + } + matched + }).map(|e| *e).collect() + } +} + impl ChainListener for SimpleManyChannelMonitor where T::Target: BroadcasterInterface, @@ -212,24 +275,19 @@ impl = matched_indexes.iter().map(|index| txdata[*index]).collect(); - let last_seen = self.chain_monitor.reentered(); - { - let mut monitors = self.monitors.lock().unwrap(); - for monitor in monitors.values_mut() { - let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); - - for (ref txid, ref outputs) in txn_outputs { - for (idx, output) in outputs.iter().enumerate() { - self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey); - } + let mut watch_events = self.watch_events.lock().unwrap(); + let matched_txn = watch_events.filter_block(txdata); + { + let mut monitors = self.monitors.lock().unwrap(); + for monitor in monitors.values_mut() { + let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + + for (ref txid, ref outputs) in txn_outputs { + for (idx, output) in outputs.iter().enumerate() { + watch_events.watch_output((txid, idx), &output.script_pubkey); } } } - reentered = last_seen != self.chain_monitor.reentered(); } } @@ -252,6 +310,7 @@ impl SimpleManyChannelMonitor { let res = SimpleManyChannelMonitor { monitors: Mutex::new(HashMap::new()), + watch_events: Mutex::new(WatchEventQueue::new()), chain_monitor, broadcaster, logger, @@ -263,6 +322,7 @@ impl) -> Result<(), MonitorUpdateError> { + let mut watch_events = self.watch_events.lock().unwrap(); let mut monitors = self.monitors.lock().unwrap(); let entry = match monitors.entry(key) { hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")), @@ -271,11 +331,11 @@ impl chain::WatchEventProvider for SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, +{ + fn release_pending_watch_events(&self) -> Vec { + self.watch_events.lock().unwrap().dequeue_events() + } +} + /// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction, /// instead claiming it in its own individual transaction. pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12; @@ -881,30 +952,14 @@ pub trait ManyChannelMonitor: Send + Sync { /// Adds a monitor for the given `funding_txo`. /// - /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with - /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected - /// callbacks with the funding transaction, or any spends of it. - /// - /// Further, the implementer must also ensure that each output returned in - /// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about - /// any spends of any of the outputs. - /// - /// Any spends of outputs which should have been registered which aren't passed to - /// ChannelMonitors via block_connected may result in FUNDS LOSS. + /// Implementations must ensure that `monitor` receives block_connected calls for blocks with + /// the funding transaction or any spends of it, as well as any spends of outputs returned by + /// get_outputs_to_watch. Not doing so may result in LOST FUNDS. fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; /// Updates a monitor for the given `funding_txo`. /// - /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with - /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected - /// callbacks with the funding transaction, or any spends of it. - /// - /// Further, the implementer must also ensure that each output returned in - /// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about - /// any spends of any of the outputs. - /// - /// Any spends of outputs which should have been registered which aren't passed to - /// ChannelMonitors via block_connected may result in FUNDS LOSS. + /// TODO(jkczyz): Determine where this should go from e73036c6845fd3cc16479a1b497db82a5ebb3897. /// /// In case of distributed watchtowers deployment, even if an Err is return, the new version /// must be written to disk, as state may have been stored but rejected due to a block forcing diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index f8f324c9..c164cd6b 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -10,6 +10,7 @@ //! A bunch of useful utilities for building networks of nodes and exchanging messages between //! nodes for functional tests. +use chain; use chain::chaininterface; use chain::transaction::OutPoint; use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure}; @@ -80,9 +81,29 @@ pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32, he } pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) { - node.block_notifier.block_connected(block, height) + use chain::WatchEventProvider; + use chain::chaininterface::ChainListener; + + let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events(); + process_chain_watch_events(&watch_events); + + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + loop { + node.chan_monitor.simple_monitor.block_connected(&block.header, &txdata, height); + + let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events(); + process_chain_watch_events(&watch_events); + + if watch_events.is_empty() { + break; + } + } + + node.node.block_connected(&block.header, &txdata, height); } +fn process_chain_watch_events(_events: &Vec) {} + pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) { node.block_notifier.block_disconnected(header, height) } diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 4bb48fc9..b834a531 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -417,9 +417,6 @@ fn test_1_conf_open() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 100000, 10001, InitFeatures::known(), InitFeatures::known()); - assert!(nodes[0].chain_monitor.does_match_tx(&tx)); - assert!(nodes[1].chain_monitor.does_match_tx(&tx)); - let block = Block { header: BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, txdata: vec![tx], @@ -2785,8 +2782,8 @@ fn claim_htlc_outputs_single_tx() { #[test] fn test_htlc_on_chain_success() { - // Test that in case of a unilateral close onchain, we detect the state of output thanks to - // ChainWatchInterface and pass the preimage backward accordingly. So here we test that ChannelManager is + // Test that in case of a unilateral close onchain, we detect the state of output and pass + // the preimage backward accordingly. So here we test that ChannelManager is // broadcasting the right event to other nodes in payment path. // We test with two HTLCs simultaneously as that was not handled correctly in the past. // A --------------------> B ----------------------> C (preimage) @@ -2964,8 +2961,8 @@ fn test_htlc_on_chain_success() { #[test] fn test_htlc_on_chain_timeout() { - // Test that in case of a unilateral close onchain, we detect the state of output thanks to - // ChainWatchInterface and timeout the HTLC backward accordingly. So here we test that ChannelManager is + // Test that in case of a unilateral close onchain, we detect the state of output and + // timeout the HTLC backward accordingly. So here we test that ChannelManager is // broadcasting the right event to other nodes in payment path. // A ------------------> B ----------------------> C (timeout) // B's commitment tx C's commitment tx @@ -5177,8 +5174,8 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() { #[test] fn test_onchain_to_onchain_claim() { - // Test that in case of channel closure, we detect the state of output thanks to - // ChainWatchInterface and claim HTLC on downstream peer's remote commitment tx. + // Test that in case of channel closure, we detect the state of output and claim HTLC + // on downstream peer's remote commitment tx. // First, have C claim an HTLC against its own latest commitment transaction. // Then, broadcast these to B, which should update the monitor downstream on the A<->B // channel. -- 2.30.2