Split out BroadcastInterface, ChainWatchInterface monitors re-enter from called listeners
authorTamas Blummer <tamas.blummer@gmail.com>
Tue, 13 Mar 2018 17:51:33 +0000 (18:51 +0100)
committerMatt Corallo <git@bluematt.me>
Mon, 19 Mar 2018 20:14:19 +0000 (16:14 -0400)
Cargo.toml
src/chain/bitcoincorerpcchain.rs
src/chain/chaininterface.rs
src/chain/mod.rs
src/chain/rustbitcoinchain.rs [deleted file]
src/lib.rs
src/ln/channelmanager.rs
src/ln/channelmonitor.rs
src/util/test_utils.rs

index 822484298536ada033829f0f67100bf107246ac0..b7c6e5cf521e1a306de90c44f9c4782b3aad6954 100644 (file)
@@ -15,7 +15,6 @@ non_bitcoin_chain_hash_routing = []
 
 [dependencies]
 bitcoin = "0.11"
-bitcoin-chain = { git = "https://github.com/rust-bitcoin/rust-bitcoin-chain", branch = "master" }
 rust-crypto = "0.2"
 rand = "0.4"
 secp256k1 = "0.8"
index f51b1d735a2d2f88c7281c683387bd5d892533c9..5c14b5da06cf0c8cbc5df61c0a730841a4ed5324 100644 (file)
@@ -2,7 +2,7 @@ use bitcoin::blockdata::transaction::Transaction;
 use bitcoin::blockdata::script::Script;
 use bitcoin::util::hash::Sha256dHash;
 
-use chain::chaininterface::{ChainWatchInterface,ChainWatchInterfaceUtil,ChainListener};
+use chain::chaininterface::{ChainWatchInterface,ChainWatchInterfaceUtil,ChainListener, BroadcasterInterface};
 
 use std::sync::Weak;
 
@@ -23,15 +23,17 @@ impl ChainWatchInterface for BitcoinCoreRPCClientChain {
                self.util.watch_all_txn()
        }
 
-       fn broadcast_transaction(&self, _tx: &Transaction) {
-               unimplemented!()
-       }
-
        fn register_listener(&self, listener: Weak<ChainListener>) {
                self.util.register_listener(listener)
        }
 }
 
+impl BroadcasterInterface for BitcoinCoreRPCClientChain {
+       fn broadcast_transaction(&self, _tx: &Transaction) {
+               unimplemented!()
+       }
+}
+
 impl BitcoinCoreRPCClientChain {
        pub fn new() -> BitcoinCoreRPCClientChain {
                BitcoinCoreRPCClientChain {
index ad298ce94db35eb4a29bec7c11fc884e6a8cf8af..f456afda691b023c3285ffe466aecb9bfd411f76 100644 (file)
@@ -1,9 +1,9 @@
-use bitcoin::blockdata::block::BlockHeader;
+use bitcoin::blockdata::block::{Block, BlockHeader};
 use bitcoin::blockdata::transaction::Transaction;
 use bitcoin::blockdata::script::Script;
 use bitcoin::util::hash::Sha256dHash;
-
-use std::sync::{Weak,Mutex};
+use std::sync::{Mutex,Weak,MutexGuard};
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 /// An interface to request notification of certain scripts as they appear the
 /// chain.
@@ -21,13 +21,18 @@ pub trait ChainWatchInterface: Sync + Send {
        /// Indicates that a listener needs to see all transactions.
        fn watch_all_txn(&self);
 
-       /// Sends a transaction out to (hopefully) be mined
-       fn broadcast_transaction(&self, tx: &Transaction);
-
        fn register_listener(&self, listener: Weak<ChainListener>);
        //TODO: unregister
 }
 
+/// An interface to send a transaction to connected Bitcoin peers.
+/// This is for final settlement. An error might indicate that no peers can be reached or
+/// that peers rejected the transaction.
+pub trait BroadcasterInterface: Sync + Send {
+       /// Sends a transaction out to (hopefully) be mined
+       fn broadcast_transaction(&self, tx: &Transaction);
+}
+
 /// A trait indicating a desire to listen for events from the chain
 pub trait ChainListener: Sync + Send {
        /// Notifies a listener that a block was connected.
@@ -54,7 +59,7 @@ pub enum ConfirmationTarget {
 /// called from inside the library in response to ChainListener events, P2P events, or timer
 /// events).
 pub trait FeeEstimator: Sync + Send {
-       fn get_est_sat_per_vbyte(&self, ConfirmationTarget) -> u64;
+       fn get_est_sat_per_vbyte(&self, confirmation_target: ConfirmationTarget) -> u64;
 }
 
 /// Utility to capture some common parts of ChainWatchInterface implementors.
@@ -62,59 +67,97 @@ pub trait FeeEstimator: Sync + Send {
 pub struct ChainWatchInterfaceUtil {
        watched: Mutex<(Vec<Script>, Vec<(Sha256dHash, u32)>, bool)>, //TODO: Something clever to optimize this
        listeners: Mutex<Vec<Weak<ChainListener>>>,
+       reentered: AtomicUsize
 }
 
-impl ChainWatchInterfaceUtil {
-       pub fn new() -> ChainWatchInterfaceUtil {
-               ChainWatchInterfaceUtil {
-                       watched: Mutex::new((Vec::new(), Vec::new(), false)),
-                       listeners: Mutex::new(Vec::new()),
-               }
-       }
-
-       pub fn install_watch_script(&self, spk: Script) {
+/// Register listener
+impl ChainWatchInterface for ChainWatchInterfaceUtil {
+       fn install_watch_script(&self, script_pub_key: Script) {
                let mut watched = self.watched.lock().unwrap();
-               watched.0.push(Script::from(spk));
+               watched.0.push(Script::from(script_pub_key));
+               self.reentered.fetch_add(1, Ordering::Relaxed);
        }
 
-       pub fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) {
+       fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) {
                let mut watched = self.watched.lock().unwrap();
                watched.1.push(outpoint);
+               self.reentered.fetch_add(1, Ordering::Relaxed);
        }
 
-       pub fn watch_all_txn(&self) { //TODO: refcnt this?
+       fn watch_all_txn(&self) {
                let mut watched = self.watched.lock().unwrap();
                watched.2 = true;
+               self.reentered.fetch_add(1, Ordering::Relaxed);
        }
 
-       pub fn register_listener(&self, listener: Weak<ChainListener>) {
+       fn register_listener(&self, listener: Weak<ChainListener>) {
                let mut vec = self.listeners.lock().unwrap();
                vec.push(listener);
        }
+}
 
-       pub fn do_call_block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
+impl ChainWatchInterfaceUtil {
+       pub fn new() -> ChainWatchInterfaceUtil {
+               ChainWatchInterfaceUtil {
+                       watched: Mutex::new((Vec::new(), Vec::new(), false)),
+                       listeners: Mutex::new(Vec::new()),
+                       reentered: AtomicUsize::new(1)
+               }
+       }
+
+       /// notify listener that a block was connected
+       /// notification will repeat if notified listener register new listeners
+       pub fn block_connected_with_filtering(&self, block: &Block, height: u32) {
+               let mut reentered = true;
+               while reentered {
+                       let mut matched = Vec::new();
+                       let mut matched_index = Vec::new();
+                       {
+                               let watched = self.watched.lock().unwrap();
+                               for (index, transaction) in block.txdata.iter().enumerate() {
+                                       if self.does_match_tx_unguarded(transaction, &watched) {
+                                               matched.push(transaction);
+                                               matched_index.push(index as u32);
+                                       }
+                               }
+                       }
+                       reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
+               }
+       }
+
+       /// notify listener that a block was disconnected
+       pub fn block_disconnected(&self, header: &BlockHeader) {
                let listeners = self.listeners.lock().unwrap().clone();
                for listener in listeners.iter() {
                        match listener.upgrade() {
-                               Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
+                               Some(arc) => arc.block_disconnected(header),
                                None => ()
                        }
                }
        }
 
-       pub fn do_call_block_disconnected(&self, header: &BlockHeader) {
+       /// call listeners for connected blocks if they are still around.
+       /// returns true if notified listeners registered additional listener
+       pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
+               let last_seen = self.reentered.load(Ordering::Relaxed);
+
                let listeners = self.listeners.lock().unwrap().clone();
                for listener in listeners.iter() {
                        match listener.upgrade() {
-                               Some(arc) => arc.block_disconnected(header),
+                               Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
                                None => ()
                        }
                }
+               return last_seen != self.reentered.load(Ordering::Relaxed);
        }
 
        /// Checks if a given transaction matches the current filter
        pub fn does_match_tx(&self, tx: &Transaction) -> bool {
                let watched = self.watched.lock().unwrap();
+               self.does_match_tx_unguarded (tx, &watched)
+       }
+
+       fn does_match_tx_unguarded (&self, tx: &Transaction, watched: &MutexGuard<(Vec<Script>, Vec<(Sha256dHash, u32)>, bool)>) -> bool {
                if watched.2 {
                        return true;
                }
index a2b1453dc604ec1b464460554d66afc0737546b3..c8a528820a0eab0d6f51d8db6033962ac26bd6d7 100644 (file)
@@ -1,3 +1,2 @@
 pub mod chaininterface;
 pub mod bitcoincorerpcchain;
-pub mod rustbitcoinchain;
diff --git a/src/chain/rustbitcoinchain.rs b/src/chain/rustbitcoinchain.rs
deleted file mode 100644 (file)
index d933416..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-use bitcoin_chain::blockchain::Blockchain;
-use bitcoin::blockdata::transaction::Transaction;
-use bitcoin::blockdata::block::Block;
-use bitcoin::blockdata::script::Script;
-use bitcoin::network::constants::Network;
-use bitcoin::util::hash::Sha256dHash;
-
-use chain::chaininterface::{ChainWatchInterface,ChainWatchInterfaceUtil,ChainListener};
-
-use std::sync::{Mutex,Weak};
-
-/// Implements a ChainWatchInterface using rust-bitcoin's Blockchain class
-pub struct ChainWatchImpl {
-       chain: Mutex<Blockchain>,
-       util: ChainWatchInterfaceUtil
-}
-
-unsafe impl Send for ChainWatchImpl {} //TODO: GAH WTF
-unsafe impl Sync for ChainWatchImpl {} //TODO: GAH WTF
-
-impl ChainWatchInterface for ChainWatchImpl {
-       fn install_watch_script(&self, spk: Script) {
-               self.util.install_watch_script(spk)
-       }
-
-       fn install_watch_outpoint(&self, outpoint: (Sha256dHash, u32)) {
-               self.util.install_watch_outpoint(outpoint)
-       }
-
-       fn watch_all_txn(&self) {
-               self.util.watch_all_txn()
-       }
-
-       fn broadcast_transaction(&self, _tx: &Transaction) {
-               unimplemented!()
-       }
-
-       fn register_listener(&self, listener: Weak<ChainListener>) {
-               self.util.register_listener(listener)
-       }
-}
-
-impl ChainWatchImpl {
-       pub fn new(network: Network) -> ChainWatchImpl {
-               ChainWatchImpl {
-                       chain: Mutex::new(Blockchain::new(network)),
-                       util: ChainWatchInterfaceUtil::new(),
-               }
-       }
-
-       pub fn add_block(&mut self, block: Block) {
-               {
-                       let mut txn_matched: Vec<&Transaction> = Vec::new();
-                       let mut indexes_of_txn_matched = Vec::new();
-                       for (idx, tx) in block.txdata.iter().enumerate() {
-                               if self.util.does_match_tx(&tx) {
-                                       txn_matched.push(tx);
-                                       indexes_of_txn_matched.push(idx as u32);
-                               }
-                       }
-                       //TODO: Height
-                       self.util.do_call_block_connected(&block.header, 0, &txn_matched[..], &indexes_of_txn_matched[..]);
-               }
-               self.chain.lock().unwrap().add_block(block).unwrap();
-       }
-}
index 7d2654a5b431753e2d3bcc2e209c3a4c4df46a04..c01321776e0ae043f7570a186e876ed305a2120c 100644 (file)
@@ -1,7 +1,6 @@
 #![crate_name = "lightning"]
 
 extern crate bitcoin;
-extern crate bitcoin_chain;
 extern crate secp256k1;
 extern crate rand;
 extern crate crypto;
index e0e6cf36d7f5a4325e8c68046be2af2b7a0318c8..64c84172c90b0acebde4277d6b2095b3d1a945b3 100644 (file)
@@ -1392,10 +1392,10 @@ mod tests {
        fn confirm_transaction(chain: &test_utils::TestWatchInterface, tx: &Transaction) {
                let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                let chan_id = unsafe { CHAN_COUNT };
-               chain.watch_util.do_call_block_connected(&header, 1, &[tx; 1], &[chan_id as u32; 1]);
+               chain.watch_util.block_connected_checked(&header, 1, &[tx; 1], &[chan_id as u32; 1]);
                for i in 2..100 {
                        header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-                       chain.watch_util.do_call_block_connected(&header, i, &[tx; 0], &[0; 0]);
+                       chain.watch_util.block_connected_checked(&header, i, &[tx; 0], &[0; 0]);
                }
        }
 
index e463b936e61011dd5b2adfab2ef2f87d387495d0..ac08bf5a37bbfa402a1b40179986c5d1cdc6b4f9 100644 (file)
@@ -13,7 +13,7 @@ use secp256k1::key::{SecretKey,PublicKey};
 use ln::msgs::HandleError;
 use ln::chan_utils;
 use ln::chan_utils::HTLCOutputInCommitment;
-use chain::chaininterface::{ChainListener,ChainWatchInterface};
+use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface};
 
 use std::collections::HashMap;
 use std::sync::{Arc,Mutex};
@@ -39,13 +39,14 @@ pub trait ManyChannelMonitor: Send + Sync {
 pub struct SimpleManyChannelMonitor<Key> {
        monitors: Mutex<HashMap<Key, ChannelMonitor>>,
        chain_monitor: Arc<ChainWatchInterface>,
+       broadcaster: Arc<BroadcasterInterface>
 }
 
 impl<Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonitor<Key> {
        fn block_connected(&self, _header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
                let monitors = self.monitors.lock().unwrap();
                for monitor in monitors.values() {
-                       monitor.block_connected(txn_matched, height, &*self.chain_monitor);
+                       monitor.block_connected(txn_matched, height, &*self.broadcaster);
                }
        }
 
@@ -53,10 +54,11 @@ impl<Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelMonit
 }
 
 impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key> {
-       pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> Arc<SimpleManyChannelMonitor<Key>> {
+       pub fn new(chain_monitor: Arc<ChainWatchInterface>, broadcaster: Arc<BroadcasterInterface>) -> Arc<SimpleManyChannelMonitor<Key>> {
                let res = Arc::new(SimpleManyChannelMonitor {
                        monitors: Mutex::new(HashMap::new()),
-                       chain_monitor: chain_monitor,
+                       chain_monitor,
+                       broadcaster
                });
                let weak_res = Arc::downgrade(&res);
                res.chain_monitor.register_listener(weak_res);
@@ -443,7 +445,7 @@ impl ChannelMonitor {
                txn_to_broadcast
        }
 
-       fn block_connected(&self, txn_matched: &[&Transaction], height: u32, chain_monitor: &ChainWatchInterface) {
+       fn block_connected(&self, txn_matched: &[&Transaction], height: u32, broadcaster: &BroadcasterInterface) {
                for tx in txn_matched {
                        if tx.input.len() != 1 {
                                // We currently only ever sign something spending a commitment or HTLC
@@ -454,7 +456,7 @@ impl ChannelMonitor {
                        for txin in tx.input.iter() {
                                if self.funding_txo.is_none() || (txin.prev_hash == self.funding_txo.unwrap().0 && txin.prev_index == self.funding_txo.unwrap().1 as u32) {
                                        for tx in self.check_spend_transaction(tx, height).iter() {
-                                               chain_monitor.broadcast_transaction(tx);
+                                               broadcaster.broadcast_transaction(tx); // TODO: use result
                                        }
                                }
                        }
index beb50940e45dbedb7548c1b1dbfc4acf76f64c8f..1626b60b2ede3dda06ce2823fc5e95ca1d03c2d8 100644 (file)
@@ -31,9 +31,6 @@ impl chaininterface::ChainWatchInterface for TestWatchInterface {
        fn watch_all_txn(&self) {
                unimplemented!();
        }
-       fn broadcast_transaction(&self, _tx: &Transaction) {
-               unimplemented!();
-       }
        fn register_listener(&self, listener: Weak<chaininterface::ChainListener>) {
                self.watch_util.register_listener(listener);
        }