From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Thu, 15 Apr 2021 14:25:54 +0000 (+0000) Subject: Merge pull request #858 from jkczyz/2021-03-electrum-interface X-Git-Tag: v0.0.14~24 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=feeb89305a449703a341ce52bca8b3194c716f60;hp=9397db6119a63acb7fb0b8f56134b2f854077acc;p=rust-lightning Merge pull request #858 from jkczyz/2021-03-electrum-interface Electrum interface for ChannelMonitor --- diff --git a/background-processor/src/lib.rs b/background-processor/src/lib.rs index 3f11f4e3..30d9f6c5 100644 --- a/background-processor/src/lib.rs +++ b/background-processor/src/lib.rs @@ -131,7 +131,7 @@ mod tests { use lightning::chain::keysinterface::{Sign, InMemorySigner, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; - use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager}; + use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::InitFeatures; use lightning::ln::msgs::ChannelMessageHandler; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; @@ -192,14 +192,12 @@ mod tests { let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); let seed = [i as u8; 32]; let network = Network::Testnet; - let genesis_block = genesis_block(network); - let now = Duration::from_secs(genesis_block.header.time as u64); + let now = Duration::from_secs(genesis_block(network).header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); let params = ChainParameters { network, - latest_hash: genesis_block.block_hash(), - latest_height: 0, + best_block: BestBlock::from_genesis(network), }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params)); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index a6a6a853..9d0a90ee 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -36,7 +36,7 @@ use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, use lightning::chain::transaction::OutPoint; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::chain::keysinterface::{KeysInterface, InMemorySigner}; -use lightning::ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, PaymentSendFailure, ChannelManagerReadArgs}; +use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, PaymentSendFailure, ChannelManagerReadArgs}; use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, DecodeError, ErrorAction, UpdateAddHTLC, Init}; use lightning::util::enforcing_trait_impls::{EnforcingSigner, INITIAL_REVOKED_COMMITMENT_NUMBER}; @@ -322,8 +322,7 @@ pub fn do_test(data: &[u8], out: Out) { let network = Network::Bitcoin; let params = ChainParameters { network, - latest_hash: genesis_block(network).block_hash(), - latest_height: 0, + best_block: BestBlock::from_genesis(network), }; (ChannelManager::new(fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, params), monitor, keys_manager) diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 9e4f18b5..91ebb990 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -32,7 +32,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, use lightning::chain::chainmonitor; use lightning::chain::transaction::OutPoint; use lightning::chain::keysinterface::{InMemorySigner, KeysInterface}; -use lightning::ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret}; +use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor}; use lightning::ln::msgs::DecodeError; use lightning::routing::router::get_route; @@ -355,15 +355,13 @@ pub fn do_test(data: &[u8], logger: &Arc) { config.channel_options.announced_channel = get_slice!(1)[0] != 0; config.peer_channel_config_limits.min_dust_limit_satoshis = 0; let network = Network::Bitcoin; - let genesis_hash = genesis_block(network).block_hash(); let params = ChainParameters { network, - latest_hash: genesis_hash, - latest_height: 0, + best_block: BestBlock::from_genesis(network), }; let channelmanager = Arc::new(ChannelManager::new(fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, params)); let our_id = PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()); - let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_hash, None, Arc::clone(&logger))); + let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_block(network).block_hash(), None, Arc::clone(&logger))); let peers = RefCell::new([false; 256]); let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 0fd08801..ee6df63a 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -24,12 +24,13 @@ //! servicing [`ChannelMonitor`] updates from the client. use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::hash_types::Txid; use chain; use chain::{Filter, WatchedOutput}; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use chain::channelmonitor; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist}; +use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist, TransactionOutputs}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::logger::Logger; @@ -82,24 +83,77 @@ where C::Target: chain::Filter, /// descendants of such transactions. It is not necessary to re-fetch the block to obtain /// updated `txdata`. pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + self.process_chain_data(header, txdata, |monitor, txdata| { + monitor.block_connected( + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + }); + } + + /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view + /// of a channel and reacting accordingly to newly confirmed transactions. For details, see + /// [`ChannelMonitor::transactions_confirmed`]. + /// + /// Used instead of [`block_connected`] by clients that are notified of transactions rather than + /// blocks. May be called before or after [`update_best_block`] for transactions in the + /// corresponding block. See [`update_best_block`] for further calling expectations. + /// + /// [`block_connected`]: Self::block_connected + /// [`update_best_block`]: Self::update_best_block + pub fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { + self.process_chain_data(header, txdata, |monitor, txdata| { + monitor.transactions_confirmed( + header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + }); + } + + /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view + /// of a channel and reacting accordingly based on the new chain tip. For details, see + /// [`ChannelMonitor::update_best_block`]. + /// + /// Used instead of [`block_connected`] by clients that are notified of transactions rather than + /// blocks. May be called before or after [`transactions_confirmed`] for the corresponding + /// block. + /// + /// Must be called after new blocks become available for the most recent block. Intermediary + /// blocks, however, may be safely skipped. In the event of a chain re-organization, this only + /// needs to be called for the most recent block assuming `transaction_unconfirmed` is called + /// for any affected transactions. + /// + /// [`block_connected`]: Self::block_connected + /// [`transactions_confirmed`]: Self::transactions_confirmed + /// [`transaction_unconfirmed`]: Self::transaction_unconfirmed + pub fn update_best_block(&self, header: &BlockHeader, height: u32) { + self.process_chain_data(header, &[], |monitor, txdata| { + // While in practice there shouldn't be any recursive calls when given empty txdata, + // it's still possible if a chain::Filter implementation returns a transaction. + debug_assert!(txdata.is_empty()); + monitor.update_best_block( + header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) + }); + } + + fn process_chain_data(&self, header: &BlockHeader, txdata: &TransactionData, process: FN) + where + FN: Fn(&ChannelMonitor, &TransactionData) -> Vec + { let mut dependent_txdata = Vec::new(); let monitors = self.monitors.read().unwrap(); for monitor in monitors.values() { - let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + let mut txn_outputs = process(monitor, txdata); // Register any new outputs with the chain source for filtering, storing any dependent // transactions from within the block that previously had not been included in txdata. if let Some(ref chain_source) = self.chain_source { let block_hash = header.block_hash(); - for (txid, outputs) in txn_outputs.drain(..) { - for (idx, output) in outputs.iter() { + for (txid, mut outputs) in txn_outputs.drain(..) { + for (idx, output) in outputs.drain(..) { // Register any new outputs with the chain source for filtering and recurse // if it indicates that there are dependent transactions within the block // that had not been previously included in txdata. let output = WatchedOutput { block_hash: Some(block_hash), - outpoint: OutPoint { txid, index: *idx as u16 }, - script_pubkey: output.script_pubkey.clone(), + outpoint: OutPoint { txid, index: idx as u16 }, + script_pubkey: output.script_pubkey, }; if let Some(tx) = chain_source.register_output(output) { dependent_txdata.push(tx); @@ -114,7 +168,7 @@ where C::Target: chain::Filter, dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index); dependent_txdata.dedup_by_key(|(index, _tx)| *index); let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect(); - self.block_connected(header, &txdata, height); + self.process_chain_data(header, &txdata, process); } } @@ -128,6 +182,36 @@ where C::Target: chain::Filter, } } + /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view + /// of a channel based on transactions unconfirmed as a result of a chain reorganization. See + /// [`ChannelMonitor::transaction_unconfirmed`] for details. + /// + /// Used instead of [`block_disconnected`] by clients that are notified of transactions rather + /// than blocks. May be called before or after [`update_best_block`] for transactions in the + /// corresponding block. See [`update_best_block`] for further calling expectations. + /// + /// [`block_disconnected`]: Self::block_disconnected + /// [`update_best_block`]: Self::update_best_block + pub fn transaction_unconfirmed(&self, txid: &Txid) { + let monitors = self.monitors.read().unwrap(); + for monitor in monitors.values() { + monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + } + } + + /// Returns the set of txids that should be monitored for re-organization out of the chain. + pub fn get_relevant_txids(&self) -> Vec { + let mut txids = Vec::new(); + let monitors = self.monitors.read().unwrap(); + for monitor in monitors.values() { + txids.append(&mut monitor.get_relevant_txids()); + } + + txids.sort_unstable(); + txids.dedup(); + txids + } + /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. /// /// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 939337d7..777bc06b 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -37,7 +37,7 @@ use bitcoin::secp256k1; use ln::msgs::DecodeError; use ln::chan_utils; use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCType, ChannelTransactionParameters, HolderCommitmentTransaction}; -use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; +use ln::channelmanager::{BestBlock, HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; use chain; use chain::WatchedOutput; @@ -50,7 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48}; use util::byte_utils; use util::events::Event; -use std::collections::{HashMap, HashSet, hash_map}; +use std::collections::{HashMap, HashSet}; use std::{cmp, mem}; use std::io::Error; use std::ops::Deref; @@ -465,9 +465,30 @@ pub(crate) struct ClaimRequest { pub(crate) witness_data: InputMaterial } +/// An entry for an [`OnchainEvent`], stating the block height when the event was observed and the +/// transaction causing it. +/// +/// Used to determine when the on-chain event can be considered safe from a chain reorganization. +#[derive(PartialEq)] +struct OnchainEventEntry { + txid: Txid, + height: u32, + event: OnchainEvent, +} + +impl OnchainEventEntry { + fn confirmation_threshold(&self) -> u32 { + self.height + ANTI_REORG_DELAY - 1 + } + + fn has_reached_confirmation_threshold(&self, height: u32) -> bool { + height >= self.confirmation_threshold() + } +} + /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it /// once they mature to enough confirmations (ANTI_REORG_DELAY) -#[derive(Clone, PartialEq)] +#[derive(PartialEq)] enum OnchainEvent { /// HTLC output getting solved by a timeout, at maturation we pass upstream payment source information to solve /// inbound HTLC in backward channel. Note, in case of preimage, we pass info to upstream without delay as we can @@ -684,10 +705,10 @@ pub(crate) struct ChannelMonitorImpl { pending_monitor_events: Vec, pending_events: Vec, - // Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which - // we have to take actions once they reach enough confs. Key is a block height timer, i.e we enforce - // actions when we receive a block with given height. Actions depend on OnchainEvent type. - onchain_events_waiting_threshold_conf: HashMap>, + // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on + // which to take actions once they reach enough confirmations. Each entry includes the + // transaction's id and the height when the transaction was confirmed on chain. + onchain_events_awaiting_threshold_conf: Vec, // If we get serialized out and re-read, we need to make sure that the chain monitoring // interface knows about the TXOs that we want to be notified of spends of. We could probably @@ -714,15 +735,19 @@ pub(crate) struct ChannelMonitorImpl { // remote monitor out-of-order with regards to the block view. holder_tx_signed: bool, - // We simply modify last_block_hash in Channel's block_connected so that serialization is + // We simply modify best_block in Channel's block_connected so that serialization is // consistent but hopefully the users' copy handles block_connected in a consistent way. // (we do *not*, however, update them in update_monitor to ensure any local user copies keep - // their last_block_hash from its state and not based on updated copies that didn't run through + // their best_block from its state and not based on updated copies that didn't run through // the full block_connected). - last_block_hash: BlockHash, + best_block: BestBlock, + secp_ctx: Secp256k1, //TODO: dedup this a bit... } +/// Transaction outputs to watch for on-chain spends. +pub(super) type TransactionOutputs = (Txid, Vec<(u32, TxOut)>); + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] /// Used only in testing and fuzztarget to check serialization roundtrips don't change the /// underlying object @@ -765,7 +790,7 @@ impl PartialEq for ChannelMonitorImpl { self.payment_preimages != other.payment_preimages || self.pending_monitor_events != other.pending_monitor_events || self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly - self.onchain_events_waiting_threshold_conf != other.onchain_events_waiting_threshold_conf || + self.onchain_events_awaiting_threshold_conf != other.onchain_events_awaiting_threshold_conf || self.outputs_to_watch != other.outputs_to_watch || self.lockdown_from_offchain != other.lockdown_from_offchain || self.holder_tx_signed != other.holder_tx_signed @@ -931,24 +956,23 @@ impl Writeable for ChannelMonitorImpl { event.write(writer)?; } - self.last_block_hash.write(writer)?; + self.best_block.block_hash().write(writer)?; + writer.write_all(&byte_utils::be32_to_array(self.best_block.height()))?; - writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?; - for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() { - writer.write_all(&byte_utils::be32_to_array(**target))?; - writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?; - for ev in events.iter() { - match *ev { - OnchainEvent::HTLCUpdate { ref htlc_update } => { - 0u8.write(writer)?; - htlc_update.0.write(writer)?; - htlc_update.1.write(writer)?; - }, - OnchainEvent::MaturingOutput { ref descriptor } => { - 1u8.write(writer)?; - descriptor.write(writer)?; - }, - } + writer.write_all(&byte_utils::be64_to_array(self.onchain_events_awaiting_threshold_conf.len() as u64))?; + for ref entry in self.onchain_events_awaiting_threshold_conf.iter() { + entry.txid.write(writer)?; + writer.write_all(&byte_utils::be32_to_array(entry.height))?; + match entry.event { + OnchainEvent::HTLCUpdate { ref htlc_update } => { + 0u8.write(writer)?; + htlc_update.0.write(writer)?; + htlc_update.1.write(writer)?; + }, + OnchainEvent::MaturingOutput { ref descriptor } => { + 1u8.write(writer)?; + descriptor.write(writer)?; + }, } } @@ -977,7 +1001,7 @@ impl ChannelMonitor { funding_redeemscript: Script, channel_value_satoshis: u64, commitment_transaction_number_obscure_factor: u64, initial_holder_commitment_tx: HolderCommitmentTransaction, - last_block_hash: BlockHash) -> ChannelMonitor { + best_block: BestBlock) -> ChannelMonitor { assert!(commitment_transaction_number_obscure_factor <= (1 << 48)); let our_channel_close_key_hash = WPubkeyHash::hash(&shutdown_pubkey.serialize()); @@ -1056,7 +1080,7 @@ impl ChannelMonitor { pending_monitor_events: Vec::new(), pending_events: Vec::new(), - onchain_events_waiting_threshold_conf: HashMap::new(), + onchain_events_awaiting_threshold_conf: Vec::new(), outputs_to_watch, onchain_tx_handler, @@ -1064,7 +1088,8 @@ impl ChannelMonitor { lockdown_from_offchain: false, holder_tx_signed: false, - last_block_hash, + best_block, + secp_ctx, }), } @@ -1254,7 +1279,7 @@ impl ChannelMonitor { broadcaster: B, fee_estimator: F, logger: L, - ) -> Vec<(Txid, Vec<(u32, TxOut)>)> + ) -> Vec where B::Target: BroadcasterInterface, F::Target: FeeEstimator, @@ -1281,6 +1306,101 @@ impl ChannelMonitor { self.inner.lock().unwrap().block_disconnected( header, height, broadcaster, fee_estimator, logger) } + + /// Processes transactions confirmed in a block with the given header and height, returning new + /// outputs to watch. See [`block_connected`] for details. + /// + /// Used instead of [`block_connected`] by clients that are notified of transactions rather than + /// blocks. May be called before or after [`update_best_block`] for transactions in the + /// corresponding block. See [`update_best_block`] for further calling expectations. + /// + /// [`block_connected`]: Self::block_connected + /// [`update_best_block`]: Self::update_best_block + pub fn transactions_confirmed( + &self, + header: &BlockHeader, + txdata: &TransactionData, + height: u32, + broadcaster: B, + fee_estimator: F, + logger: L, + ) -> Vec + where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + self.inner.lock().unwrap().transactions_confirmed( + header, txdata, height, broadcaster, fee_estimator, logger) + } + + /// Processes a transaction that was reorganized out of the chain. + /// + /// Used instead of [`block_disconnected`] by clients that are notified of transactions rather + /// than blocks. May be called before or after [`update_best_block`] for transactions in the + /// corresponding block. See [`update_best_block`] for further calling expectations. + /// + /// [`block_disconnected`]: Self::block_disconnected + /// [`update_best_block`]: Self::update_best_block + pub fn transaction_unconfirmed( + &self, + txid: &Txid, + broadcaster: B, + fee_estimator: F, + logger: L, + ) where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + self.inner.lock().unwrap().transaction_unconfirmed( + txid, broadcaster, fee_estimator, logger); + } + + /// Updates the monitor with the current best chain tip, returning new outputs to watch. See + /// [`block_connected`] for details. + /// + /// Used instead of [`block_connected`] by clients that are notified of transactions rather than + /// blocks. May be called before or after [`transactions_confirmed`] for the corresponding + /// block. + /// + /// Must be called after new blocks become available for the most recent block. Intermediary + /// blocks, however, may be safely skipped. In the event of a chain re-organization, this only + /// needs to be called for the most recent block assuming `transaction_unconfirmed` is called + /// for any affected transactions. + /// + /// [`block_connected`]: Self::block_connected + /// [`transactions_confirmed`]: Self::transactions_confirmed + /// [`transaction_unconfirmed`]: Self::transaction_unconfirmed + pub fn update_best_block( + &self, + header: &BlockHeader, + height: u32, + broadcaster: B, + fee_estimator: F, + logger: L, + ) -> Vec + where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + self.inner.lock().unwrap().update_best_block( + header, height, broadcaster, fee_estimator, logger) + } + + /// Returns the set of txids that should be monitored for re-organization out of the chain. + pub fn get_relevant_txids(&self) -> Vec { + let inner = self.inner.lock().unwrap(); + let mut txids: Vec = inner.onchain_events_awaiting_threshold_conf + .iter() + .map(|entry| entry.txid) + .chain(inner.onchain_tx_handler.get_relevant_txids().into_iter()) + .collect(); + txids.sort_unstable(); + txids.dedup(); + txids + } } impl ChannelMonitorImpl { @@ -1574,7 +1694,7 @@ impl ChannelMonitorImpl { /// HTLC-Success/HTLC-Timeout transactions. /// Return updates for HTLC pending in the channel and failed automatically by the broadcast of /// revoked counterparty commitment tx - fn check_spend_counterparty_transaction(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec, (Txid, Vec<(u32, TxOut)>)) where L::Target: Logger { + fn check_spend_counterparty_transaction(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec, TransactionOutputs) where L::Target: Logger { // Most secp and related errors trying to create keys means we have no hope of constructing // a spend transaction...so we return no transactions to broadcast let mut claimable_outpoints = Vec::new(); @@ -1639,24 +1759,24 @@ impl ChannelMonitorImpl { if let Some(ref outpoints) = self.counterparty_claimable_outpoints.get($txid) { for &(ref htlc, ref source_option) in outpoints.iter() { if let &Some(ref source) = source_option { - log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1); - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - let e = entry.get_mut(); - e.retain(|ref event| { - match **event { - OnchainEvent::HTLCUpdate { ref htlc_update } => { - return htlc_update.0 != **source - }, - _ => true - } - }); - e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}); - } - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]); + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { + if entry.height != height { return true; } + match entry.event { + OnchainEvent::HTLCUpdate { ref htlc_update } => { + htlc_update.0 != **source + }, + _ => true, } - } + }); + let entry = OnchainEventEntry { + txid: *$txid, + height, + event: OnchainEvent::HTLCUpdate { + htlc_update: ((**source).clone(), htlc.payment_hash.clone()) + }, + }; + log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold()); + self.onchain_events_awaiting_threshold_conf.push(entry); } } } @@ -1705,23 +1825,22 @@ impl ChannelMonitorImpl { } } log_trace!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of counterparty commitment transaction", log_bytes!(htlc.payment_hash.0), $commitment_tx); - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - let e = entry.get_mut(); - e.retain(|ref event| { - match **event { - OnchainEvent::HTLCUpdate { ref htlc_update } => { - return htlc_update.0 != **source - }, - _ => true - } - }); - e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}); - } - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]); + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { + if entry.height != height { return true; } + match entry.event { + OnchainEvent::HTLCUpdate { ref htlc_update } => { + htlc_update.0 != **source + }, + _ => true, } - } + }); + self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { + txid: *$txid, + height, + event: OnchainEvent::HTLCUpdate { + htlc_update: ((**source).clone(), htlc.payment_hash.clone()) + }, + }); } } } @@ -1786,7 +1905,7 @@ impl ChannelMonitorImpl { } /// Attempts to claim a counterparty HTLC-Success/HTLC-Timeout's outputs using the revocation key - fn check_spend_counterparty_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32, logger: &L) -> (Vec, Option<(Txid, Vec<(u32, TxOut)>)>) where L::Target: Logger { + fn check_spend_counterparty_htlc(&mut self, tx: &Transaction, commitment_number: u64, height: u32, logger: &L) -> (Vec, Option) where L::Target: Logger { let htlc_txid = tx.txid(); if tx.input.len() != 1 || tx.output.len() != 1 || tx.input[0].witness.len() != 5 { return (Vec::new(), None) @@ -1855,31 +1974,29 @@ impl ChannelMonitorImpl { /// Attempts to claim any claimable HTLCs in a commitment transaction which was not (yet) /// revoked using data in holder_claimable_outpoints. /// Should not be used if check_spend_revoked_transaction succeeds. - fn check_spend_holder_transaction(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec, (Txid, Vec<(u32, TxOut)>)) where L::Target: Logger { + fn check_spend_holder_transaction(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec, TransactionOutputs) where L::Target: Logger { let commitment_txid = tx.txid(); let mut claim_requests = Vec::new(); let mut watch_outputs = Vec::new(); macro_rules! wait_threshold_conf { - ($height: expr, $source: expr, $commitment_tx: expr, $payment_hash: expr) => { - log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1); - match self.onchain_events_waiting_threshold_conf.entry($height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - let e = entry.get_mut(); - e.retain(|ref event| { - match **event { - OnchainEvent::HTLCUpdate { ref htlc_update } => { - return htlc_update.0 != $source - }, - _ => true - } - }); - e.push(OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)}); + ($source: expr, $commitment_tx: expr, $payment_hash: expr) => { + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { + if entry.height != height { return true; } + match entry.event { + OnchainEvent::HTLCUpdate { ref htlc_update } => { + htlc_update.0 != $source + }, + _ => true, } - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)}]); - } - } + }); + let entry = OnchainEventEntry { + txid: commitment_txid, + height, + event: OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash) }, + }; + log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold()); + self.onchain_events_awaiting_threshold_conf.push(entry); } } @@ -1915,7 +2032,7 @@ impl ChannelMonitorImpl { for &(ref htlc, _, ref source) in &$holder_tx.htlc_outputs { if htlc.transaction_output_index.is_none() { if let &Some(ref source) = source { - wait_threshold_conf!(height, source.clone(), "lastest", htlc.payment_hash.clone()); + wait_threshold_conf!(source.clone(), "lastest", htlc.payment_hash.clone()); } } } @@ -1980,10 +2097,58 @@ impl ChannelMonitorImpl { return res } - pub fn block_connected(&mut self, header: &BlockHeader, txdata: &TransactionData, height: u32, broadcaster: B, fee_estimator: F, logger: L)-> Vec<(Txid, Vec<(u32, TxOut)>)> + pub fn block_connected(&mut self, header: &BlockHeader, txdata: &TransactionData, height: u32, broadcaster: B, fee_estimator: F, logger: L) -> Vec where B::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, + { + let block_hash = header.block_hash(); + log_trace!(logger, "New best block {} at height {}", block_hash, height); + self.best_block = BestBlock::new(block_hash, height); + + self.transactions_confirmed(header, txdata, height, broadcaster, fee_estimator, logger) + } + + fn update_best_block( + &mut self, + header: &BlockHeader, + height: u32, + broadcaster: B, + fee_estimator: F, + logger: L, + ) -> Vec + where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + let block_hash = header.block_hash(); + log_trace!(logger, "New best block {} at height {}", block_hash, height); + + if height > self.best_block.height() { + self.best_block = BestBlock::new(block_hash, height); + self.block_confirmed(height, vec![], vec![], vec![], broadcaster, fee_estimator, logger) + } else { + self.best_block = BestBlock::new(block_hash, height); + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| entry.height <= height); + self.onchain_tx_handler.block_disconnected(height + 1, broadcaster, fee_estimator, logger); + Vec::new() + } + } + + fn transactions_confirmed( + &mut self, + header: &BlockHeader, + txdata: &TransactionData, + height: u32, + broadcaster: B, + fee_estimator: F, + logger: L, + ) -> Vec + where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, { let txn_matched = self.filter_block(txdata); for tx in &txn_matched { @@ -2039,11 +2204,28 @@ impl ChannelMonitorImpl { self.is_paying_spendable_output(&tx, height, &logger); } + + self.block_confirmed(height, txn_matched, watch_outputs, claimable_outpoints, broadcaster, fee_estimator, logger) + } + + fn block_confirmed( + &mut self, + height: u32, + txn_matched: Vec<&Transaction>, + mut watch_outputs: Vec, + mut claimable_outpoints: Vec, + broadcaster: B, + fee_estimator: F, + logger: L, + ) -> Vec + where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { let should_broadcast = self.would_broadcast_at_height(height, &logger); if should_broadcast { claimable_outpoints.push(ClaimRequest { absolute_timelock: height, aggregable: false, outpoint: BitcoinOutPoint { txid: self.funding_info.0.txid.clone(), vout: self.funding_info.0.index as u32 }, witness_data: InputMaterial::Funding { funding_redeemscript: self.funding_redeemscript.clone() }}); - } - if should_broadcast { self.pending_monitor_events.push(MonitorEvent::CommitmentTxBroadcasted(self.funding_info.0)); let commitment_tx = self.onchain_tx_handler.get_fully_signed_holder_tx(&self.funding_redeemscript); self.holder_tx_signed = true; @@ -2054,29 +2236,68 @@ impl ChannelMonitorImpl { } claimable_outpoints.append(&mut new_outpoints); } - if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) { - for ev in events { - match ev { - OnchainEvent::HTLCUpdate { htlc_update } => { - log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0)); - self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { - payment_hash: htlc_update.1, - payment_preimage: None, - source: htlc_update.0, - })); - }, - OnchainEvent::MaturingOutput { descriptor } => { - log_trace!(logger, "Descriptor {} has got enough confirmations to be passed upstream", log_spendable!(descriptor)); - self.pending_events.push(Event::SpendableOutputs { - outputs: vec![descriptor] - }); + + // Find which on-chain events have reached their confirmation threshold. + let onchain_events_awaiting_threshold_conf = + self.onchain_events_awaiting_threshold_conf.drain(..).collect::>(); + let mut onchain_events_reaching_threshold_conf = Vec::new(); + for entry in onchain_events_awaiting_threshold_conf { + if entry.has_reached_confirmation_threshold(height) { + onchain_events_reaching_threshold_conf.push(entry); + } else { + self.onchain_events_awaiting_threshold_conf.push(entry); + } + } + + // Used to check for duplicate HTLC resolutions. + #[cfg(debug_assertions)] + let unmatured_htlcs: Vec<_> = self.onchain_events_awaiting_threshold_conf + .iter() + .filter_map(|entry| match &entry.event { + OnchainEvent::HTLCUpdate { htlc_update } => Some(htlc_update.0.clone()), + OnchainEvent::MaturingOutput { .. } => None, + }) + .collect(); + #[cfg(debug_assertions)] + let mut matured_htlcs = Vec::new(); + + // Produce actionable events from on-chain events having reached their threshold. + for entry in onchain_events_reaching_threshold_conf.drain(..) { + match entry.event { + OnchainEvent::HTLCUpdate { htlc_update } => { + // Check for duplicate HTLC resolutions. + #[cfg(debug_assertions)] + { + debug_assert!( + unmatured_htlcs.iter().find(|&htlc| htlc == &htlc_update.0).is_none(), + "An unmature HTLC transaction conflicts with a maturing one; failed to \ + call either transaction_unconfirmed for the conflicting transaction \ + or block_disconnected for a block containing it."); + debug_assert!( + matured_htlcs.iter().find(|&htlc| htlc == &htlc_update.0).is_none(), + "A matured HTLC transaction conflicts with a maturing one; failed to \ + call either transaction_unconfirmed for the conflicting transaction \ + or block_disconnected for a block containing it."); + matured_htlcs.push(htlc_update.0.clone()); } + + log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0)); + self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate { + payment_hash: htlc_update.1, + payment_preimage: None, + source: htlc_update.0, + })); + }, + OnchainEvent::MaturingOutput { descriptor } => { + log_trace!(logger, "Descriptor {} has got enough confirmations to be passed upstream", log_spendable!(descriptor)); + self.pending_events.push(Event::SpendableOutputs { + outputs: vec![descriptor] + }); } } } self.onchain_tx_handler.update_claims_view(&txn_matched, claimable_outpoints, Some(height), &&*broadcaster, &&*fee_estimator, &&*logger); - self.last_block_hash = block_hash; // Determine new outputs to watch by comparing against previously known outputs to watch, // updating the latter in the process. @@ -2108,15 +2329,29 @@ impl ChannelMonitorImpl { { log_trace!(logger, "Block {} at height {} disconnected", header.block_hash(), height); - if let Some(_) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) { - //We may discard: - //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected - //- maturing spendable output has transaction paying us has been disconnected - } + //We may discard: + //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected + //- maturing spendable output has transaction paying us has been disconnected + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| entry.height < height); self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger); - self.last_block_hash = header.prev_blockhash; + self.best_block = BestBlock::new(header.prev_blockhash, height - 1); + } + + fn transaction_unconfirmed( + &mut self, + txid: &Txid, + broadcaster: B, + fee_estimator: F, + logger: L, + ) where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| entry.txid != *txid); + self.onchain_tx_handler.transaction_unconfirmed(txid, broadcaster, fee_estimator, logger); } /// Filters a block's `txdata` for transactions spending watched outputs or for any child @@ -2344,24 +2579,22 @@ impl ChannelMonitorImpl { })); } } else { - log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1); - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - let e = entry.get_mut(); - e.retain(|ref event| { - match **event { - OnchainEvent::HTLCUpdate { ref htlc_update } => { - return htlc_update.0 != source - }, - _ => true - } - }); - e.push(OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)}); - } - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)}]); + self.onchain_events_awaiting_threshold_conf.retain(|ref entry| { + if entry.height != height { return true; } + match entry.event { + OnchainEvent::HTLCUpdate { ref htlc_update } => { + htlc_update.0 != source + }, + _ => true, } - } + }); + let entry = OnchainEventEntry { + txid: tx.txid(), + height, + event: OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash) }, + }; + log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), entry.confirmation_threshold()); + self.onchain_events_awaiting_threshold_conf.push(entry); } } } @@ -2420,16 +2653,13 @@ impl ChannelMonitorImpl { } } if let Some(spendable_output) = spendable_output { - log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), height + ANTI_REORG_DELAY - 1); - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - let e = entry.get_mut(); - e.push(OnchainEvent::MaturingOutput { descriptor: spendable_output }); - } - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![OnchainEvent::MaturingOutput { descriptor: spendable_output }]); - } - } + let entry = OnchainEventEntry { + txid: tx.txid(), + height: height, + event: OnchainEvent::MaturingOutput { descriptor: spendable_output.clone() }, + }; + log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), entry.confirmation_threshold()); + self.onchain_events_awaiting_threshold_conf.push(entry); } } } @@ -2692,34 +2922,30 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> } } - let last_block_hash: BlockHash = Readable::read(reader)?; + let best_block = BestBlock::new(Readable::read(reader)?, Readable::read(reader)?); let waiting_threshold_conf_len: u64 = Readable::read(reader)?; - let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128)); + let mut onchain_events_awaiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128)); for _ in 0..waiting_threshold_conf_len { - let height_target = Readable::read(reader)?; - let events_len: u64 = Readable::read(reader)?; - let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128)); - for _ in 0..events_len { - let ev = match ::read(reader)? { - 0 => { - let htlc_source = Readable::read(reader)?; - let hash = Readable::read(reader)?; - OnchainEvent::HTLCUpdate { - htlc_update: (htlc_source, hash) - } - }, - 1 => { - let descriptor = Readable::read(reader)?; - OnchainEvent::MaturingOutput { - descriptor - } - }, - _ => return Err(DecodeError::InvalidValue), - }; - events.push(ev); - } - onchain_events_waiting_threshold_conf.insert(height_target, events); + let txid = Readable::read(reader)?; + let height = Readable::read(reader)?; + let event = match ::read(reader)? { + 0 => { + let htlc_source = Readable::read(reader)?; + let hash = Readable::read(reader)?; + OnchainEvent::HTLCUpdate { + htlc_update: (htlc_source, hash) + } + }, + 1 => { + let descriptor = Readable::read(reader)?; + OnchainEvent::MaturingOutput { + descriptor + } + }, + _ => return Err(DecodeError::InvalidValue), + }; + onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid, height, event }); } let outputs_to_watch_len: u64 = Readable::read(reader)?; @@ -2743,7 +2969,7 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> let mut secp_ctx = Secp256k1::new(); secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes()); - Ok((last_block_hash.clone(), ChannelMonitor { + Ok((best_block.block_hash(), ChannelMonitor { inner: Mutex::new(ChannelMonitorImpl { latest_update_id, commitment_transaction_number_obscure_factor, @@ -2780,7 +3006,7 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> pending_monitor_events, pending_events, - onchain_events_waiting_threshold_conf, + onchain_events_awaiting_threshold_conf, outputs_to_watch, onchain_tx_handler, @@ -2788,7 +3014,8 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> lockdown_from_offchain, holder_tx_signed, - last_block_hash, + best_block, + secp_ctx, }), })) @@ -2797,7 +3024,6 @@ impl<'a, Signer: Sign, K: KeysInterface> ReadableArgs<&'a K> #[cfg(test)] mod tests { - use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::script::{Script, Builder}; use bitcoin::blockdata::opcodes; use bitcoin::blockdata::transaction::{Transaction, TxIn, TxOut, SigHashType}; @@ -2811,7 +3037,7 @@ mod tests { use hex; use chain::channelmonitor::ChannelMonitor; use chain::transaction::OutPoint; - use ln::channelmanager::{PaymentPreimage, PaymentHash}; + use ln::channelmanager::{BestBlock, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; use ln::chan_utils; use ln::chan_utils::{HTLCOutputInCommitment, ChannelPublicKeys, ChannelTransactionParameters, HolderCommitmentTransaction, CounterpartyChannelTransactionParameters}; @@ -2907,13 +3133,13 @@ mod tests { }; // Prune with one old state and a holder commitment tx holding a few overlaps with the // old state. - let last_block_hash = genesis_block(Network::Testnet).block_hash(); + let best_block = BestBlock::from_genesis(Network::Testnet); let monitor = ChannelMonitor::new(Secp256k1::new(), keys, &PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()), 0, &Script::new(), (OutPoint { txid: Txid::from_slice(&[43; 32]).unwrap(), index: 0 }, Script::new()), &channel_parameters, Script::new(), 46, 0, - HolderCommitmentTransaction::dummy(), last_block_hash); + HolderCommitmentTransaction::dummy(), best_block); monitor.provide_latest_holder_commitment_tx(HolderCommitmentTransaction::dummy(), preimages_to_holder_htlcs!(preimages[0..10])).unwrap(); let dummy_txid = dummy_tx.txid(); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 6aee5e7e..5f987427 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -24,7 +24,7 @@ use bitcoin::secp256k1; use ln::features::{ChannelFeatures, InitFeatures}; use ln::msgs; use ln::msgs::{DecodeError, OptionalField, DataLossProtect}; -use ln::channelmanager::{PendingHTLCStatus, HTLCSource, HTLCFailReason, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, PaymentPreimage, PaymentHash, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT}; +use ln::channelmanager::{BestBlock, PendingHTLCStatus, HTLCSource, HTLCFailReason, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, PaymentPreimage, PaymentHash, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT}; use ln::chan_utils::{CounterpartyCommitmentSecrets, TxCreationKeys, HTLCOutputInCommitment, HTLC_SUCCESS_TX_WEIGHT, HTLC_TIMEOUT_TX_WEIGHT, make_funding_redeemscript, ChannelPublicKeys, CommitmentTransaction, HolderCommitmentTransaction, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, MAX_HTLCS, get_commitment_transaction_number_obscure_factor}; use ln::chan_utils; use chain::chaininterface::{FeeEstimator,ConfirmationTarget}; @@ -1531,7 +1531,7 @@ impl Channel { &self.get_counterparty_pubkeys().funding_pubkey } - pub fn funding_created(&mut self, msg: &msgs::FundingCreated, last_block_hash: BlockHash, logger: &L) -> Result<(msgs::FundingSigned, ChannelMonitor), ChannelError> where L::Target: Logger { + pub fn funding_created(&mut self, msg: &msgs::FundingCreated, best_block: BestBlock, logger: &L) -> Result<(msgs::FundingSigned, ChannelMonitor), ChannelError> where L::Target: Logger { if self.is_outbound() { return Err(ChannelError::Close("Received funding_created for an outbound channel?".to_owned())); } @@ -1585,7 +1585,7 @@ impl Channel { &self.channel_transaction_parameters, funding_redeemscript.clone(), self.channel_value_satoshis, obscure_factor, - holder_commitment_tx, last_block_hash); + holder_commitment_tx, best_block); channel_monitor.provide_latest_counterparty_commitment_tx(counterparty_initial_commitment_txid, Vec::new(), self.cur_counterparty_commitment_transaction_number, self.counterparty_cur_commitment_point.unwrap(), logger); @@ -1602,7 +1602,7 @@ impl Channel { /// Handles a funding_signed message from the remote end. /// If this call is successful, broadcast the funding transaction (and not before!) - pub fn funding_signed(&mut self, msg: &msgs::FundingSigned, last_block_hash: BlockHash, logger: &L) -> Result<(ChannelMonitor, Transaction), ChannelError> where L::Target: Logger { + pub fn funding_signed(&mut self, msg: &msgs::FundingSigned, best_block: BestBlock, logger: &L) -> Result<(ChannelMonitor, Transaction), ChannelError> where L::Target: Logger { if !self.is_outbound() { return Err(ChannelError::Close("Received funding_signed for an inbound channel?".to_owned())); } @@ -1655,7 +1655,7 @@ impl Channel { &self.channel_transaction_parameters, funding_redeemscript.clone(), self.channel_value_satoshis, obscure_factor, - holder_commitment_tx, last_block_hash); + holder_commitment_tx, best_block); channel_monitor.provide_latest_counterparty_commitment_tx(counterparty_initial_bitcoin_tx.txid, Vec::new(), self.cur_counterparty_commitment_transaction_number, self.counterparty_cur_commitment_point.unwrap(), logger); @@ -4825,7 +4825,7 @@ mod tests { use bitcoin::network::constants::Network; use bitcoin::hashes::hex::FromHex; use hex; - use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; + use ln::channelmanager::{BestBlock, HTLCSource, PaymentPreimage, PaymentHash}; use ln::channel::{Channel,InboundHTLCOutput,OutboundHTLCOutput,InboundHTLCState,OutboundHTLCState,HTLCOutputInCommitment,HTLCCandidate,HTLCInitiator,TxCreationKeys}; use ln::channel::MAX_FUNDING_SATOSHIS; use ln::features::InitFeatures; @@ -5038,8 +5038,8 @@ mod tests { let secp_ctx = Secp256k1::new(); let seed = [42; 32]; let network = Network::Testnet; - let chain_hash = genesis_block(network).header.block_hash(); - let last_block_hash = chain_hash; + let best_block = BestBlock::from_genesis(network); + let chain_hash = best_block.block_hash(); let keys_provider = test_utils::TestKeysInterface::new(&seed, network); // Go through the flow of opening a channel between two nodes. @@ -5065,10 +5065,10 @@ mod tests { }]}; let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 }; let funding_created_msg = node_a_chan.get_outbound_funding_created(tx.clone(), funding_outpoint, &&logger).unwrap(); - let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, last_block_hash, &&logger).unwrap(); + let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&logger).unwrap(); // Node B --> Node A: funding signed - let _ = node_a_chan.funding_signed(&funding_signed_msg, last_block_hash, &&logger); + let _ = node_a_chan.funding_signed(&funding_signed_msg, best_block, &&logger); // Now disconnect the two nodes and check that the commitment point in // Node B's channel_reestablish message is sane. diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index f9e00bc5..39426b3f 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -352,9 +352,6 @@ struct PeerState { latest_features: InitFeatures, } -#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] -const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height"; - /// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g. /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static /// lifetimes). Other times you can afford a reference, which is more efficient, in which case @@ -424,10 +421,9 @@ pub struct ChannelManager, #[cfg(not(test))] - latest_block_height: AtomicUsize, - last_block_hash: RwLock, + best_block: RwLock, secp_ctx: Secp256k1, #[cfg(any(test, feature = "_test_utils"))] @@ -475,13 +471,38 @@ pub struct ChainParameters { /// The network for determining the `chain_hash` in Lightning messages. pub network: Network, - /// The hash of the latest block successfully connected. - pub latest_hash: BlockHash, - - /// The height of the latest block successfully connected. + /// The hash and height of the latest block successfully connected. /// /// Used to track on-chain channel funding outputs and send payments with reliable timelocks. - pub latest_height: usize, + pub best_block: BestBlock, +} + +/// The best known block as identified by its hash and height. +#[derive(Clone, Copy)] +pub struct BestBlock { + block_hash: BlockHash, + height: u32, +} + +impl BestBlock { + /// Returns the best block from the genesis of the given network. + pub fn from_genesis(network: Network) -> Self { + BestBlock { + block_hash: genesis_block(network).header.block_hash(), + height: 0, + } + } + + /// Returns the best block as identified by the given block hash and height. + pub fn new(block_hash: BlockHash, height: u32) -> Self { + BestBlock { block_hash, height } + } + + /// Returns the best block hash. + pub fn block_hash(&self) -> BlockHash { self.block_hash } + + /// Returns the best block height. + pub fn height(&self) -> u32 { self.height } } /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is @@ -822,8 +843,7 @@ impl ChannelMana chain_monitor, tx_broadcaster, - latest_block_height: AtomicUsize::new(params.latest_height), - last_block_hash: RwLock::new(params.latest_hash), + best_block: RwLock::new(params.best_block), channel_state: Mutex::new(ChannelHolder{ by_id: HashMap::new(), @@ -1176,7 +1196,7 @@ impl ChannelMana // HTLC_FAIL_BACK_BUFFER blocks to go. // Also, ensure that, in the case of an unknown payment hash, our payment logic has enough time to fail the HTLC backward // before our onchain logic triggers a channel closure (see HTLC_FAIL_BACK_BUFFER rational). - if (msg.cltv_expiry as u64) <= self.latest_block_height.load(Ordering::Acquire) as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 { + if (msg.cltv_expiry as u64) <= self.best_block.read().unwrap().height() as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 { return_err!("The final CLTV expiry is too soon to handle", 17, &[0;0]); } // final_incorrect_htlc_amount @@ -1299,7 +1319,7 @@ impl ChannelMana if (msg.cltv_expiry as u64) < (*outgoing_cltv_value) as u64 + chan.get_cltv_expiry_delta() as u64 { // incorrect_cltv_expiry break Some(("Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", 0x1000 | 13, Some(self.get_channel_update(chan).unwrap()))); } - let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let cur_height = self.best_block.read().unwrap().height() + 1; // Theoretically, channel counterparty shouldn't send us a HTLC expiring now, but we want to be robust wrt to counterparty // packet sanitization (see HTLC_FAIL_BACK_BUFFER rational) if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon @@ -1516,7 +1536,7 @@ impl ChannelMana return Err(PaymentSendFailure::PathParameterError(path_errs)); } - let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let cur_height = self.best_block.read().unwrap().height() + 1; let mut results = Vec::new(); for path in route.paths.iter() { results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height)); @@ -1910,10 +1930,7 @@ impl ChannelMana for htlc in htlcs.iter() { let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec(); htlc_msat_height_data.extend_from_slice( - &byte_utils::be32_to_array( - self.latest_block_height.load(Ordering::Acquire) - as u32, - ), + &byte_utils::be32_to_array(self.best_block.read().unwrap().height()), ); failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id: htlc.prev_hop.short_channel_id, @@ -2033,8 +2050,7 @@ impl ChannelMana if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); } let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec(); htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array( - self.latest_block_height.load(Ordering::Acquire) as u32, - )); + self.best_block.read().unwrap().height())); self.fail_htlc_backwards_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc.prev_hop), payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: htlc_msat_height_data }); @@ -2248,8 +2264,7 @@ impl ChannelMana if (is_mpp && !valid_mpp) || (!is_mpp && (htlc.value < expected_amount || htlc.value > expected_amount * 2)) { let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec(); htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array( - self.latest_block_height.load(Ordering::Acquire) as u32, - )); + self.best_block.read().unwrap().height())); self.fail_htlc_backwards_internal(channel_state.take().unwrap(), HTLCSource::PreviousHopData(htlc.prev_hop), &payment_hash, HTLCFailReason::Reason { failure_code: 0x4000|15, data: htlc_msat_height_data }); @@ -2534,7 +2549,7 @@ impl ChannelMana fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> { let ((funding_msg, monitor), mut chan) = { - let last_block_hash = *self.last_block_hash.read().unwrap(); + let best_block = *self.best_block.read().unwrap(); let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.temporary_channel_id.clone()) { @@ -2542,7 +2557,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id)); } - (try_chan_entry!(self, chan.get_mut().funding_created(msg, last_block_hash, &self.logger), channel_state, chan), chan.remove()) + (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.logger), channel_state, chan), chan.remove()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id)) } @@ -2591,7 +2606,7 @@ impl ChannelMana fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> { let funding_tx = { - let last_block_hash = *self.last_block_hash.read().unwrap(); + let best_block = *self.best_block.read().unwrap(); let mut channel_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_lock; match channel_state.by_id.entry(msg.channel_id) { @@ -2599,7 +2614,7 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - let (monitor, funding_tx) = match chan.get_mut().funding_signed(&msg, last_block_hash, &self.logger) { + let (monitor, funding_tx) = match chan.get_mut().funding_signed(&msg, best_block, &self.logger) { Ok(update) => update, Err(e) => try_chan_entry!(self, Err(e), channel_state, chan), }; @@ -3339,24 +3354,30 @@ where L::Target: Logger, { fn block_connected(&self, block: &Block, height: u32) { - assert_eq!(*self.last_block_hash.read().unwrap(), block.header.prev_blockhash, - "Blocks must be connected in chain-order - the connected header must build on the last connected header"); - assert_eq!(self.latest_block_height.load(Ordering::Acquire) as u64, height as u64 - 1, - "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); + { + let best_block = self.best_block.read().unwrap(); + assert_eq!(best_block.block_hash(), block.header.prev_blockhash, + "Blocks must be connected in chain-order - the connected header must build on the last connected header"); + assert_eq!(best_block.height(), height - 1, + "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); + } + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); self.transactions_confirmed(&block.header, height, &txdata); self.update_best_block(&block.header, height); } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - assert_eq!(*self.last_block_hash.read().unwrap(), header.block_hash(), - "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); - let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - let new_height = self.latest_block_height.fetch_sub(1, Ordering::AcqRel) as u32 - 1; - assert_eq!(new_height, height - 1, - "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); - *self.last_block_hash.write().unwrap() = header.prev_blockhash; + let new_height = height - 1; + { + let mut best_block = self.best_block.write().unwrap(); + assert_eq!(best_block.block_hash(), header.block_hash(), + "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); + assert_eq!(best_block.height(), height, + "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); + *best_block = BestBlock::new(header.prev_blockhash, new_height) + } self.do_chain_event(Some(new_height), |channel| channel.update_best_block(new_height, header.time)); } @@ -3513,8 +3534,7 @@ impl ChannelMana let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); - self.latest_block_height.store(height as usize, Ordering::Release); - *self.last_block_hash.write().unwrap() = block_hash; + *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.update_best_block(height, header.time)); @@ -4147,8 +4167,11 @@ impl Writeable f writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?; self.genesis_hash.write(writer)?; - (self.latest_block_height.load(Ordering::Acquire) as u32).write(writer)?; - self.last_block_hash.read().unwrap().write(writer)?; + { + let best_block = self.best_block.read().unwrap(); + best_block.height().write(writer)?; + best_block.block_hash().write(writer)?; + } let channel_state = self.channel_state.lock().unwrap(); let mut unfunded_channels = 0; @@ -4340,8 +4363,8 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> } let genesis_hash: BlockHash = Readable::read(reader)?; - let latest_block_height: u32 = Readable::read(reader)?; - let last_block_hash: BlockHash = Readable::read(reader)?; + let best_block_height: u32 = Readable::read(reader)?; + let best_block_hash: BlockHash = Readable::read(reader)?; let mut failed_htlcs = Vec::new(); @@ -4449,8 +4472,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> chain_monitor: args.chain_monitor, tx_broadcaster: args.tx_broadcaster, - latest_block_height: AtomicUsize::new(latest_block_height as usize), - last_block_hash: RwLock::new(last_block_hash), + best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)), channel_state: Mutex::new(ChannelHolder { by_id, @@ -4484,7 +4506,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. - Ok((last_block_hash.clone(), channel_manager)) + Ok((best_block_hash.clone(), channel_manager)) } } @@ -4545,7 +4567,7 @@ pub mod bench { use chain::chainmonitor::ChainMonitor; use chain::channelmonitor::Persist; use chain::keysinterface::{KeysManager, InMemorySigner}; - use ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage}; + use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage}; use ln::features::InitFeatures; use ln::functional_test_utils::*; use ln::msgs::ChannelMessageHandler; @@ -4597,8 +4619,7 @@ pub mod bench { let keys_manager_a = KeysManager::new(&seed_a, 42, 42); let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &logger_a, &keys_manager_a, config.clone(), ChainParameters { network, - latest_hash: genesis_hash, - latest_height: 0, + best_block: BestBlock::from_genesis(network), }); let node_a_holder = NodeHolder { node: &node_a }; @@ -4608,8 +4629,7 @@ pub mod bench { let keys_manager_b = KeysManager::new(&seed_b, 42, 42); let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &logger_b, &keys_manager_b, config.clone(), ChainParameters { network, - latest_hash: genesis_hash, - latest_height: 0, + best_block: BestBlock::from_genesis(network), }); let node_b_holder = NodeHolder { node: &node_b }; diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index e3f5003a..dee3c6ea 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -13,7 +13,7 @@ use chain::{Listen, Watch}; use chain::channelmonitor::ChannelMonitor; use chain::transaction::OutPoint; -use ln::channelmanager::{ChainParameters, ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure}; +use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure}; use routing::router::{Route, get_route}; use routing::network_graph::{NetGraphMsgHandler, NetworkGraph}; use ln::features::InitFeatures; @@ -122,21 +122,25 @@ pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block) do_connect_block(node, block, false); } -fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, skip_manager: bool) { - let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); +fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, skip_intermediaries: bool) { let height = node.best_block_info().1 + 1; - node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height); - if !skip_manager { + if !skip_intermediaries { + let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); match *node.connect_style.borrow() { ConnectStyle::BestBlockFirst|ConnectStyle::BestBlockFirstSkippingBlocks => { + node.chain_monitor.chain_monitor.update_best_block(&block.header, height); + node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height); node.node.update_best_block(&block.header, height); - node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::>()); + node.node.transactions_confirmed(&block.header, height, &txdata); }, ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks => { - node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::>()); + node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height); + node.chain_monitor.chain_monitor.update_best_block(&block.header, height); + node.node.transactions_confirmed(&block.header, height, &txdata); node.node.update_best_block(&block.header, height); }, ConnectStyle::FullBlockViaListen => { + node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height); Listen::block_connected(node.node, &block, height); } } @@ -151,17 +155,19 @@ pub fn disconnect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, count: u32) assert!(orig_header.1 > 0); // Cannot disconnect genesis let prev_header = node.blocks.borrow().last().unwrap().clone(); - node.chain_monitor.chain_monitor.block_disconnected(&orig_header.0, orig_header.1); match *node.connect_style.borrow() { ConnectStyle::FullBlockViaListen => { + node.chain_monitor.chain_monitor.block_disconnected(&orig_header.0, orig_header.1); Listen::block_disconnected(node.node, &orig_header.0, orig_header.1); }, ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => { if i == count - 1 { + node.chain_monitor.chain_monitor.update_best_block(&prev_header.0, prev_header.1); node.node.update_best_block(&prev_header.0, prev_header.1); } }, _ => { + node.chain_monitor.chain_monitor.update_best_block(&prev_header.0, prev_header.1); node.node.update_best_block(&prev_header.0, prev_header.1); }, } @@ -1279,8 +1285,7 @@ pub fn create_node_chanmgrs<'a, 'b>(node_count: usize, cfgs: &'a Vec let network = Network::Testnet; let params = ChainParameters { network, - latest_hash: genesis_block(network).header.block_hash(), - latest_height: 0, + best_block: BestBlock::from_genesis(network), }; let node = ChannelManager::new(cfgs[i].fee_estimator, &cfgs[i].chain_monitor, cfgs[i].tx_broadcaster, cfgs[i].logger, cfgs[i].keys_manager, if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }, params); chanmgrs.push(node); diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 2ab48831..9a0c06e7 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -51,7 +51,6 @@ use regex; use std::collections::{BTreeSet, HashMap, HashSet}; use std::default::Default; use std::sync::Mutex; -use std::sync::atomic::Ordering; use ln::functional_test_utils::*; use ln::chan_utils::CommitmentTransaction; @@ -1585,7 +1584,7 @@ fn test_fee_spike_violation_fails_htlc() { let secp_ctx = Secp256k1::new(); let session_priv = SecretKey::from_slice(&[42; 32]).expect("RNG is bad!"); - let cur_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let cur_height = nodes[1].node.best_block.read().unwrap().height() + 1; let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route.paths[0], &session_priv).unwrap(); let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route.paths[0], 3460001, &None, cur_height).unwrap(); @@ -1756,7 +1755,7 @@ fn test_chan_reserve_violation_inbound_htlc_outbound_channel() { // Need to manually create the update_add_htlc message to go around the channel reserve check in send_htlc() let secp_ctx = Secp256k1::new(); let session_priv = SecretKey::from_slice(&[42; 32]).unwrap(); - let cur_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let cur_height = nodes[1].node.best_block.read().unwrap().height() + 1; let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route.paths[0], &session_priv).unwrap(); let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route.paths[0], 1000, &None, cur_height).unwrap(); let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &payment_hash); @@ -1879,7 +1878,7 @@ fn test_chan_reserve_violation_inbound_htlc_inbound_chan() { // Need to manually create the update_add_htlc message to go around the channel reserve check in send_htlc() let secp_ctx = Secp256k1::new(); let session_priv = SecretKey::from_slice(&[42; 32]).unwrap(); - let cur_height = nodes[0].node.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let cur_height = nodes[0].node.best_block.read().unwrap().height() + 1; let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route_2.paths[0], &session_priv).unwrap(); let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route_2.paths[0], recv_value_2, &None, cur_height).unwrap(); let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &our_payment_hash_1); @@ -2941,8 +2940,7 @@ fn test_htlc_on_chain_success() { check_tx_local_broadcast!(nodes[0], true, commitment_tx[0], chan_1.3); } -#[test] -fn test_htlc_on_chain_timeout() { +fn do_test_htlc_on_chain_timeout(connect_style: ConnectStyle) { // Test that in case of a unilateral close onchain, we detect the state of output and // timeout the HTLC backward accordingly. So here we test that ChannelManager is // broadcasting the right event to other nodes in payment path. @@ -2954,7 +2952,10 @@ fn test_htlc_on_chain_timeout() { let chanmon_cfgs = create_chanmon_cfgs(3); let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); - let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + *nodes[0].connect_style.borrow_mut() = connect_style; + *nodes[1].connect_style.borrow_mut() = connect_style; + *nodes[2].connect_style.borrow_mut() = connect_style; // Create some intial channels let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); @@ -3068,6 +3069,13 @@ fn test_htlc_on_chain_timeout() { assert_eq!(node_txn[2].clone().input[0].witness.last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT); } +#[test] +fn test_htlc_on_chain_timeout() { + do_test_htlc_on_chain_timeout(ConnectStyle::BestBlockFirstSkippingBlocks); + do_test_htlc_on_chain_timeout(ConnectStyle::TransactionsFirstSkippingBlocks); + do_test_htlc_on_chain_timeout(ConnectStyle::FullBlockViaListen); +} + #[test] fn test_simple_commitment_revoked_fail_backward() { // Test that in case of a revoked commitment tx, we detect the resolution of output by justice tx @@ -3400,7 +3408,7 @@ fn fail_backward_pending_htlc_upon_channel_failure() { let secp_ctx = Secp256k1::new(); let session_priv = SecretKey::from_slice(&[42; 32]).unwrap(); - let current_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let current_height = nodes[1].node.best_block.read().unwrap().height() + 1; let net_graph_msg_handler = &nodes[1].net_graph_msg_handler; let route = get_route(&nodes[1].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[0].node.get_our_node_id(), None, None, &Vec::new(), 50_000, TEST_FINAL_CLTV, &logger).unwrap(); let (onion_payloads, _amount_msat, cltv_expiry) = onion_utils::build_onion_payloads(&route.paths[0], 50_000, &None, current_height).unwrap(); @@ -6504,7 +6512,7 @@ fn test_update_add_htlc_bolt2_receiver_check_max_htlc_limit() { let net_graph_msg_handler = &nodes[0].net_graph_msg_handler; let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, None, &[], 3999999, TEST_FINAL_CLTV, &logger).unwrap(); - let cur_height = nodes[0].node.latest_block_height.load(Ordering::Acquire) as u32 + 1; + let cur_height = nodes[0].node.best_block.read().unwrap().height() + 1; let onion_keys = onion_utils::construct_onion_keys(&Secp256k1::signing_only(), &route.paths[0], &session_priv).unwrap(); let (onion_payloads, _htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route.paths[0], 3999999, &None, cur_height).unwrap(); let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &our_payment_hash); diff --git a/lightning/src/ln/onchaintx.rs b/lightning/src/ln/onchaintx.rs index f3d8b2e9..8e531b50 100644 --- a/lightning/src/ln/onchaintx.rs +++ b/lightning/src/ln/onchaintx.rs @@ -32,16 +32,37 @@ use util::logger::Logger; use util::ser::{Readable, ReadableArgs, Writer, Writeable, VecWriter}; use util::byte_utils; -use std::collections::{HashMap, hash_map}; +use std::collections::HashMap; use std::cmp; use std::ops::Deref; use std::mem::replace; const MAX_ALLOC_SIZE: usize = 64*1024; +/// An entry for an [`OnchainEvent`], stating the block height when the event was observed and the +/// transaction causing it. +/// +/// Used to determine when the on-chain event can be considered safe from a chain reorganization. +#[derive(PartialEq)] +struct OnchainEventEntry { + txid: Txid, + height: u32, + event: OnchainEvent, +} + +impl OnchainEventEntry { + fn confirmation_threshold(&self) -> u32 { + self.height + ANTI_REORG_DELAY - 1 + } + + fn has_reached_confirmation_threshold(&self, height: u32) -> bool { + height >= self.confirmation_threshold() + } +} + /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it /// once they mature to enough confirmations (ANTI_REORG_DELAY) -#[derive(Clone, PartialEq)] +#[derive(PartialEq)] enum OnchainEvent { /// Outpoint under claim process by our own tx, once this one get enough confirmations, we remove it from /// bump-txn candidate buffer. @@ -280,7 +301,7 @@ pub struct OnchainTxHandler { #[cfg(not(test))] claimable_outpoints: HashMap, - onchain_events_waiting_threshold_conf: HashMap>, + onchain_events_awaiting_threshold_conf: Vec, latest_height: u32, @@ -317,21 +338,19 @@ impl OnchainTxHandler { claim_and_height.1.write(writer)?; } - writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?; - for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() { - writer.write_all(&byte_utils::be32_to_array(**target))?; - writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?; - for ev in events.iter() { - match *ev { - OnchainEvent::Claim { ref claim_request } => { - writer.write_all(&[0; 1])?; - claim_request.write(writer)?; - }, - OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => { - writer.write_all(&[1; 1])?; - outpoint.write(writer)?; - input_material.write(writer)?; - } + writer.write_all(&byte_utils::be64_to_array(self.onchain_events_awaiting_threshold_conf.len() as u64))?; + for ref entry in self.onchain_events_awaiting_threshold_conf.iter() { + entry.txid.write(writer)?; + writer.write_all(&byte_utils::be32_to_array(entry.height))?; + match entry.event { + OnchainEvent::Claim { ref claim_request } => { + writer.write_all(&[0; 1])?; + claim_request.write(writer)?; + }, + OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => { + writer.write_all(&[1; 1])?; + outpoint.write(writer)?; + input_material.write(writer)?; } } } @@ -377,32 +396,28 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K> for OnchainTxHandler { claimable_outpoints.insert(outpoint, (ancestor_claim_txid, height)); } let waiting_threshold_conf_len: u64 = Readable::read(reader)?; - let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128)); + let mut onchain_events_awaiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128)); for _ in 0..waiting_threshold_conf_len { - let height_target = Readable::read(reader)?; - let events_len: u64 = Readable::read(reader)?; - let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128)); - for _ in 0..events_len { - let ev = match ::read(reader)? { - 0 => { - let claim_request = Readable::read(reader)?; - OnchainEvent::Claim { - claim_request - } - }, - 1 => { - let outpoint = Readable::read(reader)?; - let input_material = Readable::read(reader)?; - OnchainEvent::ContentiousOutpoint { - outpoint, - input_material - } + let txid = Readable::read(reader)?; + let height = Readable::read(reader)?; + let event = match ::read(reader)? { + 0 => { + let claim_request = Readable::read(reader)?; + OnchainEvent::Claim { + claim_request } - _ => return Err(DecodeError::InvalidValue), - }; - events.push(ev); - } - onchain_events_waiting_threshold_conf.insert(height_target, events); + }, + 1 => { + let outpoint = Readable::read(reader)?; + let input_material = Readable::read(reader)?; + OnchainEvent::ContentiousOutpoint { + outpoint, + input_material + } + } + _ => return Err(DecodeError::InvalidValue), + }; + onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid, height, event }); } let latest_height = Readable::read(reader)?; @@ -419,7 +434,7 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K> for OnchainTxHandler { channel_transaction_parameters: channel_parameters, claimable_outpoints, pending_claim_requests, - onchain_events_waiting_threshold_conf, + onchain_events_awaiting_threshold_conf, latest_height, secp_ctx, }) @@ -438,7 +453,7 @@ impl OnchainTxHandler { channel_transaction_parameters: channel_parameters, pending_claim_requests: HashMap::new(), claimable_outpoints: HashMap::new(), - onchain_events_waiting_threshold_conf: HashMap::new(), + onchain_events_awaiting_threshold_conf: Vec::new(), latest_height: 0, secp_ctx, @@ -756,16 +771,13 @@ impl OnchainTxHandler { macro_rules! clean_claim_request_after_safety_delay { () => { - let new_event = OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() }; - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - if !entry.get().contains(&new_event) { - entry.get_mut().push(new_event); - } - }, - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![new_event]); - } + let entry = OnchainEventEntry { + txid: tx.txid(), + height, + event: OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() } + }; + if !self.onchain_events_awaiting_threshold_conf.contains(&entry) { + self.onchain_events_awaiting_threshold_conf.push(entry); } } } @@ -799,24 +811,23 @@ impl OnchainTxHandler { } } for (outpoint, input_material) in claimed_outputs_material.drain(..) { - let new_event = OnchainEvent::ContentiousOutpoint { outpoint, input_material }; - match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) { - hash_map::Entry::Occupied(mut entry) => { - if !entry.get().contains(&new_event) { - entry.get_mut().push(new_event); - } - }, - hash_map::Entry::Vacant(entry) => { - entry.insert(vec![new_event]); - } + let entry = OnchainEventEntry { + txid: tx.txid(), + height, + event: OnchainEvent::ContentiousOutpoint { outpoint, input_material }, + }; + if !self.onchain_events_awaiting_threshold_conf.contains(&entry) { + self.onchain_events_awaiting_threshold_conf.push(entry); } } } // After security delay, either our claim tx got enough confs or outpoint is definetely out of reach - if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) { - for ev in events { - match ev { + let onchain_events_awaiting_threshold_conf = + self.onchain_events_awaiting_threshold_conf.drain(..).collect::>(); + for entry in onchain_events_awaiting_threshold_conf { + if entry.has_reached_confirmation_threshold(height) { + match entry.event { OnchainEvent::Claim { claim_request } => { // We may remove a whole set of claim outpoints here, as these one may have // been aggregated in a single tx and claimed so atomically @@ -830,13 +841,15 @@ impl OnchainTxHandler { self.claimable_outpoints.remove(&outpoint); } } + } else { + self.onchain_events_awaiting_threshold_conf.push(entry); } } // Check if any pending claim request must be rescheduled for (first_claim_txid, ref claim_data) in self.pending_claim_requests.iter() { - if let Some(h) = claim_data.height_timer { - if h == height { + if let Some(height_timer) = claim_data.height_timer { + if height >= height_timer { bump_candidates.insert(*first_claim_txid, (*claim_data).clone()); } } @@ -856,17 +869,43 @@ impl OnchainTxHandler { } } + pub(crate) fn transaction_unconfirmed( + &mut self, + txid: &Txid, + broadcaster: B, + fee_estimator: F, + logger: L, + ) where + B::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + { + let mut height = None; + for entry in self.onchain_events_awaiting_threshold_conf.iter() { + if entry.txid == *txid { + height = Some(entry.height); + break; + } + } + + if let Some(height) = height { + self.block_disconnected(height, broadcaster, fee_estimator, logger); + } + } + pub(crate) fn block_disconnected(&mut self, height: u32, broadcaster: B, fee_estimator: F, logger: L) where B::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, { let mut bump_candidates = HashMap::new(); - if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) { - //- our claim tx on a commitment tx output - //- resurect outpoint back in its claimable set and regenerate tx - for ev in events { - match ev { + let onchain_events_awaiting_threshold_conf = + self.onchain_events_awaiting_threshold_conf.drain(..).collect::>(); + for entry in onchain_events_awaiting_threshold_conf { + if entry.height >= height { + //- our claim tx on a commitment tx output + //- resurect outpoint back in its claimable set and regenerate tx + match entry.event { OnchainEvent::ContentiousOutpoint { outpoint, input_material } => { if let Some(ancestor_claimable_txid) = self.claimable_outpoints.get(&outpoint) { if let Some(claim_material) = self.pending_claim_requests.get_mut(&ancestor_claimable_txid.0) { @@ -879,6 +918,8 @@ impl OnchainTxHandler { }, _ => {}, } + } else { + self.onchain_events_awaiting_threshold_conf.push(entry); } } for (_, claim_material) in bump_candidates.iter_mut() { @@ -895,7 +936,7 @@ impl OnchainTxHandler { // right now if one of the outpoint get disconnected, just erase whole pending claim request. let mut remove_request = Vec::new(); self.claimable_outpoints.retain(|_, ref v| - if v.1 == height { + if v.1 >= height { remove_request.push(v.0.clone()); false } else { true }); @@ -904,6 +945,16 @@ impl OnchainTxHandler { } } + pub(crate) fn get_relevant_txids(&self) -> Vec { + let mut txids: Vec = self.onchain_events_awaiting_threshold_conf + .iter() + .map(|entry| entry.txid) + .collect(); + txids.sort_unstable(); + txids.dedup(); + txids + } + pub(crate) fn provide_latest_holder_tx(&mut self, tx: HolderCommitmentTransaction) { self.prev_holder_commitment = Some(replace(&mut self.holder_commitment, tx)); self.holder_htlc_sigs = None;