chaininterface+multi: add filter_block and reentered to ChainWatchInterface
authorValentine Wallace <vwallace@protonmail.com>
Mon, 18 Nov 2019 21:40:05 +0000 (16:40 -0500)
committerValentine Wallace <vwallace@protonmail.com>
Fri, 22 Nov 2019 01:35:53 +0000 (20:35 -0500)
Because filter_block takes a  and returns a list of s , we must add a lifetime to the ChainWatchInterface, which bubbles up in a lot of places. These places include adding a lifetime  to the Node struct, which causes a lot of rearranging tests so that variables don't go out of scope before the Node that owns them does.

lightning/fuzz/fuzz_targets/chanmon_fail_consistency.rs
lightning/fuzz/fuzz_targets/full_stack_target.rs
lightning/fuzz/fuzz_targets/router_target.rs
lightning/src/chain/chaininterface.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/channelmonitor.rs
lightning/src/ln/functional_test_utils.rs

index 33b7e26330a89efa0a63a3aeab33de815024a0f9..459fca47c5592bd9753eed23aac7070ebbddae18 100644 (file)
@@ -225,7 +225,6 @@ pub fn do_test(data: &[u8]) {
                                keys_manager,
                                fee_estimator: fee_est.clone(),
                                monitor: monitor.clone(),
-                               chain_monitor: watch,
                                tx_broadcaster: broadcast.clone(),
                                logger,
                                default_config: config,
@@ -246,7 +245,6 @@ pub fn do_test(data: &[u8]) {
                } }
        }
 
-
        let mut channel_txn = Vec::new();
        macro_rules! make_channel {
                ($source: expr, $dest: expr, $chan_id: expr) => { {
index f9f2e3bb0f54b311d561622ac26a8d70eb87237d..cd65a2f16307d9db6aeb35a00fa02154af63ed5c 100644 (file)
@@ -144,8 +144,8 @@ impl<'a> Hash for Peer<'a> {
        }
 }
 
-struct MoneyLossDetector<'a> {
-       manager: Arc<ChannelManager>,
+struct MoneyLossDetector<'a, 'b> {
+       manager: Arc<ChannelManager<'b>>,
        monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
        handler: PeerManager<Peer<'a>>,
 
@@ -157,8 +157,8 @@ struct MoneyLossDetector<'a> {
        max_height: usize,
        blocks_connected: u32,
 }
-impl<'a> MoneyLossDetector<'a> {
-       pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
+impl<'a, 'b> MoneyLossDetector<'a, 'b> {
+       pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<'b>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
                MoneyLossDetector {
                        manager,
                        monitor,
@@ -217,7 +217,7 @@ impl<'a> MoneyLossDetector<'a> {
        }
 }
 
-impl<'a> Drop for MoneyLossDetector<'a> {
+impl<'a, 'b> Drop for MoneyLossDetector<'a, 'b> {
        fn drop(&mut self) {
                if !::std::thread::panicking() {
                        // Disconnect all peers
index 25bf38274abe93b474a1f27b590d5262ca9209e7..d9a8f78d3ac2ab18d35106241eb9c2d27a9e89ee 100644 (file)
@@ -5,8 +5,10 @@ extern crate secp256k1;
 
 use bitcoin_hashes::sha256d::Hash as Sha256dHash;
 use bitcoin::blockdata::script::{Script, Builder};
+use bitcoin::blockdata::block::Block;
+use bitcoin::blockdata::transaction::Transaction;
 
-use lightning::chain::chaininterface::{ChainError,ChainWatchInterface, ChainListener};
+use lightning::chain::chaininterface::{ChainError,ChainWatchInterface};
 use lightning::ln::channelmanager::ChannelDetails;
 use lightning::ln::msgs;
 use lightning::ln::msgs::{RoutingMessageHandler};
@@ -20,7 +22,7 @@ mod utils;
 
 use utils::test_logger;
 
-use std::sync::{Weak, Arc};
+use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
 #[inline]
@@ -79,7 +81,10 @@ impl ChainWatchInterface for DummyChainWatcher {
        fn install_watch_tx(&self, _txid: &Sha256dHash, _script_pub_key: &Script) { }
        fn install_watch_outpoint(&self, _outpoint: (Sha256dHash, u32), _out_script: &Script) { }
        fn watch_all_txn(&self) { }
-       fn register_listener(&self, _listener: Weak<ChainListener>) { }
+       fn filter_block<'a>(&self, _block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
+               (Vec::new(), Vec::new())
+       }
+       fn reentered(&self) -> usize { 0 }
 
        fn get_chain_utxo(&self, _genesis_hash: Sha256dHash, _unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError> {
                match self.input.get_slice(2) {
index 32fef48eb44a54acaa7a576502ad1e0f1e56311d..92c9c102597b8546e6ca42f3272d5f0cb9e2289f 100644 (file)
@@ -50,6 +50,15 @@ pub trait ChainWatchInterface: Sync + Send {
        /// bytes are the block height, the next 3 the transaction index within the block, and the
        /// final two the output within the transaction.
        fn get_chain_utxo(&self, genesis_hash: Sha256dHash, unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError>;
+
+       /// Gets the list of transactions and transaction indices that the ChainWatchInterface is
+       /// watching for.
+       fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>);
+
+       /// Returns a usize that changes when the ChainWatchInterface's watched data is modified.
+       /// Users of `filter_block` should pre-save a copy of `reentered`'s return value and use it to
+       /// determine whether they need to re-filter a given block.
+       fn reentered(&self) -> usize;
 }
 
 /// An interface to send a transaction to the Bitcoin network.
@@ -301,6 +310,25 @@ impl ChainWatchInterface for ChainWatchInterfaceUtil {
                }
                Err(ChainError::NotSupported)
        }
+
+       fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
+               let mut matched = Vec::new();
+               let mut matched_index = Vec::new();
+               {
+                       let watched = self.watched.lock().unwrap();
+                       for (index, transaction) in block.txdata.iter().enumerate() {
+                               if self.does_match_tx_unguarded(transaction, &watched) {
+                                       matched.push(transaction);
+                                       matched_index.push(index as u32);
+                               }
+                       }
+               }
+               (matched, matched_index)
+       }
+
+       fn reentered(&self) -> usize {
+               self.reentered.load(Ordering::Relaxed)
+       }
 }
 
 impl ChainWatchInterfaceUtil {
index 0414cc5594363cf62ccd008ac679e9e5d64298fb..0058eccd34a30e92e2237839a2ae9dcd18c96d0e 100644 (file)
@@ -318,11 +318,11 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum
 /// the "reorg path" (ie call block_disconnected() until you get to a common block and then call
 /// block_connected() to step towards your best block) upon deserialization before using the
 /// object!
-pub struct ChannelManager {
+pub struct ChannelManager<'a> {
        default_configuration: UserConfig,
        genesis_hash: Sha256dHash,
        fee_estimator: Arc<FeeEstimator>,
-       monitor: Arc<ManyChannelMonitor>,
+       monitor: Arc<ManyChannelMonitor + 'a>,
        tx_broadcaster: Arc<BroadcasterInterface>,
 
        #[cfg(test)]
@@ -575,7 +575,7 @@ macro_rules! maybe_break_monitor_err {
        }
 }
 
-impl ChannelManager {
+impl<'a> ChannelManager<'a> {
        /// Constructs a new ChannelManager to hold several channels and route between them.
        ///
        /// This is the main "logic hub" for all channel-related actions, and implements
@@ -594,7 +594,7 @@ impl ChannelManager {
        /// the ChannelManager as a listener to the BlockNotifier and call the BlockNotifier's
        /// `block_(dis)connected` methods, which will notify all registered listeners in one
        /// go.
-       pub fn new(network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, chain_monitor: Arc<ChainWatchInterface>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>,keys_manager: Arc<KeysInterface>, config: UserConfig, current_blockchain_height: usize) -> Result<Arc<ChannelManager>, secp256k1::Error> {
+       pub fn new(network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor + 'a>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>,keys_manager: Arc<KeysInterface>, config: UserConfig, current_blockchain_height: usize) -> Result<Arc<ChannelManager<'a>>, secp256k1::Error> {
                let secp_ctx = Secp256k1::new();
 
                let res = Arc::new(ChannelManager {
@@ -2518,7 +2518,7 @@ impl ChannelManager {
        }
 }
 
-impl events::MessageSendEventsProvider for ChannelManager {
+impl<'a> events::MessageSendEventsProvider for ChannelManager<'a> {
        fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
                // TODO: Event release to users and serialization is currently race-y: it's very easy for a
                // user to serialize a ChannelManager with pending events in it and lose those events on
@@ -2543,7 +2543,7 @@ impl events::MessageSendEventsProvider for ChannelManager {
        }
 }
 
-impl events::EventsProvider for ChannelManager {
+impl<'a> events::EventsProvider for ChannelManager<'a> {
        fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
                // TODO: Event release to users and serialization is currently race-y: it's very easy for a
                // user to serialize a ChannelManager with pending events in it and lose those events on
@@ -2568,7 +2568,7 @@ impl events::EventsProvider for ChannelManager {
        }
 }
 
-impl ChainListener for ChannelManager {
+impl<'a> ChainListener for ChannelManager<'a> {
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
                let header_hash = header.bitcoin_hash();
                log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len());
@@ -2682,7 +2682,7 @@ impl ChainListener for ChannelManager {
        }
 }
 
-impl ChannelMessageHandler for ChannelManager {
+impl<'a> ChannelMessageHandler for ChannelManager<'a> {
        //TODO: Handle errors and close channel (or so)
        fn handle_open_channel(&self, their_node_id: &PublicKey, their_local_features: LocalFeatures, msg: &msgs::OpenChannel) -> Result<(), LightningError> {
                let _ = self.total_consistency_lock.read().unwrap();
@@ -3067,7 +3067,7 @@ impl<R: ::std::io::Read> Readable<R> for HTLCForwardInfo {
        }
 }
 
-impl Writeable for ChannelManager {
+impl<'a> Writeable for ChannelManager<'a> {
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
                let _ = self.total_consistency_lock.write().unwrap();
 
@@ -3130,7 +3130,7 @@ impl Writeable for ChannelManager {
 /// 5) Move the ChannelMonitors into your local ManyChannelMonitor.
 /// 6) Disconnect/connect blocks on the ChannelManager.
 /// 7) Register the new ChannelManager with your ChainWatchInterface.
-pub struct ChannelManagerReadArgs<'a> {
+pub struct ChannelManagerReadArgs<'a, 'b> {
        /// The keys provider which will give us relevant keys. Some keys will be loaded during
        /// deserialization.
        pub keys_manager: Arc<KeysInterface>,
@@ -3144,7 +3144,7 @@ pub struct ChannelManagerReadArgs<'a> {
        /// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that
        /// you have deserialized ChannelMonitors separately and will add them to your
        /// ManyChannelMonitor after deserializing this ChannelManager.
-       pub monitor: Arc<ManyChannelMonitor>,
+       pub monitor: Arc<ManyChannelMonitor + 'b>,
 
        /// The BroadcasterInterface which will be used in the ChannelManager in the future and may be
        /// used to broadcast the latest local commitment transactions of channels which must be
@@ -3170,8 +3170,8 @@ pub struct ChannelManagerReadArgs<'a> {
        pub channel_monitors: &'a HashMap<OutPoint, &'a ChannelMonitor>,
 }
 
-impl<'a, R : ::std::io::Read> ReadableArgs<R, ChannelManagerReadArgs<'a>> for (Sha256dHash, ChannelManager) {
-       fn read(reader: &mut R, args: ChannelManagerReadArgs<'a>) -> Result<Self, DecodeError> {
+impl<'a, 'b, R : ::std::io::Read> ReadableArgs<R, ChannelManagerReadArgs<'a, 'b>> for (Sha256dHash, ChannelManager<'b>) {
+       fn read(reader: &mut R, args: ChannelManagerReadArgs<'a, 'b>) -> Result<Self, DecodeError> {
                let _ver: u8 = Readable::read(reader)?;
                let min_ver: u8 = Readable::read(reader)?;
                if min_ver > SERIALIZATION_VERSION {
index dfa56d6f2555cab8c59520a5183d1dc6e5f1f7fe..1957128a1d3733d0f6fef67f8be17e6bee043d6c 100644 (file)
@@ -152,7 +152,8 @@ pub struct SimpleManyChannelMonitor<Key> {
        fee_estimator: Arc<FeeEstimator>
 }
 
-impl<Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> {
+impl<'a, Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> {
+
        fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
                let block_hash = header.bitcoin_hash();
                let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
@@ -2143,14 +2144,14 @@ impl ChannelMonitor {
                                };
                                if funding_txo.is_none() || (prevout.txid == funding_txo.as_ref().unwrap().0.txid && prevout.vout == funding_txo.as_ref().unwrap().0.index as u32) {
                                        if (tx.input[0].sequence >> 8*3) as u8 == 0x80 && (tx.lock_time >> 8*3) as u8 == 0x20 {
-                                               let (remote_txn, new_outputs, mut spendable_output) = self.check_spend_remote_transaction(tx, height, fee_estimator);
+                                               let (remote_txn, new_outputs, mut spendable_output) = self.check_spend_remote_transaction(&tx, height, fee_estimator);
                                                txn = remote_txn;
                                                spendable_outputs.append(&mut spendable_output);
                                                if !new_outputs.1.is_empty() {
                                                        watch_outputs.push(new_outputs);
                                                }
                                                if txn.is_empty() {
-                                                       let (local_txn, mut spendable_output, new_outputs) = self.check_spend_local_transaction(tx, height);
+                                                       let (local_txn, mut spendable_output, new_outputs) = self.check_spend_local_transaction(&tx, height);
                                                        spendable_outputs.append(&mut spendable_output);
                                                        txn = local_txn;
                                                        if !new_outputs.1.is_empty() {
@@ -2159,13 +2160,13 @@ impl ChannelMonitor {
                                                }
                                        }
                                        if !funding_txo.is_none() && txn.is_empty() {
-                                               if let Some(spendable_output) = self.check_spend_closing_transaction(tx) {
+                                               if let Some(spendable_output) = self.check_spend_closing_transaction(&tx) {
                                                        spendable_outputs.push(spendable_output);
                                                }
                                        }
                                } else {
                                        if let Some(&(commitment_number, _)) = self.remote_commitment_txn_on_chain.get(&prevout.txid) {
-                                               let (tx, spendable_output) = self.check_spend_remote_htlc(tx, commitment_number, height, fee_estimator);
+                                               let (tx, spendable_output) = self.check_spend_remote_htlc(&tx, commitment_number, height, fee_estimator);
                                                if let Some(tx) = tx {
                                                        txn.push(tx);
                                                }
@@ -2181,7 +2182,7 @@ impl ChannelMonitor {
                        // While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
                        // can also be resolved in a few other ways which can have more than one output. Thus,
                        // we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
-                       let mut updated = self.is_resolving_htlc_output(tx, height);
+                       let mut updated = self.is_resolving_htlc_output(&tx, height);
                        if updated.len() > 0 {
                                htlc_updated.append(&mut updated);
                        }
index 2745bae1dbfe39d4008b7b5c997cc50bbfdd038c..4c8f162bcf53dc41c982346ce4335222ce4e451f 100644 (file)
@@ -54,19 +54,19 @@ pub fn connect_blocks(notifier: &chaininterface::BlockNotifier, depth: u32, heig
        header.bitcoin_hash()
 }
 
-pub struct Node {
-       pub block_notifier: Arc<chaininterface::BlockNotifier<'a, 'b>>,
+pub struct Node<'a, 'b: 'a> {
+       pub block_notifier: Arc<chaininterface::BlockNotifier<'a>>,
        pub chain_monitor: Arc<chaininterface::ChainWatchInterfaceUtil>,
        pub tx_broadcaster: Arc<test_utils::TestBroadcaster>,
        pub chan_monitor: Arc<test_utils::TestChannelMonitor>,
        pub keys_manager: Arc<test_utils::TestKeysInterface>,
-       pub node: Arc<ChannelManager>,
+       pub node: Arc<ChannelManager<'b>>,
        pub router: Router,
        pub node_seed: [u8; 32],
        pub network_payment_count: Rc<RefCell<u8>>,
        pub network_chan_count: Rc<RefCell<u32>>,
 }
-impl Drop for Node {
+impl<'a, 'b> Drop for Node<'a, 'b> {
        fn drop(&mut self) {
                if !::std::thread::panicking() {
                        // Check that we processed all pending events
@@ -354,7 +354,7 @@ macro_rules! check_closed_broadcast {
        }}
 }
 
-pub fn close_channel(outbound_node: &Node, inbound_node: &Node, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, Transaction) {
+pub fn close_channel<'a, 'b>(outbound_node: &Node<'a, 'b>, inbound_node: &Node<'a, 'b>, channel_id: &[u8; 32], funding_tx: Transaction, close_inbound_first: bool) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, Transaction) {
        let (node_a, broadcaster_a, struct_a) = if close_inbound_first { (&inbound_node.node, &inbound_node.tx_broadcaster, inbound_node) } else { (&outbound_node.node, &outbound_node.tx_broadcaster, outbound_node) };
        let (node_b, broadcaster_b) = if close_inbound_first { (&outbound_node.node, &outbound_node.tx_broadcaster) } else { (&inbound_node.node, &inbound_node.tx_broadcaster) };
        let (tx_a, tx_b);
@@ -589,7 +589,7 @@ macro_rules! expect_payment_sent {
        }
 }
 
-pub fn send_along_route_with_hash(origin_node: &Node, route: Route, expected_route: &[&Node], recv_value: u64, our_payment_hash: PaymentHash) {
+pub fn send_along_route_with_hash<'a, 'b>(origin_node: &Node<'a, 'b>, route: Route, expected_route: &[&Node<'a, 'b>], recv_value: u64, our_payment_hash: PaymentHash) {
        let mut payment_event = {
                origin_node.node.send_payment(route, our_payment_hash).unwrap();
                check_added_monitors!(origin_node, 1);
@@ -631,7 +631,7 @@ pub fn send_along_route_with_hash(origin_node: &Node, route: Route, expected_rou
        }
 }
 
-pub fn send_along_route(origin_node: &Node, route: Route, expected_route: &[&Node], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
+pub fn send_along_route<'a, 'b>(origin_node: &Node<'a, 'b>, route: Route, expected_route: &[&Node<'a, 'b>], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
        let (our_payment_preimage, our_payment_hash) = get_payment_preimage_hash!(origin_node);
        send_along_route_with_hash(origin_node, route, expected_route, recv_value, our_payment_hash);
        (our_payment_preimage, our_payment_hash)
@@ -721,7 +721,7 @@ pub fn claim_payment(origin_node: &Node, expected_route: &[&Node], our_payment_p
 
 pub const TEST_FINAL_CLTV: u32 = 32;
 
-pub fn route_payment(origin_node: &Node, expected_route: &[&Node], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
+pub fn route_payment<'a, 'b>(origin_node: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], recv_value: u64) -> (PaymentPreimage, PaymentHash) {
        let route = origin_node.router.get_route(&expected_route.last().unwrap().node.get_our_node_id(), None, &Vec::new(), recv_value, TEST_FINAL_CLTV).unwrap();
        assert_eq!(route.hops.len(), expected_route.len());
        for (node, hop) in expected_route.iter().zip(route.hops.iter()) {
@@ -747,7 +747,7 @@ pub fn route_over_limit(origin_node: &Node, expected_route: &[&Node], recv_value
        };
 }
 
-pub fn send_payment(origin: &Node, expected_route: &[&Node], recv_value: u64, expected_value: u64) {
+pub fn send_payment<'a, 'b>(origin: &Node<'a, 'b>, expected_route: &[&Node<'a, 'b>], recv_value: u64, expected_value: u64) {
        let our_payment_preimage = route_payment(&origin, expected_route, recv_value).0;
        claim_payment(&origin, expected_route, our_payment_preimage, expected_value);
 }