X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchaininterface.rs;h=b30ebc1ee1629441ca7ed95e4e27a2518279ad35;hb=79c8491120ecc2bc305766a8249d9b004e642135;hp=c0330fb2963ea868d753c16dd633d5cbbd8890c9;hpb=126b514168ff8294f6ee7b9573797c6759512b9c;p=rust-lightning diff --git a/lightning/src/chain/chaininterface.rs b/lightning/src/chain/chaininterface.rs index c0330fb2..b30ebc1e 100644 --- a/lightning/src/chain/chaininterface.rs +++ b/lightning/src/chain/chaininterface.rs @@ -14,11 +14,15 @@ use bitcoin::network::constants::Network; use util::logger::Logger; -use std::sync::{Mutex,Weak,MutexGuard,Arc}; +use std::sync::{Mutex, MutexGuard, Arc}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::collections::HashSet; +use std::ops::Deref; +use std::marker::PhantomData; +use std::ptr; /// Used to give chain error details upstream +#[derive(Clone)] pub enum ChainError { /// Client doesn't support UTXO lookup (but the chain hash matches our genesis block hash) NotSupported, @@ -45,16 +49,20 @@ pub trait ChainWatchInterface: Sync + Send { /// Indicates that a listener needs to see all transactions. fn watch_all_txn(&self); - /// Register the given listener to receive events. Only a weak pointer is provided and the - /// registration should be freed once that pointer expires. - fn register_listener(&self, listener: Weak); - //TODO: unregister - /// Gets the script and value in satoshis for a given unspent transaction output given a /// short_channel_id (aka unspent_tx_output_identier). For BTC/tBTC channels the top three /// bytes are the block height, the next 3 the transaction index within the block, and the /// final two the output within the transaction. fn get_chain_utxo(&self, genesis_hash: Sha256dHash, unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError>; + + /// Gets the list of transactions and transaction indices that the ChainWatchInterface is + /// watching for. + fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec); + + /// Returns a usize that changes when the ChainWatchInterface's watched data is modified. + /// Users of `filter_block` should pre-save a copy of `reentered`'s return value and use it to + /// determine whether they need to re-filter a given block. + fn reentered(&self) -> usize; } /// An interface to send a transaction to the Bitcoin network. @@ -66,11 +74,16 @@ pub trait BroadcasterInterface: Sync + Send { /// A trait indicating a desire to listen for events from the chain pub trait ChainListener: Sync + Send { /// Notifies a listener that a block was connected. - /// Note that if a new transaction/outpoint is watched during a block_connected call, the block - /// *must* be re-scanned with the new transaction/outpoints and block_connected should be - /// called again with the same header and (at least) the new transactions. /// - /// Note that if non-new transaction/outpoints may be registered during a call, a second call + /// The txn_matched array should be set to references to transactions which matched the + /// relevant installed watch outpoints/txn, or the full set of transactions in the block. + /// + /// Note that if txn_matched includes only matched transactions, and a new + /// transaction/outpoint is watched during a block_connected call, the block *must* be + /// re-scanned with the new transaction/outpoints and block_connected should be called + /// again with the same header and (at least) the new transactions. + /// + /// Note that if non-new transaction/outpoints are be registered during a call, a second call /// *must not* happen. /// /// This also means those counting confirmations using block_connected callbacks should watch @@ -111,7 +124,11 @@ pub trait FeeEstimator: Sync + Send { fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u64; } +/// Minimum relay fee as required by bitcoin network mempool policy. +pub const MIN_RELAY_FEE_SAT_PER_1000_WEIGHT: u64 = 4000; + /// Utility for tracking registered txn/outpoints and checking for matches +#[cfg_attr(test, derive(PartialEq))] pub struct ChainWatchedUtil { watch_all: bool, @@ -198,17 +215,121 @@ impl ChainWatchedUtil { } } +/// BlockNotifierArc is useful when you need a BlockNotifier that points to ChainListeners with +/// static lifetimes, e.g. when you're using lightning-net-tokio (since tokio::spawn requires +/// parameters with static lifetimes). Other times you can afford a reference, which is more +/// efficient, in which case BlockNotifierRef is a more appropriate type. Defining these type +/// aliases prevents issues such as overly long function definitions. +pub type BlockNotifierArc = Arc>>; + +/// BlockNotifierRef is useful when you want a BlockNotifier that points to ChainListeners +/// with nonstatic lifetimes. This is useful for when static lifetimes are not needed. Nonstatic +/// lifetimes are more efficient but less flexible, and should be used by default unless static +/// lifetimes are required, e.g. when you're using lightning-net-tokio (since tokio::spawn +/// requires parameters with static lifetimes), in which case BlockNotifierArc is a more +/// appropriate type. Defining these type aliases for common usages prevents issues such as +/// overly long function definitions. +pub type BlockNotifierRef<'a> = BlockNotifier<'a, &'a ChainListener>; + +/// Utility for notifying listeners about new blocks, and handling block rescans if new watch +/// data is registered. +/// +/// Rather than using a plain BlockNotifier, it is preferable to use either a BlockNotifierArc +/// or a BlockNotifierRef for conciseness. See their documentation for more details, but essentially +/// you should default to using a BlockNotifierRef, and use a BlockNotifierArc instead when you +/// require ChainListeners with static lifetimes, such as when you're using lightning-net-tokio. +pub struct BlockNotifier<'a, CL: Deref + 'a> { + listeners: Mutex>, + chain_monitor: Arc, + phantom: PhantomData<&'a ()>, +} + +impl<'a, CL: Deref + 'a> BlockNotifier<'a, CL> { + /// Constructs a new BlockNotifier without any listeners. + pub fn new(chain_monitor: Arc) -> BlockNotifier<'a, CL> { + BlockNotifier { + listeners: Mutex::new(Vec::new()), + chain_monitor, + phantom: PhantomData, + } + } + + /// Register the given listener to receive events. + pub fn register_listener(&self, listener: CL) { + let mut vec = self.listeners.lock().unwrap(); + vec.push(listener); + } + /// Unregister the given listener to no longer + /// receive events. + /// + /// If the same listener is registered multiple times, unregistering + /// will remove ALL occurrences of that listener. Comparison is done using + /// the pointer returned by the Deref trait implementation. + pub fn unregister_listener(&self, listener: CL) { + let mut vec = self.listeners.lock().unwrap(); + // item is a ref to an abstract thing that dereferences to a ChainListener, + // so dereference it twice to get the ChainListener itself + vec.retain(|item | !ptr::eq(&(**item), &(*listener))); + } + + /// Notify listeners that a block was connected given a full, unfiltered block. + /// + /// Handles re-scanning the block and calling block_connected again if listeners register new + /// watch data during the callbacks for you (see ChainListener::block_connected for more info). + pub fn block_connected<'b>(&self, block: &'b Block, height: u32) { + let mut reentered = true; + while reentered { + let (matched, matched_index) = self.chain_monitor.filter_block(block); + reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice()); + } + } + + /// Notify listeners that a block was connected, given pre-filtered list of transactions in the + /// block which matched the filter (probably using does_match_tx). + /// + /// Returns true if notified listeners registered additional watch data (implying that the + /// block must be re-scanned and this function called again prior to further block_connected + /// calls, see ChainListener::block_connected for more info). + pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool { + let last_seen = self.chain_monitor.reentered(); + + let listeners = self.listeners.lock().unwrap(); + for listener in listeners.iter() { + listener.block_connected(header, height, txn_matched, indexes_of_txn_matched); + } + return last_seen != self.chain_monitor.reentered(); + } + + /// Notify listeners that a block was disconnected. + pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) { + let listeners = self.listeners.lock().unwrap(); + for listener in listeners.iter() { + listener.block_disconnected(&header, disconnected_height); + } + } +} + /// Utility to capture some common parts of ChainWatchInterface implementors. /// /// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful. pub struct ChainWatchInterfaceUtil { network: Network, watched: Mutex, - listeners: Mutex>>, reentered: AtomicUsize, logger: Arc, } +// We only expose PartialEq in test since its somewhat unclear exactly what it should do and we're +// only comparing a subset of fields (essentially just checking that the set of things we're +// watching is the same). +#[cfg(test)] +impl PartialEq for ChainWatchInterfaceUtil { + fn eq(&self, o: &Self) -> bool { + self.network == o.network && + *self.watched.lock().unwrap() == *o.watched.lock().unwrap() + } +} + /// Register listener impl ChainWatchInterface for ChainWatchInterfaceUtil { fn install_watch_tx(&self, txid: &Sha256dHash, script_pub_key: &Script) { @@ -232,17 +353,31 @@ impl ChainWatchInterface for ChainWatchInterfaceUtil { } } - fn register_listener(&self, listener: Weak) { - let mut vec = self.listeners.lock().unwrap(); - vec.push(listener); - } - fn get_chain_utxo(&self, genesis_hash: Sha256dHash, _unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError> { if genesis_hash != genesis_block(self.network).header.bitcoin_hash() { return Err(ChainError::NotWatched); } Err(ChainError::NotSupported) } + + fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec) { + let mut matched = Vec::new(); + let mut matched_index = Vec::new(); + { + let watched = self.watched.lock().unwrap(); + for (index, transaction) in block.txdata.iter().enumerate() { + if self.does_match_tx_unguarded(transaction, &watched) { + matched.push(transaction); + matched_index.push(index as u32); + } + } + } + (matched, matched_index) + } + + fn reentered(&self) -> usize { + self.reentered.load(Ordering::Relaxed) + } } impl ChainWatchInterfaceUtil { @@ -251,71 +386,95 @@ impl ChainWatchInterfaceUtil { ChainWatchInterfaceUtil { network: network, watched: Mutex::new(ChainWatchedUtil::new()), - listeners: Mutex::new(Vec::new()), reentered: AtomicUsize::new(1), logger: logger, } } - /// Notify listeners that a block was connected given a full, unfiltered block. - /// - /// Handles re-scanning the block and calling block_connected again if listeners register new - /// watch data during the callbacks for you (see ChainListener::block_connected for more info). - pub fn block_connected_with_filtering(&self, block: &Block, height: u32) { - let mut reentered = true; - while reentered { - let mut matched = Vec::new(); - let mut matched_index = Vec::new(); - { - let watched = self.watched.lock().unwrap(); - for (index, transaction) in block.txdata.iter().enumerate() { - if self.does_match_tx_unguarded(transaction, &watched) { - matched.push(transaction); - matched_index.push(index as u32); - } - } - } - reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice()); - } + /// Checks if a given transaction matches the current filter. + pub fn does_match_tx(&self, tx: &Transaction) -> bool { + let watched = self.watched.lock().unwrap(); + self.does_match_tx_unguarded (tx, &watched) } - /// Notify listeners that a block was disconnected. - pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) { - let listeners = self.listeners.lock().unwrap().clone(); - for listener in listeners.iter() { - match listener.upgrade() { - Some(arc) => arc.block_disconnected(&header, disconnected_height), - None => () - } - } + fn does_match_tx_unguarded(&self, tx: &Transaction, watched: &MutexGuard) -> bool { + watched.does_match_tx(tx) } +} - /// Notify listeners that a block was connected, given pre-filtered list of transactions in the - /// block which matched the filter (probably using does_match_tx). - /// - /// Returns true if notified listeners registered additional watch data (implying that the - /// block must be re-scanned and this function called again prior to further block_connected - /// calls, see ChainListener::block_connected for more info). - pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool { - let last_seen = self.reentered.load(Ordering::Relaxed); +#[cfg(test)] +mod tests { + use ln::functional_test_utils::{create_chanmon_cfgs, create_node_cfgs}; + use super::{BlockNotifier, ChainListener}; + use std::ptr; + + #[test] + fn register_listener_test() { + let chanmon_cfgs = create_chanmon_cfgs(1); + let node_cfgs = create_node_cfgs(1, &chanmon_cfgs); + let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone()); + assert_eq!(block_notifier.listeners.lock().unwrap().len(), 0); + let listener = &node_cfgs[0].chan_monitor.simple_monitor as &ChainListener; + block_notifier.register_listener(listener); + let vec = block_notifier.listeners.lock().unwrap(); + assert_eq!(vec.len(), 1); + let item = vec.first().clone().unwrap(); + assert!(ptr::eq(&(**item), &(*listener))); + } - let listeners = self.listeners.lock().unwrap().clone(); - for listener in listeners.iter() { - match listener.upgrade() { - Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched), - None => () - } - } - return last_seen != self.reentered.load(Ordering::Relaxed); + #[test] + fn unregister_single_listener_test() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone()); + let listener1 = &node_cfgs[0].chan_monitor.simple_monitor as &ChainListener; + let listener2 = &node_cfgs[1].chan_monitor.simple_monitor as &ChainListener; + block_notifier.register_listener(listener1); + block_notifier.register_listener(listener2); + let vec = block_notifier.listeners.lock().unwrap(); + assert_eq!(vec.len(), 2); + drop(vec); + block_notifier.unregister_listener(listener1); + let vec = block_notifier.listeners.lock().unwrap(); + assert_eq!(vec.len(), 1); + let item = vec.first().clone().unwrap(); + assert!(ptr::eq(&(**item), &(*listener2))); } - /// Checks if a given transaction matches the current filter. - pub fn does_match_tx(&self, tx: &Transaction) -> bool { - let watched = self.watched.lock().unwrap(); - self.does_match_tx_unguarded (tx, &watched) + #[test] + fn unregister_single_listener_ref_test() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone()); + block_notifier.register_listener(&node_cfgs[0].chan_monitor.simple_monitor as &ChainListener); + block_notifier.register_listener(&node_cfgs[1].chan_monitor.simple_monitor as &ChainListener); + let vec = block_notifier.listeners.lock().unwrap(); + assert_eq!(vec.len(), 2); + drop(vec); + block_notifier.unregister_listener(&node_cfgs[0].chan_monitor.simple_monitor); + let vec = block_notifier.listeners.lock().unwrap(); + assert_eq!(vec.len(), 1); + let item = vec.first().clone().unwrap(); + assert!(ptr::eq(&(**item), &(*&node_cfgs[1].chan_monitor.simple_monitor))); } - fn does_match_tx_unguarded(&self, tx: &Transaction, watched: &MutexGuard) -> bool { - watched.does_match_tx(tx) + #[test] + fn unregister_multiple_of_the_same_listeners_test() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let block_notifier = BlockNotifier::new(node_cfgs[0].chain_monitor.clone()); + let listener1 = &node_cfgs[0].chan_monitor.simple_monitor as &ChainListener; + let listener2 = &node_cfgs[1].chan_monitor.simple_monitor as &ChainListener; + block_notifier.register_listener(listener1); + block_notifier.register_listener(listener1); + block_notifier.register_listener(listener2); + let vec = block_notifier.listeners.lock().unwrap(); + assert_eq!(vec.len(), 3); + drop(vec); + block_notifier.unregister_listener(listener1); + let vec = block_notifier.listeners.lock().unwrap(); + assert_eq!(vec.len(), 1); + let item = vec.first().clone().unwrap(); + assert!(ptr::eq(&(**item), &(*listener2))); } }