From c9e7e31427f51e606bd90a4d280c1356af8baf37 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 8 Jul 2020 23:37:13 -0700 Subject: [PATCH] Replace use of ChainWatchInterface with WatchEvent SimpleManyChannelMonitor is parameterized by ChainWatchInterface to signal what transactions and outputs to watch for on chain. The interface has grown to cover chain access (via get_chain_utxo) and block block filtering (via filter_block and reentered), which has added complexity for implementations and user (see ChainWatchInterfaceUtil). Pull the watch functionality out as a first step to eliminating ChainWatchInterface entirely. --- lightning/src/chain/mod.rs | 35 +++++++ lightning/src/ln/channelmonitor.rs | 114 ++++++++++++++-------- lightning/src/ln/functional_test_utils.rs | 23 ++++- lightning/src/ln/functional_tests.rs | 15 ++- 4 files changed, 137 insertions(+), 50 deletions(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index ffa5ed968..6acca8a55 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -1,5 +1,40 @@ //! Structs and traits which allow other parts of rust-lightning to interact with the blockchain. +use bitcoin::blockdata::script::Script; +use bitcoin::hash_types::Txid; + +use chain::transaction::OutPoint; + pub mod chaininterface; pub mod transaction; pub mod keysinterface; + +/// An interface for providing [`WatchEvent`]s. +/// +/// [`WatchEvent`]: enum.WatchEvent.html +pub trait WatchEventProvider { + /// Releases events produced since the last call. Subsequent calls must only return new events. + fn release_pending_watch_events(&self) -> Vec; +} + +/// An event indicating on-chain activity to watch for pertaining to a channel. +pub enum WatchEvent { + /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending + /// condition. + WatchTransaction { + /// Identifier of the transaction. + txid: Txid, + + /// Spending condition for an output of the transaction. + script_pubkey: Script, + }, + /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as + /// the spending condition. + WatchOutput { + /// Identifier for the output. + outpoint: OutPoint, + + /// Spending condition for the output. + script_pubkey: Script, + } +} diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index 33cc869ef..1980eb5c2 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -32,7 +32,8 @@ use ln::chan_utils; use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, LocalCommitmentTransaction, HTLCType}; use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; -use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator}; +use chain; +use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator}; use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; use util::logger::Logger; @@ -170,12 +171,55 @@ pub struct SimpleManyChannelMonitor>>, #[cfg(not(test))] monitors: Mutex>>, + watch_events: Mutex, chain_monitor: C, broadcaster: T, logger: L, fee_estimator: F } +struct WatchEventQueue { + watched: ChainWatchedUtil, + events: Vec, +} + +impl WatchEventQueue { + fn new() -> Self { + Self { + watched: ChainWatchedUtil::new(), + events: Vec::new(), + } + } + + fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) { + if self.watched.register_tx(txid, script_pubkey) { + self.events.push(chain::WatchEvent::WatchTransaction { + txid: *txid, + script_pubkey: script_pubkey.clone() + }); + } + } + + fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) { + let (txid, index) = outpoint; + if self.watched.register_outpoint((*txid, index as u32), script_pubkey) { + self.events.push(chain::WatchEvent::WatchOutput { + outpoint: OutPoint { + txid: *txid, + index: index as u16, + }, + script_pubkey: script_pubkey.clone(), + }); + } + } + + fn dequeue_events(&mut self) -> Vec { + let mut pending_events = Vec::with_capacity(self.events.len()); + pending_events.append(&mut self.events); + pending_events + } +} + impl ChainListener for SimpleManyChannelMonitor where T::Target: BroadcasterInterface, @@ -184,24 +228,19 @@ impl = matched_indexes.iter().map(|index| txdata[*index]).collect(); - let last_seen = self.chain_monitor.reentered(); - { - let mut monitors = self.monitors.lock().unwrap(); - for monitor in monitors.values_mut() { - let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); - - for (ref txid, ref outputs) in txn_outputs { - for (idx, output) in outputs.iter().enumerate() { - self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey); - } + let mut watch_events = self.watch_events.lock().unwrap(); + let matched_txn: Vec<_> = txdata.iter().filter(|&&(_, tx)| watch_events.watched.does_match_tx(tx)).map(|e| *e).collect(); + { + let mut monitors = self.monitors.lock().unwrap(); + for monitor in monitors.values_mut() { + let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + + for (ref txid, ref outputs) in txn_outputs { + for (idx, output) in outputs.iter().enumerate() { + watch_events.watch_output((txid, idx), &output.script_pubkey); } } } - reentered = last_seen != self.chain_monitor.reentered(); } } @@ -224,6 +263,7 @@ impl SimpleManyChannelMonitor { let res = SimpleManyChannelMonitor { monitors: Mutex::new(HashMap::new()), + watch_events: Mutex::new(WatchEventQueue::new()), chain_monitor, broadcaster, logger, @@ -235,6 +275,7 @@ impl) -> Result<(), MonitorUpdateError> { + let mut watch_events = self.watch_events.lock().unwrap(); let mut monitors = self.monitors.lock().unwrap(); let entry = match monitors.entry(key) { hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")), @@ -243,11 +284,11 @@ impl chain::WatchEventProvider for SimpleManyChannelMonitor + where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, +{ + fn release_pending_watch_events(&self) -> Vec { + self.watch_events.lock().unwrap().dequeue_events() + } +} + /// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction, /// instead claiming it in its own individual transaction. pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12; @@ -849,30 +901,12 @@ pub trait ManyChannelMonitor: Send + Sync { /// Adds a monitor for the given `funding_txo`. /// - /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with - /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected - /// callbacks with the funding transaction, or any spends of it. - /// - /// Further, the implementer must also ensure that each output returned in - /// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about - /// any spends of any of the outputs. - /// - /// Any spends of outputs which should have been registered which aren't passed to - /// ChannelMonitors via block_connected may result in FUNDS LOSS. + /// Implementations must ensure that `monitor` receives block_connected calls for blocks with + /// the funding transaction or any spends of it, as well as any spends of outputs returned by + /// get_outputs_to_watch. Not doing so may result in LOST FUNDS. fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; /// Updates a monitor for the given `funding_txo`. - /// - /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with - /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected - /// callbacks with the funding transaction, or any spends of it. - /// - /// Further, the implementer must also ensure that each output returned in - /// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about - /// any spends of any of the outputs. - /// - /// Any spends of outputs which should have been registered which aren't passed to - /// ChannelMonitors via block_connected may result in FUNDS LOSS. fn update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr>; /// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 10938cf31..607eb0de0 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -1,6 +1,7 @@ //! A bunch of useful utilities for building networks of nodes and exchanging messages between //! nodes for functional tests. +use chain; use chain::chaininterface; use chain::transaction::OutPoint; use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure}; @@ -72,9 +73,29 @@ pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32, he } pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) { - node.block_notifier.block_connected(block, height) + use chain::WatchEventProvider; + use chain::chaininterface::ChainListener; + + let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events(); + process_chain_watch_events(&watch_events); + + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); + loop { + node.chan_monitor.simple_monitor.block_connected(&block.header, &txdata, height); + + let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events(); + process_chain_watch_events(&watch_events); + + if watch_events.is_empty() { + break; + } + } + + node.node.block_connected(&block.header, &txdata, height); } +fn process_chain_watch_events(_events: &Vec) {} + pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) { node.block_notifier.block_disconnected(header, height) } diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 5b29d80d4..32162c5d7 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -408,9 +408,6 @@ fn test_1_conf_open() { let nodes = create_network(2, &node_cfgs, &node_chanmgrs); let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 100000, 10001, InitFeatures::known(), InitFeatures::known()); - assert!(nodes[0].chain_monitor.does_match_tx(&tx)); - assert!(nodes[1].chain_monitor.does_match_tx(&tx)); - let block = Block { header: BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 }, txdata: vec![tx], @@ -2739,8 +2736,8 @@ fn claim_htlc_outputs_single_tx() { #[test] fn test_htlc_on_chain_success() { - // Test that in case of a unilateral close onchain, we detect the state of output thanks to - // ChainWatchInterface and pass the preimage backward accordingly. So here we test that ChannelManager is + // Test that in case of a unilateral close onchain, we detect the state of output and pass + // the preimage backward accordingly. So here we test that ChannelManager is // broadcasting the right event to other nodes in payment path. // We test with two HTLCs simultaneously as that was not handled correctly in the past. // A --------------------> B ----------------------> C (preimage) @@ -2918,8 +2915,8 @@ fn test_htlc_on_chain_success() { #[test] fn test_htlc_on_chain_timeout() { - // Test that in case of a unilateral close onchain, we detect the state of output thanks to - // ChainWatchInterface and timeout the HTLC backward accordingly. So here we test that ChannelManager is + // Test that in case of a unilateral close onchain, we detect the state of output and + // timeout the HTLC backward accordingly. So here we test that ChannelManager is // broadcasting the right event to other nodes in payment path. // A ------------------> B ----------------------> C (timeout) // B's commitment tx C's commitment tx @@ -5092,8 +5089,8 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() { #[test] fn test_onchain_to_onchain_claim() { - // Test that in case of channel closure, we detect the state of output thanks to - // ChainWatchInterface and claim HTLC on downstream peer's remote commitment tx. + // Test that in case of channel closure, we detect the state of output and claim HTLC + // on downstream peer's remote commitment tx. // First, have C claim an HTLC against its own latest commitment transaction. // Then, broadcast these to B, which should update the monitor downstream on the A<->B // channel. -- 2.39.5