Merge pull request #858 from jkczyz/2021-03-electrum-interface
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Thu, 15 Apr 2021 14:25:54 +0000 (14:25 +0000)
committerGitHub <noreply@github.com>
Thu, 15 Apr 2021 14:25:54 +0000 (14:25 +0000)
Electrum interface for ChannelMonitor

background-processor/src/lib.rs
fuzz/src/chanmon_consistency.rs
fuzz/src/full_stack.rs
lightning/src/chain/chainmonitor.rs
lightning/src/chain/channelmonitor.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/onchaintx.rs

index 3f11f4e381656a2529e2828e6913abda678f1688..30d9f6c501eff81aff203344935764f5d91aff39 100644 (file)
@@ -131,7 +131,7 @@ mod tests {
        use lightning::chain::keysinterface::{Sign, InMemorySigner, KeysInterface, KeysManager};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
-       use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager};
+       use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, SimpleArcChannelManager};
        use lightning::ln::features::InitFeatures;
        use lightning::ln::msgs::ChannelMessageHandler;
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
@@ -192,14 +192,12 @@ mod tests {
                        let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
                        let seed = [i as u8; 32];
                        let network = Network::Testnet;
-                       let genesis_block = genesis_block(network);
-                       let now = Duration::from_secs(genesis_block.header.time as u64);
+                       let now = Duration::from_secs(genesis_block(network).header.time as u64);
                        let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
                        let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
                        let params = ChainParameters {
                                network,
-                               latest_hash: genesis_block.block_hash(),
-                               latest_height: 0,
+                               best_block: BestBlock::from_genesis(network),
                        };
                        let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
index a6a6a853ed17d877cf51084c035a5b6aaaa55d4a..9d0a90ee1cdc0736ea6e3384348136eaef44c9a8 100644 (file)
@@ -36,7 +36,7 @@ use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr,
 use lightning::chain::transaction::OutPoint;
 use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
 use lightning::chain::keysinterface::{KeysInterface, InMemorySigner};
-use lightning::ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, PaymentSendFailure, ChannelManagerReadArgs};
+use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, PaymentSendFailure, ChannelManagerReadArgs};
 use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
 use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, DecodeError, ErrorAction, UpdateAddHTLC, Init};
 use lightning::util::enforcing_trait_impls::{EnforcingSigner, INITIAL_REVOKED_COMMITMENT_NUMBER};
@@ -322,8 +322,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
                        let network = Network::Bitcoin;
                        let params = ChainParameters {
                                network,
-                               latest_hash: genesis_block(network).block_hash(),
-                               latest_height: 0,
+                               best_block: BestBlock::from_genesis(network),
                        };
                        (ChannelManager::new(fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, params),
                        monitor, keys_manager)
index 9e4f18b5df8611d442c6ab13c2afc0341aed8e31..91ebb99018cca6cc5171c964394a32d5604a0bd1 100644 (file)
@@ -32,7 +32,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget,
 use lightning::chain::chainmonitor;
 use lightning::chain::transaction::OutPoint;
 use lightning::chain::keysinterface::{InMemorySigner, KeysInterface};
-use lightning::ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret};
+use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret};
 use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor};
 use lightning::ln::msgs::DecodeError;
 use lightning::routing::router::get_route;
@@ -355,15 +355,13 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
        config.channel_options.announced_channel = get_slice!(1)[0] != 0;
        config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
        let network = Network::Bitcoin;
-       let genesis_hash = genesis_block(network).block_hash();
        let params = ChainParameters {
                network,
-               latest_hash: genesis_hash,
-               latest_height: 0,
+               best_block: BestBlock::from_genesis(network),
        };
        let channelmanager = Arc::new(ChannelManager::new(fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, params));
        let our_id = PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret());
-       let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_hash, None, Arc::clone(&logger)));
+       let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_block(network).block_hash(), None, Arc::clone(&logger)));
 
        let peers = RefCell::new([false; 256]);
        let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
index 0fd088019247e4e8e9adbea39adb99798e274dd4..ee6df63a1e07ee272fda9f2217526e15c3d7beca 100644 (file)
 //! servicing [`ChannelMonitor`] updates from the client.
 
 use bitcoin::blockdata::block::{Block, BlockHeader};
+use bitcoin::hash_types::Txid;
 
 use chain;
 use chain::{Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::channelmonitor;
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist, TransactionOutputs};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::logger::Logger;
@@ -82,24 +83,77 @@ where C::Target: chain::Filter,
        /// descendants of such transactions. It is not necessary to re-fetch the block to obtain
        /// updated `txdata`.
        pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+               self.process_chain_data(header, txdata, |monitor, txdata| {
+                       monitor.block_connected(
+                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+               });
+       }
+
+       /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
+       /// of a channel and reacting accordingly to newly confirmed transactions. For details, see
+       /// [`ChannelMonitor::transactions_confirmed`].
+       ///
+       /// Used instead of [`block_connected`] by clients that are notified of transactions rather than
+       /// blocks. May be called before or after [`update_best_block`] for transactions in the
+       /// corresponding block. See [`update_best_block`] for further calling expectations.
+       ///
+       /// [`block_connected`]: Self::block_connected
+       /// [`update_best_block`]: Self::update_best_block
+       pub fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+               self.process_chain_data(header, txdata, |monitor, txdata| {
+                       monitor.transactions_confirmed(
+                               header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+               });
+       }
+
+       /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
+       /// of a channel and reacting accordingly based on the new chain tip. For details, see
+       /// [`ChannelMonitor::update_best_block`].
+       ///
+       /// Used instead of [`block_connected`] by clients that are notified of transactions rather than
+       /// blocks. May be called before or after [`transactions_confirmed`] for the corresponding
+       /// block.
+       ///
+       /// Must be called after new blocks become available for the most recent block. Intermediary
+       /// blocks, however, may be safely skipped. In the event of a chain re-organization, this only
+       /// needs to be called for the most recent block assuming `transaction_unconfirmed` is called
+       /// for any affected transactions.
+       ///
+       /// [`block_connected`]: Self::block_connected
+       /// [`transactions_confirmed`]: Self::transactions_confirmed
+       /// [`transaction_unconfirmed`]: Self::transaction_unconfirmed
+       pub fn update_best_block(&self, header: &BlockHeader, height: u32) {
+               self.process_chain_data(header, &[], |monitor, txdata| {
+                       // While in practice there shouldn't be any recursive calls when given empty txdata,
+                       // it's still possible if a chain::Filter implementation returns a transaction.
+                       debug_assert!(txdata.is_empty());
+                       monitor.update_best_block(
+                               header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+               });
+       }
+
+       fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+       where
+               FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
+       {
                let mut dependent_txdata = Vec::new();
                let monitors = self.monitors.read().unwrap();
                for monitor in monitors.values() {
-                       let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+                       let mut txn_outputs = process(monitor, txdata);
 
                        // Register any new outputs with the chain source for filtering, storing any dependent
                        // transactions from within the block that previously had not been included in txdata.
                        if let Some(ref chain_source) = self.chain_source {
                                let block_hash = header.block_hash();
-                               for (txid, outputs) in txn_outputs.drain(..) {
-                                       for (idx, output) in outputs.iter() {
+                               for (txid, mut outputs) in txn_outputs.drain(..) {
+                                       for (idx, output) in outputs.drain(..) {
                                                // Register any new outputs with the chain source for filtering and recurse
                                                // if it indicates that there are dependent transactions within the block
                                                // that had not been previously included in txdata.
                                                let output = WatchedOutput {
                                                        block_hash: Some(block_hash),
-                                                       outpoint: OutPoint { txid, index: *idx as u16 },
-                                                       script_pubkey: output.script_pubkey.clone(),
+                                                       outpoint: OutPoint { txid, index: idx as u16 },
+                                                       script_pubkey: output.script_pubkey,
                                                };
                                                if let Some(tx) = chain_source.register_output(output) {
                                                        dependent_txdata.push(tx);
@@ -114,7 +168,7 @@ where C::Target: chain::Filter,
                        dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
                        dependent_txdata.dedup_by_key(|(index, _tx)| *index);
                        let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-                       self.block_connected(header, &txdata, height);
+                       self.process_chain_data(header, &txdata, process);
                }
        }
 
@@ -128,6 +182,36 @@ where C::Target: chain::Filter,
                }
        }
 
+       /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
+       /// of a channel based on transactions unconfirmed as a result of a chain reorganization. See
+       /// [`ChannelMonitor::transaction_unconfirmed`] for details.
+       ///
+       /// Used instead of [`block_disconnected`] by clients that are notified of transactions rather
+       /// than blocks. May be called before or after [`update_best_block`] for transactions in the
+       /// corresponding block. See [`update_best_block`] for further calling expectations. 
+       ///
+       /// [`block_disconnected`]: Self::block_disconnected
+       /// [`update_best_block`]: Self::update_best_block
+       pub fn transaction_unconfirmed(&self, txid: &Txid) {
+               let monitors = self.monitors.read().unwrap();
+               for monitor in monitors.values() {
+                       monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+               }
+       }
+
+       /// Returns the set of txids that should be monitored for re-organization out of the chain.
+       pub fn get_relevant_txids(&self) -> Vec<Txid> {
+               let mut txids = Vec::new();
+               let monitors = self.monitors.read().unwrap();
+               for monitor in monitors.values() {
+                       txids.append(&mut monitor.get_relevant_txids());
+               }
+
+               txids.sort_unstable();
+               txids.dedup();
+               txids
+       }
+
        /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
        ///
        /// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
index 939337d7b42d572d64c7ab5b1a6224479f6c1ce9..777bc06bfb882745545c3db089c2f918a43c9b95 100644 (file)
@@ -37,7 +37,7 @@ use bitcoin::secp256k1;
 use ln::msgs::DecodeError;
 use ln::chan_utils;
 use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCType, ChannelTransactionParameters, HolderCommitmentTransaction};
-use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
+use ln::channelmanager::{BestBlock, HTLCSource, PaymentPreimage, PaymentHash};
 use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
 use chain;
 use chain::WatchedOutput;
@@ -50,7 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48};
 use util::byte_utils;
 use util::events::Event;
 
-use std::collections::{HashMap, HashSet, hash_map};
+use std::collections::{HashMap, HashSet};
 use std::{cmp, mem};
 use std::io::Error;
 use std::ops::Deref;
@@ -465,9 +465,30 @@ pub(crate) struct ClaimRequest {
        pub(crate) witness_data: InputMaterial
 }
 
+/// An entry for an [`OnchainEvent`], stating the block height when the event was observed and the
+/// transaction causing it.
+///
+/// Used to determine when the on-chain event can be considered safe from a chain reorganization.
+#[derive(PartialEq)]
+struct OnchainEventEntry {
+       txid: Txid,
+       height: u32,
+       event: OnchainEvent,
+}
+
+impl OnchainEventEntry {
+       fn confirmation_threshold(&self) -> u32 {
+               self.height + ANTI_REORG_DELAY - 1
+       }
+
+       fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
+               height >= self.confirmation_threshold()
+       }
+}
+
 /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
 /// once they mature to enough confirmations (ANTI_REORG_DELAY)
-#[derive(Clone, PartialEq)]
+#[derive(PartialEq)]
 enum OnchainEvent {
        /// HTLC output getting solved by a timeout, at maturation we pass upstream payment source information to solve
        /// inbound HTLC in backward channel. Note, in case of preimage, we pass info to upstream without delay as we can
@@ -684,10 +705,10 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
        pending_monitor_events: Vec<MonitorEvent>,
        pending_events: Vec<Event>,
 
-       // Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which
-       // we have to take actions once they reach enough confs. Key is a block height timer, i.e we enforce
-       // actions when we receive a block with given height. Actions depend on OnchainEvent type.
-       onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
+       // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
+       // which to take actions once they reach enough confirmations. Each entry includes the
+       // transaction's id and the height when the transaction was confirmed on chain.
+       onchain_events_awaiting_threshold_conf: Vec<OnchainEventEntry>,
 
        // If we get serialized out and re-read, we need to make sure that the chain monitoring
        // interface knows about the TXOs that we want to be notified of spends of. We could probably
@@ -714,15 +735,19 @@ pub(crate) struct ChannelMonitorImpl<Signer: Sign> {
        // remote monitor out-of-order with regards to the block view.
        holder_tx_signed: bool,
 
-       // We simply modify last_block_hash in Channel's block_connected so that serialization is
+       // We simply modify best_block in Channel's block_connected so that serialization is
        // consistent but hopefully the users' copy handles block_connected in a consistent way.
        // (we do *not*, however, update them in update_monitor to ensure any local user copies keep
-       // their last_block_hash from its state and not based on updated copies that didn't run through
+       // their best_block from its state and not based on updated copies that didn't run through
        // the full block_connected).
-       last_block_hash: BlockHash,
+       best_block: BestBlock,
+
        secp_ctx: Secp256k1<secp256k1::All>, //TODO: dedup this a bit...
 }
 
+/// Transaction outputs to watch for on-chain spends.
+pub(super) type TransactionOutputs = (Txid, Vec<(u32, TxOut)>);
+
 #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
 /// Used only in testing and fuzztarget to check serialization roundtrips don't change the
 /// underlying object
@@ -765,7 +790,7 @@ impl<Signer: Sign> PartialEq for ChannelMonitorImpl<Signer> {
                        self.payment_preimages != other.payment_preimages ||
                        self.pending_monitor_events != other.pending_monitor_events ||
                        self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
-                       self.onchain_events_waiting_threshold_conf != other.onchain_events_waiting_threshold_conf ||
+                       self.onchain_events_awaiting_threshold_conf != other.onchain_events_awaiting_threshold_conf ||
                        self.outputs_to_watch != other.outputs_to_watch ||
                        self.lockdown_from_offchain != other.lockdown_from_offchain ||
                        self.holder_tx_signed != other.holder_tx_signed
@@ -931,24 +956,23 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
                        event.write(writer)?;
                }
 
-               self.last_block_hash.write(writer)?;
+               self.best_block.block_hash().write(writer)?;
+               writer.write_all(&byte_utils::be32_to_array(self.best_block.height()))?;
 
-               writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
-               for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
-                       writer.write_all(&byte_utils::be32_to_array(**target))?;
-                       writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
-                       for ev in events.iter() {
-                               match *ev {
-                                       OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                               0u8.write(writer)?;
-                                               htlc_update.0.write(writer)?;
-                                               htlc_update.1.write(writer)?;
-                                       },
-                                       OnchainEvent::MaturingOutput { ref descriptor } => {
-                                               1u8.write(writer)?;
-                                               descriptor.write(writer)?;
-                                       },
-                               }
+               writer.write_all(&byte_utils::be64_to_array(self.onchain_events_awaiting_threshold_conf.len() as u64))?;
+               for ref entry in self.onchain_events_awaiting_threshold_conf.iter() {
+                       entry.txid.write(writer)?;
+                       writer.write_all(&byte_utils::be32_to_array(entry.height))?;
+                       match entry.event {
+                               OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                       0u8.write(writer)?;
+                                       htlc_update.0.write(writer)?;
+                                       htlc_update.1.write(writer)?;
+                               },
+                               OnchainEvent::MaturingOutput { ref descriptor } => {
+                                       1u8.write(writer)?;
+                                       descriptor.write(writer)?;
+                               },
                        }
                }
 
@@ -977,7 +1001,7 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                          funding_redeemscript: Script, channel_value_satoshis: u64,
                          commitment_transaction_number_obscure_factor: u64,
                          initial_holder_commitment_tx: HolderCommitmentTransaction,
-                         last_block_hash: BlockHash) -> ChannelMonitor<Signer> {
+                         best_block: BestBlock) -> ChannelMonitor<Signer> {
 
                assert!(commitment_transaction_number_obscure_factor <= (1 << 48));
                let our_channel_close_key_hash = WPubkeyHash::hash(&shutdown_pubkey.serialize());
@@ -1056,7 +1080,7 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                                pending_monitor_events: Vec::new(),
                                pending_events: Vec::new(),
 
-                               onchain_events_waiting_threshold_conf: HashMap::new(),
+                               onchain_events_awaiting_threshold_conf: Vec::new(),
                                outputs_to_watch,
 
                                onchain_tx_handler,
@@ -1064,7 +1088,8 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                                lockdown_from_offchain: false,
                                holder_tx_signed: false,
 
-                               last_block_hash,
+                               best_block,
+
                                secp_ctx,
                        }),
                }
@@ -1254,7 +1279,7 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                broadcaster: B,
                fee_estimator: F,
                logger: L,
-       ) -> Vec<(Txid, Vec<(u32, TxOut)>)>
+       ) -> Vec<TransactionOutputs>
        where
                B::Target: BroadcasterInterface,
                F::Target: FeeEstimator,
@@ -1281,6 +1306,101 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                self.inner.lock().unwrap().block_disconnected(
                        header, height, broadcaster, fee_estimator, logger)
        }
+
+       /// Processes transactions confirmed in a block with the given header and height, returning new
+       /// outputs to watch. See [`block_connected`] for details.
+       ///
+       /// Used instead of [`block_connected`] by clients that are notified of transactions rather than
+       /// blocks. May be called before or after [`update_best_block`] for transactions in the
+       /// corresponding block. See [`update_best_block`] for further calling expectations. 
+       ///
+       /// [`block_connected`]: Self::block_connected
+       /// [`update_best_block`]: Self::update_best_block
+       pub fn transactions_confirmed<B: Deref, F: Deref, L: Deref>(
+               &self,
+               header: &BlockHeader,
+               txdata: &TransactionData,
+               height: u32,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) -> Vec<TransactionOutputs>
+       where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               self.inner.lock().unwrap().transactions_confirmed(
+                       header, txdata, height, broadcaster, fee_estimator, logger)
+       }
+
+       /// Processes a transaction that was reorganized out of the chain.
+       ///
+       /// Used instead of [`block_disconnected`] by clients that are notified of transactions rather
+       /// than blocks. May be called before or after [`update_best_block`] for transactions in the
+       /// corresponding block. See [`update_best_block`] for further calling expectations.
+       ///
+       /// [`block_disconnected`]: Self::block_disconnected
+       /// [`update_best_block`]: Self::update_best_block
+       pub fn transaction_unconfirmed<B: Deref, F: Deref, L: Deref>(
+               &self,
+               txid: &Txid,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               self.inner.lock().unwrap().transaction_unconfirmed(
+                       txid, broadcaster, fee_estimator, logger);
+       }
+
+       /// Updates the monitor with the current best chain tip, returning new outputs to watch. See
+       /// [`block_connected`] for details.
+       ///
+       /// Used instead of [`block_connected`] by clients that are notified of transactions rather than
+       /// blocks. May be called before or after [`transactions_confirmed`] for the corresponding
+       /// block.
+       ///
+       /// Must be called after new blocks become available for the most recent block. Intermediary
+       /// blocks, however, may be safely skipped. In the event of a chain re-organization, this only
+       /// needs to be called for the most recent block assuming `transaction_unconfirmed` is called
+       /// for any affected transactions.
+       ///
+       /// [`block_connected`]: Self::block_connected
+       /// [`transactions_confirmed`]: Self::transactions_confirmed
+       /// [`transaction_unconfirmed`]: Self::transaction_unconfirmed
+       pub fn update_best_block<B: Deref, F: Deref, L: Deref>(
+               &self,
+               header: &BlockHeader,
+               height: u32,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) -> Vec<TransactionOutputs>
+       where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               self.inner.lock().unwrap().update_best_block(
+                       header, height, broadcaster, fee_estimator, logger)
+       }
+
+       /// Returns the set of txids that should be monitored for re-organization out of the chain.
+       pub fn get_relevant_txids(&self) -> Vec<Txid> {
+               let inner = self.inner.lock().unwrap();
+               let mut txids: Vec<Txid> = inner.onchain_events_awaiting_threshold_conf
+                       .iter()
+                       .map(|entry| entry.txid)
+                       .chain(inner.onchain_tx_handler.get_relevant_txids().into_iter())
+                       .collect();
+               txids.sort_unstable();
+               txids.dedup();
+               txids
+       }
 }
 
 impl<Signer: Sign> ChannelMonitorImpl<Signer> {
@@ -1574,7 +1694,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
        /// HTLC-Success/HTLC-Timeout transactions.
        /// Return updates for HTLC pending in the channel and failed automatically by the broadcast of
        /// revoked counterparty commitment tx
-       fn check_spend_counterparty_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, (Txid, Vec<(u32, TxOut)>)) where L::Target: Logger {
+       fn check_spend_counterparty_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, TransactionOutputs) where L::Target: Logger {
                // Most secp and related errors trying to create keys means we have no hope of constructing
                // a spend transaction...so we return no transactions to broadcast
                let mut claimable_outpoints = Vec::new();
@@ -1639,24 +1759,24 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                if let Some(ref outpoints) = self.counterparty_claimable_outpoints.get($txid) {
                                                        for &(ref htlc, ref source_option) in outpoints.iter() {
                                                                if let &Some(ref source) = source_option {
-                                                                       log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
-                                                                       match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                                                               hash_map::Entry::Occupied(mut entry) => {
-                                                                                       let e = entry.get_mut();
-                                                                                       e.retain(|ref event| {
-                                                                                               match **event {
-                                                                                                       OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                                                               return htlc_update.0 != **source
-                                                                                                       },
-                                                                                                       _ => true
-                                                                                               }
-                                                                                       });
-                                                                                       e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
-                                                                               }
-                                                                               hash_map::Entry::Vacant(entry) => {
-                                                                                       entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
+                                                                       self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
+                                                                               if entry.height != height { return true; }
+                                                                               match entry.event {
+                                                                                        OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                                                                htlc_update.0 != **source
+                                                                                        },
+                                                                                        _ => true,
                                                                                }
-                                                                       }
+                                                                       });
+                                                                       let entry = OnchainEventEntry {
+                                                                               txid: *$txid,
+                                                                               height,
+                                                                               event: OnchainEvent::HTLCUpdate {
+                                                                                       htlc_update: ((**source).clone(), htlc.payment_hash.clone())
+                                                                               },
+                                                                       };
+                                                                       log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
+                                                                       self.onchain_events_awaiting_threshold_conf.push(entry);
                                                                }
                                                        }
                                                }
@@ -1705,23 +1825,22 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                                        }
                                                                }
                                                                log_trace!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of counterparty commitment transaction", log_bytes!(htlc.payment_hash.0), $commitment_tx);
-                                                               match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                                                       hash_map::Entry::Occupied(mut entry) => {
-                                                                               let e = entry.get_mut();
-                                                                               e.retain(|ref event| {
-                                                                                       match **event {
-                                                                                               OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                                                       return htlc_update.0 != **source
-                                                                                               },
-                                                                                               _ => true
-                                                                                       }
-                                                                               });
-                                                                               e.push(OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())});
-                                                                       }
-                                                                       hash_map::Entry::Vacant(entry) => {
-                                                                               entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ((**source).clone(), htlc.payment_hash.clone())}]);
+                                                               self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
+                                                                       if entry.height != height { return true; }
+                                                                       match entry.event {
+                                                                                OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                                                        htlc_update.0 != **source
+                                                                                },
+                                                                                _ => true,
                                                                        }
-                                                               }
+                                                               });
+                                                               self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
+                                                                       txid: *$txid,
+                                                                       height,
+                                                                       event: OnchainEvent::HTLCUpdate {
+                                                                               htlc_update: ((**source).clone(), htlc.payment_hash.clone())
+                                                                       },
+                                                               });
                                                        }
                                                }
                                        }
@@ -1786,7 +1905,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
        }
 
        /// Attempts to claim a counterparty HTLC-Success/HTLC-Timeout's outputs using the revocation key
-       fn check_spend_counterparty_htlc<L: Deref>(&mut self, tx: &Transaction, commitment_number: u64, height: u32, logger: &L) -> (Vec<ClaimRequest>, Option<(Txid, Vec<(u32, TxOut)>)>) where L::Target: Logger {
+       fn check_spend_counterparty_htlc<L: Deref>(&mut self, tx: &Transaction, commitment_number: u64, height: u32, logger: &L) -> (Vec<ClaimRequest>, Option<TransactionOutputs>) where L::Target: Logger {
                let htlc_txid = tx.txid();
                if tx.input.len() != 1 || tx.output.len() != 1 || tx.input[0].witness.len() != 5 {
                        return (Vec::new(), None)
@@ -1855,31 +1974,29 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
        /// Attempts to claim any claimable HTLCs in a commitment transaction which was not (yet)
        /// revoked using data in holder_claimable_outpoints.
        /// Should not be used if check_spend_revoked_transaction succeeds.
-       fn check_spend_holder_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, (Txid, Vec<(u32, TxOut)>)) where L::Target: Logger {
+       fn check_spend_holder_transaction<L: Deref>(&mut self, tx: &Transaction, height: u32, logger: &L) -> (Vec<ClaimRequest>, TransactionOutputs) where L::Target: Logger {
                let commitment_txid = tx.txid();
                let mut claim_requests = Vec::new();
                let mut watch_outputs = Vec::new();
 
                macro_rules! wait_threshold_conf {
-                       ($height: expr, $source: expr, $commitment_tx: expr, $payment_hash: expr) => {
-                               log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, height + ANTI_REORG_DELAY - 1);
-                               match self.onchain_events_waiting_threshold_conf.entry($height + ANTI_REORG_DELAY - 1) {
-                                       hash_map::Entry::Occupied(mut entry) => {
-                                               let e = entry.get_mut();
-                                               e.retain(|ref event| {
-                                                       match **event {
-                                                               OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                       return htlc_update.0 != $source
-                                                               },
-                                                               _ => true
-                                                       }
-                                               });
-                                               e.push(OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)});
+                       ($source: expr, $commitment_tx: expr, $payment_hash: expr) => {
+                               self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
+                                       if entry.height != height { return true; }
+                                       match entry.event {
+                                                OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                        htlc_update.0 != $source
+                                                },
+                                                _ => true,
                                        }
-                                       hash_map::Entry::Vacant(entry) => {
-                                               entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash)}]);
-                                       }
-                               }
+                               });
+                               let entry = OnchainEventEntry {
+                                       txid: commitment_txid,
+                                       height,
+                                       event: OnchainEvent::HTLCUpdate { htlc_update: ($source, $payment_hash) },
+                               };
+                               log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold());
+                               self.onchain_events_awaiting_threshold_conf.push(entry);
                        }
                }
 
@@ -1915,7 +2032,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                for &(ref htlc, _, ref source) in &$holder_tx.htlc_outputs {
                                        if htlc.transaction_output_index.is_none() {
                                                if let &Some(ref source) = source {
-                                                       wait_threshold_conf!(height, source.clone(), "lastest", htlc.payment_hash.clone());
+                                                       wait_threshold_conf!(source.clone(), "lastest", htlc.payment_hash.clone());
                                                }
                                        }
                                }
@@ -1980,10 +2097,58 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                return res
        }
 
-       pub fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, header: &BlockHeader, txdata: &TransactionData, height: u32, broadcaster: B, fee_estimator: F, logger: L)-> Vec<(Txid, Vec<(u32, TxOut)>)>
+       pub fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, header: &BlockHeader, txdata: &TransactionData, height: u32, broadcaster: B, fee_estimator: F, logger: L) -> Vec<TransactionOutputs>
                where B::Target: BroadcasterInterface,
                      F::Target: FeeEstimator,
                                        L::Target: Logger,
+       {
+               let block_hash = header.block_hash();
+               log_trace!(logger, "New best block {} at height {}", block_hash, height);
+               self.best_block = BestBlock::new(block_hash, height);
+
+               self.transactions_confirmed(header, txdata, height, broadcaster, fee_estimator, logger)
+       }
+
+       fn update_best_block<B: Deref, F: Deref, L: Deref>(
+               &mut self,
+               header: &BlockHeader,
+               height: u32,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) -> Vec<TransactionOutputs>
+       where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               let block_hash = header.block_hash();
+               log_trace!(logger, "New best block {} at height {}", block_hash, height);
+
+               if height > self.best_block.height() {
+                       self.best_block = BestBlock::new(block_hash, height);
+                       self.block_confirmed(height, vec![], vec![], vec![], broadcaster, fee_estimator, logger)
+               } else {
+                       self.best_block = BestBlock::new(block_hash, height);
+                       self.onchain_events_awaiting_threshold_conf.retain(|ref entry| entry.height <= height);
+                       self.onchain_tx_handler.block_disconnected(height + 1, broadcaster, fee_estimator, logger);
+                       Vec::new()
+               }
+       }
+
+       fn transactions_confirmed<B: Deref, F: Deref, L: Deref>(
+               &mut self,
+               header: &BlockHeader,
+               txdata: &TransactionData,
+               height: u32,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) -> Vec<TransactionOutputs>
+       where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
        {
                let txn_matched = self.filter_block(txdata);
                for tx in &txn_matched {
@@ -2039,11 +2204,28 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
 
                        self.is_paying_spendable_output(&tx, height, &logger);
                }
+
+               self.block_confirmed(height, txn_matched, watch_outputs, claimable_outpoints, broadcaster, fee_estimator, logger)
+       }
+
+       fn block_confirmed<B: Deref, F: Deref, L: Deref>(
+               &mut self,
+               height: u32,
+               txn_matched: Vec<&Transaction>,
+               mut watch_outputs: Vec<TransactionOutputs>,
+               mut claimable_outpoints: Vec<ClaimRequest>,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) -> Vec<TransactionOutputs>
+       where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
                let should_broadcast = self.would_broadcast_at_height(height, &logger);
                if should_broadcast {
                        claimable_outpoints.push(ClaimRequest { absolute_timelock: height, aggregable: false, outpoint: BitcoinOutPoint { txid: self.funding_info.0.txid.clone(), vout: self.funding_info.0.index as u32 }, witness_data: InputMaterial::Funding { funding_redeemscript: self.funding_redeemscript.clone() }});
-               }
-               if should_broadcast {
                        self.pending_monitor_events.push(MonitorEvent::CommitmentTxBroadcasted(self.funding_info.0));
                        let commitment_tx = self.onchain_tx_handler.get_fully_signed_holder_tx(&self.funding_redeemscript);
                        self.holder_tx_signed = true;
@@ -2054,29 +2236,68 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                        }
                        claimable_outpoints.append(&mut new_outpoints);
                }
-               if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
-                       for ev in events {
-                               match ev {
-                                       OnchainEvent::HTLCUpdate { htlc_update } => {
-                                               log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
-                                               self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
-                                                       payment_hash: htlc_update.1,
-                                                       payment_preimage: None,
-                                                       source: htlc_update.0,
-                                               }));
-                                       },
-                                       OnchainEvent::MaturingOutput { descriptor } => {
-                                               log_trace!(logger, "Descriptor {} has got enough confirmations to be passed upstream", log_spendable!(descriptor));
-                                               self.pending_events.push(Event::SpendableOutputs {
-                                                       outputs: vec![descriptor]
-                                               });
+
+               // Find which on-chain events have reached their confirmation threshold.
+               let onchain_events_awaiting_threshold_conf =
+                       self.onchain_events_awaiting_threshold_conf.drain(..).collect::<Vec<_>>();
+               let mut onchain_events_reaching_threshold_conf = Vec::new();
+               for entry in onchain_events_awaiting_threshold_conf {
+                       if entry.has_reached_confirmation_threshold(height) {
+                               onchain_events_reaching_threshold_conf.push(entry);
+                       } else {
+                               self.onchain_events_awaiting_threshold_conf.push(entry);
+                       }
+               }
+
+               // Used to check for duplicate HTLC resolutions.
+               #[cfg(debug_assertions)]
+               let unmatured_htlcs: Vec<_> = self.onchain_events_awaiting_threshold_conf
+                       .iter()
+                       .filter_map(|entry| match &entry.event {
+                               OnchainEvent::HTLCUpdate { htlc_update } => Some(htlc_update.0.clone()),
+                               OnchainEvent::MaturingOutput { .. } => None,
+                       })
+                       .collect();
+               #[cfg(debug_assertions)]
+               let mut matured_htlcs = Vec::new();
+
+               // Produce actionable events from on-chain events having reached their threshold.
+               for entry in onchain_events_reaching_threshold_conf.drain(..) {
+                       match entry.event {
+                               OnchainEvent::HTLCUpdate { htlc_update } => {
+                                       // Check for duplicate HTLC resolutions.
+                                       #[cfg(debug_assertions)]
+                                       {
+                                               debug_assert!(
+                                                       unmatured_htlcs.iter().find(|&htlc| htlc == &htlc_update.0).is_none(),
+                                                       "An unmature HTLC transaction conflicts with a maturing one; failed to \
+                                                        call either transaction_unconfirmed for the conflicting transaction \
+                                                        or block_disconnected for a block containing it.");
+                                               debug_assert!(
+                                                       matured_htlcs.iter().find(|&htlc| htlc == &htlc_update.0).is_none(),
+                                                       "A matured HTLC transaction conflicts with a maturing one; failed to \
+                                                        call either transaction_unconfirmed for the conflicting transaction \
+                                                        or block_disconnected for a block containing it.");
+                                               matured_htlcs.push(htlc_update.0.clone());
                                        }
+
+                                       log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
+                                       self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
+                                               payment_hash: htlc_update.1,
+                                               payment_preimage: None,
+                                               source: htlc_update.0,
+                                       }));
+                               },
+                               OnchainEvent::MaturingOutput { descriptor } => {
+                                       log_trace!(logger, "Descriptor {} has got enough confirmations to be passed upstream", log_spendable!(descriptor));
+                                       self.pending_events.push(Event::SpendableOutputs {
+                                               outputs: vec![descriptor]
+                                       });
                                }
                        }
                }
 
                self.onchain_tx_handler.update_claims_view(&txn_matched, claimable_outpoints, Some(height), &&*broadcaster, &&*fee_estimator, &&*logger);
-               self.last_block_hash = block_hash;
 
                // Determine new outputs to watch by comparing against previously known outputs to watch,
                // updating the latter in the process.
@@ -2108,15 +2329,29 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
        {
                log_trace!(logger, "Block {} at height {} disconnected", header.block_hash(), height);
 
-               if let Some(_) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
-                       //We may discard:
-                       //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
-                       //- maturing spendable output has transaction paying us has been disconnected
-               }
+               //We may discard:
+               //- htlc update there as failure-trigger tx (revoked commitment tx, non-revoked commitment tx, HTLC-timeout tx) has been disconnected
+               //- maturing spendable output has transaction paying us has been disconnected
+               self.onchain_events_awaiting_threshold_conf.retain(|ref entry| entry.height < height);
 
                self.onchain_tx_handler.block_disconnected(height, broadcaster, fee_estimator, logger);
 
-               self.last_block_hash = header.prev_blockhash;
+               self.best_block = BestBlock::new(header.prev_blockhash, height - 1);
+       }
+
+       fn transaction_unconfirmed<B: Deref, F: Deref, L: Deref>(
+               &mut self,
+               txid: &Txid,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               self.onchain_events_awaiting_threshold_conf.retain(|ref entry| entry.txid != *txid);
+               self.onchain_tx_handler.transaction_unconfirmed(txid, broadcaster, fee_estimator, logger);
        }
 
        /// Filters a block's `txdata` for transactions spending watched outputs or for any child
@@ -2344,24 +2579,22 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                }));
                                        }
                                } else {
-                                       log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
-                                       match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                               hash_map::Entry::Occupied(mut entry) => {
-                                                       let e = entry.get_mut();
-                                                       e.retain(|ref event| {
-                                                               match **event {
-                                                                       OnchainEvent::HTLCUpdate { ref htlc_update } => {
-                                                                               return htlc_update.0 != source
-                                                                       },
-                                                                       _ => true
-                                                               }
-                                                       });
-                                                       e.push(OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)});
-                                               }
-                                               hash_map::Entry::Vacant(entry) => {
-                                                       entry.insert(vec![OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash)}]);
+                                       self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
+                                               if entry.height != height { return true; }
+                                               match entry.event {
+                                                        OnchainEvent::HTLCUpdate { ref htlc_update } => {
+                                                                htlc_update.0 != source
+                                                        },
+                                                        _ => true,
                                                }
-                                       }
+                                       });
+                                       let entry = OnchainEventEntry {
+                                               txid: tx.txid(),
+                                               height,
+                                               event: OnchainEvent::HTLCUpdate { htlc_update: (source, payment_hash) },
+                                       };
+                                       log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), entry.confirmation_threshold());
+                                       self.onchain_events_awaiting_threshold_conf.push(entry);
                                }
                        }
                }
@@ -2420,16 +2653,13 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                        }
                }
                if let Some(spendable_output) = spendable_output {
-                       log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), height + ANTI_REORG_DELAY - 1);
-                       match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                               hash_map::Entry::Occupied(mut entry) => {
-                                       let e = entry.get_mut();
-                                       e.push(OnchainEvent::MaturingOutput { descriptor: spendable_output });
-                               }
-                               hash_map::Entry::Vacant(entry) => {
-                                       entry.insert(vec![OnchainEvent::MaturingOutput { descriptor: spendable_output }]);
-                               }
-                       }
+                       let entry = OnchainEventEntry {
+                               txid: tx.txid(),
+                               height: height,
+                               event: OnchainEvent::MaturingOutput { descriptor: spendable_output.clone() },
+                       };
+                       log_trace!(logger, "Maturing {} until {}", log_spendable!(spendable_output), entry.confirmation_threshold());
+                       self.onchain_events_awaiting_threshold_conf.push(entry);
                }
        }
 }
@@ -2692,34 +2922,30 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
                        }
                }
 
-               let last_block_hash: BlockHash = Readable::read(reader)?;
+               let best_block = BestBlock::new(Readable::read(reader)?, Readable::read(reader)?);
 
                let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
-               let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
+               let mut onchain_events_awaiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
                for _ in 0..waiting_threshold_conf_len {
-                       let height_target = Readable::read(reader)?;
-                       let events_len: u64 = Readable::read(reader)?;
-                       let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
-                       for _ in 0..events_len {
-                               let ev = match <u8 as Readable>::read(reader)? {
-                                       0 => {
-                                               let htlc_source = Readable::read(reader)?;
-                                               let hash = Readable::read(reader)?;
-                                               OnchainEvent::HTLCUpdate {
-                                                       htlc_update: (htlc_source, hash)
-                                               }
-                                       },
-                                       1 => {
-                                               let descriptor = Readable::read(reader)?;
-                                               OnchainEvent::MaturingOutput {
-                                                       descriptor
-                                               }
-                                       },
-                                       _ => return Err(DecodeError::InvalidValue),
-                               };
-                               events.push(ev);
-                       }
-                       onchain_events_waiting_threshold_conf.insert(height_target, events);
+                       let txid = Readable::read(reader)?;
+                       let height = Readable::read(reader)?;
+                       let event = match <u8 as Readable>::read(reader)? {
+                               0 => {
+                                       let htlc_source = Readable::read(reader)?;
+                                       let hash = Readable::read(reader)?;
+                                       OnchainEvent::HTLCUpdate {
+                                               htlc_update: (htlc_source, hash)
+                                       }
+                               },
+                               1 => {
+                                       let descriptor = Readable::read(reader)?;
+                                       OnchainEvent::MaturingOutput {
+                                               descriptor
+                                       }
+                               },
+                               _ => return Err(DecodeError::InvalidValue),
+                       };
+                       onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid, height, event });
                }
 
                let outputs_to_watch_len: u64 = Readable::read(reader)?;
@@ -2743,7 +2969,7 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes());
 
-               Ok((last_block_hash.clone(), ChannelMonitor {
+               Ok((best_block.block_hash(), ChannelMonitor {
                        inner: Mutex::new(ChannelMonitorImpl {
                                latest_update_id,
                                commitment_transaction_number_obscure_factor,
@@ -2780,7 +3006,7 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
                                pending_monitor_events,
                                pending_events,
 
-                               onchain_events_waiting_threshold_conf,
+                               onchain_events_awaiting_threshold_conf,
                                outputs_to_watch,
 
                                onchain_tx_handler,
@@ -2788,7 +3014,8 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
                                lockdown_from_offchain,
                                holder_tx_signed,
 
-                               last_block_hash,
+                               best_block,
+
                                secp_ctx,
                        }),
                }))
@@ -2797,7 +3024,6 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
 
 #[cfg(test)]
 mod tests {
-       use bitcoin::blockdata::constants::genesis_block;
        use bitcoin::blockdata::script::{Script, Builder};
        use bitcoin::blockdata::opcodes;
        use bitcoin::blockdata::transaction::{Transaction, TxIn, TxOut, SigHashType};
@@ -2811,7 +3037,7 @@ mod tests {
        use hex;
        use chain::channelmonitor::ChannelMonitor;
        use chain::transaction::OutPoint;
-       use ln::channelmanager::{PaymentPreimage, PaymentHash};
+       use ln::channelmanager::{BestBlock, PaymentPreimage, PaymentHash};
        use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
        use ln::chan_utils;
        use ln::chan_utils::{HTLCOutputInCommitment, ChannelPublicKeys, ChannelTransactionParameters, HolderCommitmentTransaction, CounterpartyChannelTransactionParameters};
@@ -2907,13 +3133,13 @@ mod tests {
                };
                // Prune with one old state and a holder commitment tx holding a few overlaps with the
                // old state.
-               let last_block_hash = genesis_block(Network::Testnet).block_hash();
+               let best_block = BestBlock::from_genesis(Network::Testnet);
                let monitor = ChannelMonitor::new(Secp256k1::new(), keys,
                                                  &PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()), 0, &Script::new(),
                                                  (OutPoint { txid: Txid::from_slice(&[43; 32]).unwrap(), index: 0 }, Script::new()),
                                                  &channel_parameters,
                                                  Script::new(), 46, 0,
-                                                 HolderCommitmentTransaction::dummy(), last_block_hash);
+                                                 HolderCommitmentTransaction::dummy(), best_block);
 
                monitor.provide_latest_holder_commitment_tx(HolderCommitmentTransaction::dummy(), preimages_to_holder_htlcs!(preimages[0..10])).unwrap();
                let dummy_txid = dummy_tx.txid();
index 6aee5e7e67a62310961d031ee3cccfc23d3d96da..5f987427979cbb9d267a73ea7f3fc0b0dd374378 100644 (file)
@@ -24,7 +24,7 @@ use bitcoin::secp256k1;
 use ln::features::{ChannelFeatures, InitFeatures};
 use ln::msgs;
 use ln::msgs::{DecodeError, OptionalField, DataLossProtect};
-use ln::channelmanager::{PendingHTLCStatus, HTLCSource, HTLCFailReason, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, PaymentPreimage, PaymentHash, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT};
+use ln::channelmanager::{BestBlock, PendingHTLCStatus, HTLCSource, HTLCFailReason, HTLCFailureMsg, PendingHTLCInfo, RAACommitmentOrder, PaymentPreimage, PaymentHash, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, MAX_LOCAL_BREAKDOWN_TIMEOUT};
 use ln::chan_utils::{CounterpartyCommitmentSecrets, TxCreationKeys, HTLCOutputInCommitment, HTLC_SUCCESS_TX_WEIGHT, HTLC_TIMEOUT_TX_WEIGHT, make_funding_redeemscript, ChannelPublicKeys, CommitmentTransaction, HolderCommitmentTransaction, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, MAX_HTLCS, get_commitment_transaction_number_obscure_factor};
 use ln::chan_utils;
 use chain::chaininterface::{FeeEstimator,ConfirmationTarget};
@@ -1531,7 +1531,7 @@ impl<Signer: Sign> Channel<Signer> {
                &self.get_counterparty_pubkeys().funding_pubkey
        }
 
-       pub fn funding_created<L: Deref>(&mut self, msg: &msgs::FundingCreated, last_block_hash: BlockHash, logger: &L) -> Result<(msgs::FundingSigned, ChannelMonitor<Signer>), ChannelError> where L::Target: Logger {
+       pub fn funding_created<L: Deref>(&mut self, msg: &msgs::FundingCreated, best_block: BestBlock, logger: &L) -> Result<(msgs::FundingSigned, ChannelMonitor<Signer>), ChannelError> where L::Target: Logger {
                if self.is_outbound() {
                        return Err(ChannelError::Close("Received funding_created for an outbound channel?".to_owned()));
                }
@@ -1585,7 +1585,7 @@ impl<Signer: Sign> Channel<Signer> {
                                                          &self.channel_transaction_parameters,
                                                          funding_redeemscript.clone(), self.channel_value_satoshis,
                                                          obscure_factor,
-                                                         holder_commitment_tx, last_block_hash);
+                                                         holder_commitment_tx, best_block);
 
                channel_monitor.provide_latest_counterparty_commitment_tx(counterparty_initial_commitment_txid, Vec::new(), self.cur_counterparty_commitment_transaction_number, self.counterparty_cur_commitment_point.unwrap(), logger);
 
@@ -1602,7 +1602,7 @@ impl<Signer: Sign> Channel<Signer> {
 
        /// Handles a funding_signed message from the remote end.
        /// If this call is successful, broadcast the funding transaction (and not before!)
-       pub fn funding_signed<L: Deref>(&mut self, msg: &msgs::FundingSigned, last_block_hash: BlockHash, logger: &L) -> Result<(ChannelMonitor<Signer>, Transaction), ChannelError> where L::Target: Logger {
+       pub fn funding_signed<L: Deref>(&mut self, msg: &msgs::FundingSigned, best_block: BestBlock, logger: &L) -> Result<(ChannelMonitor<Signer>, Transaction), ChannelError> where L::Target: Logger {
                if !self.is_outbound() {
                        return Err(ChannelError::Close("Received funding_signed for an inbound channel?".to_owned()));
                }
@@ -1655,7 +1655,7 @@ impl<Signer: Sign> Channel<Signer> {
                                                          &self.channel_transaction_parameters,
                                                          funding_redeemscript.clone(), self.channel_value_satoshis,
                                                          obscure_factor,
-                                                         holder_commitment_tx, last_block_hash);
+                                                         holder_commitment_tx, best_block);
 
                channel_monitor.provide_latest_counterparty_commitment_tx(counterparty_initial_bitcoin_tx.txid, Vec::new(), self.cur_counterparty_commitment_transaction_number, self.counterparty_cur_commitment_point.unwrap(), logger);
 
@@ -4825,7 +4825,7 @@ mod tests {
        use bitcoin::network::constants::Network;
        use bitcoin::hashes::hex::FromHex;
        use hex;
-       use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
+       use ln::channelmanager::{BestBlock, HTLCSource, PaymentPreimage, PaymentHash};
        use ln::channel::{Channel,InboundHTLCOutput,OutboundHTLCOutput,InboundHTLCState,OutboundHTLCState,HTLCOutputInCommitment,HTLCCandidate,HTLCInitiator,TxCreationKeys};
        use ln::channel::MAX_FUNDING_SATOSHIS;
        use ln::features::InitFeatures;
@@ -5038,8 +5038,8 @@ mod tests {
                let secp_ctx = Secp256k1::new();
                let seed = [42; 32];
                let network = Network::Testnet;
-               let chain_hash = genesis_block(network).header.block_hash();
-               let last_block_hash = chain_hash;
+               let best_block = BestBlock::from_genesis(network);
+               let chain_hash = best_block.block_hash();
                let keys_provider = test_utils::TestKeysInterface::new(&seed, network);
 
                // Go through the flow of opening a channel between two nodes.
@@ -5065,10 +5065,10 @@ mod tests {
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
                let funding_created_msg = node_a_chan.get_outbound_funding_created(tx.clone(), funding_outpoint, &&logger).unwrap();
-               let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, last_block_hash, &&logger).unwrap();
+               let (funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&logger).unwrap();
 
                // Node B --> Node A: funding signed
-               let _ = node_a_chan.funding_signed(&funding_signed_msg, last_block_hash, &&logger);
+               let _ = node_a_chan.funding_signed(&funding_signed_msg, best_block, &&logger);
 
                // Now disconnect the two nodes and check that the commitment point in
                // Node B's channel_reestablish message is sane.
index f9e00bc5d035a3f1742f69db9c55e9ee15d84f7c..39426b3fa7b2656b7667c29b94d94c50d6b4cc96 100644 (file)
@@ -352,9 +352,6 @@ struct PeerState {
        latest_features: InitFeatures,
 }
 
-#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
-const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assume they're the same) for ChannelManager::latest_block_height";
-
 /// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g.
 /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
@@ -424,10 +421,9 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
        tx_broadcaster: T,
 
        #[cfg(test)]
-       pub(super) latest_block_height: AtomicUsize,
+       pub(super) best_block: RwLock<BestBlock>,
        #[cfg(not(test))]
-       latest_block_height: AtomicUsize,
-       last_block_hash: RwLock<BlockHash>,
+       best_block: RwLock<BestBlock>,
        secp_ctx: Secp256k1<secp256k1::All>,
 
        #[cfg(any(test, feature = "_test_utils"))]
@@ -475,13 +471,38 @@ pub struct ChainParameters {
        /// The network for determining the `chain_hash` in Lightning messages.
        pub network: Network,
 
-       /// The hash of the latest block successfully connected.
-       pub latest_hash: BlockHash,
-
-       /// The height of the latest block successfully connected.
+       /// The hash and height of the latest block successfully connected.
        ///
        /// Used to track on-chain channel funding outputs and send payments with reliable timelocks.
-       pub latest_height: usize,
+       pub best_block: BestBlock,
+}
+
+/// The best known block as identified by its hash and height.
+#[derive(Clone, Copy)]
+pub struct BestBlock {
+       block_hash: BlockHash,
+       height: u32,
+}
+
+impl BestBlock {
+       /// Returns the best block from the genesis of the given network.
+       pub fn from_genesis(network: Network) -> Self {
+               BestBlock {
+                       block_hash: genesis_block(network).header.block_hash(),
+                       height: 0,
+               }
+       }
+
+       /// Returns the best block as identified by the given block hash and height.
+       pub fn new(block_hash: BlockHash, height: u32) -> Self {
+               BestBlock { block_hash, height }
+       }
+
+       /// Returns the best block hash.
+       pub fn block_hash(&self) -> BlockHash { self.block_hash }
+
+       /// Returns the best block height.
+       pub fn height(&self) -> u32 { self.height }
 }
 
 /// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is
@@ -822,8 +843,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        chain_monitor,
                        tx_broadcaster,
 
-                       latest_block_height: AtomicUsize::new(params.latest_height),
-                       last_block_hash: RwLock::new(params.latest_hash),
+                       best_block: RwLock::new(params.best_block),
 
                        channel_state: Mutex::new(ChannelHolder{
                                by_id: HashMap::new(),
@@ -1176,7 +1196,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                // HTLC_FAIL_BACK_BUFFER blocks to go.
                                // Also, ensure that, in the case of an unknown payment hash, our payment logic has enough time to fail the HTLC backward
                                // before our onchain logic triggers a channel closure (see HTLC_FAIL_BACK_BUFFER rational).
-                               if (msg.cltv_expiry as u64) <= self.latest_block_height.load(Ordering::Acquire) as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 {
+                               if (msg.cltv_expiry as u64) <= self.best_block.read().unwrap().height() as u64 + HTLC_FAIL_BACK_BUFFER as u64 + 1 {
                                        return_err!("The final CLTV expiry is too soon to handle", 17, &[0;0]);
                                }
                                // final_incorrect_htlc_amount
@@ -1299,7 +1319,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if (msg.cltv_expiry as u64) < (*outgoing_cltv_value) as u64 + chan.get_cltv_expiry_delta() as u64 { // incorrect_cltv_expiry
                                                break Some(("Forwarding node has tampered with the intended HTLC values or origin node has an obsolete cltv_expiry_delta", 0x1000 | 13, Some(self.get_channel_update(chan).unwrap())));
                                        }
-                                       let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+                                       let cur_height = self.best_block.read().unwrap().height() + 1;
                                        // Theoretically, channel counterparty shouldn't send us a HTLC expiring now, but we want to be robust wrt to counterparty
                                        // packet sanitization (see HTLC_FAIL_BACK_BUFFER rational)
                                        if msg.cltv_expiry <= cur_height + HTLC_FAIL_BACK_BUFFER as u32 { // expiry_too_soon
@@ -1516,7 +1536,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        return Err(PaymentSendFailure::PathParameterError(path_errs));
                }
 
-               let cur_height = self.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+               let cur_height = self.best_block.read().unwrap().height() + 1;
                let mut results = Vec::new();
                for path in route.paths.iter() {
                        results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height));
@@ -1910,10 +1930,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                                for htlc in htlcs.iter() {
                                                                                        let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
                                                                                        htlc_msat_height_data.extend_from_slice(
-                                                                                               &byte_utils::be32_to_array(
-                                                                                                       self.latest_block_height.load(Ordering::Acquire)
-                                                                                                               as u32,
-                                                                                               ),
+                                                                                               &byte_utils::be32_to_array(self.best_block.read().unwrap().height()),
                                                                                        );
                                                                                        failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData {
                                                                                                        short_channel_id: htlc.prev_hop.short_channel_id,
@@ -2033,8 +2050,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                if channel_state.is_none() { channel_state = Some(self.channel_state.lock().unwrap()); }
                                let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
                                htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(
-                                       self.latest_block_height.load(Ordering::Acquire) as u32,
-                               ));
+                                               self.best_block.read().unwrap().height()));
                                self.fail_htlc_backwards_internal(channel_state.take().unwrap(),
                                                HTLCSource::PreviousHopData(htlc.prev_hop), payment_hash,
                                                HTLCFailReason::Reason { failure_code: 0x4000 | 15, data: htlc_msat_height_data });
@@ -2248,8 +2264,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                if (is_mpp && !valid_mpp) || (!is_mpp && (htlc.value < expected_amount || htlc.value > expected_amount * 2)) {
                                        let mut htlc_msat_height_data = byte_utils::be64_to_array(htlc.value).to_vec();
                                        htlc_msat_height_data.extend_from_slice(&byte_utils::be32_to_array(
-                                               self.latest_block_height.load(Ordering::Acquire) as u32,
-                                       ));
+                                                       self.best_block.read().unwrap().height()));
                                        self.fail_htlc_backwards_internal(channel_state.take().unwrap(),
                                                                         HTLCSource::PreviousHopData(htlc.prev_hop), &payment_hash,
                                                                         HTLCFailReason::Reason { failure_code: 0x4000|15, data: htlc_msat_height_data });
@@ -2534,7 +2549,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
        fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
                let ((funding_msg, monitor), mut chan) = {
-                       let last_block_hash = *self.last_block_hash.read().unwrap();
+                       let best_block = *self.best_block.read().unwrap();
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_lock;
                        match channel_state.by_id.entry(msg.temporary_channel_id.clone()) {
@@ -2542,7 +2557,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.temporary_channel_id));
                                        }
-                                       (try_chan_entry!(self, chan.get_mut().funding_created(msg, last_block_hash, &self.logger), channel_state, chan), chan.remove())
+                                       (try_chan_entry!(self, chan.get_mut().funding_created(msg, best_block, &self.logger), channel_state, chan), chan.remove())
                                },
                                hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.temporary_channel_id))
                        }
@@ -2591,7 +2606,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
        fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
                let funding_tx = {
-                       let last_block_hash = *self.last_block_hash.read().unwrap();
+                       let best_block = *self.best_block.read().unwrap();
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_lock;
                        match channel_state.by_id.entry(msg.channel_id) {
@@ -2599,7 +2614,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
-                                       let (monitor, funding_tx) = match chan.get_mut().funding_signed(&msg, last_block_hash, &self.logger) {
+                                       let (monitor, funding_tx) = match chan.get_mut().funding_signed(&msg, best_block, &self.logger) {
                                                Ok(update) => update,
                                                Err(e) => try_chan_entry!(self, Err(e), channel_state, chan),
                                        };
@@ -3339,24 +3354,30 @@ where
        L::Target: Logger,
 {
        fn block_connected(&self, block: &Block, height: u32) {
-               assert_eq!(*self.last_block_hash.read().unwrap(), block.header.prev_blockhash,
-                       "Blocks must be connected in chain-order - the connected header must build on the last connected header");
-               assert_eq!(self.latest_block_height.load(Ordering::Acquire) as u64, height as u64 - 1,
-                       "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
+               {
+                       let best_block = self.best_block.read().unwrap();
+                       assert_eq!(best_block.block_hash(), block.header.prev_blockhash,
+                               "Blocks must be connected in chain-order - the connected header must build on the last connected header");
+                       assert_eq!(best_block.height(), height - 1,
+                               "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
+               }
+
                let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
                self.transactions_confirmed(&block.header, height, &txdata);
                self.update_best_block(&block.header, height);
        }
 
        fn block_disconnected(&self, header: &BlockHeader, height: u32) {
-               assert_eq!(*self.last_block_hash.read().unwrap(), header.block_hash(),
-                       "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
-
                let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-               let new_height = self.latest_block_height.fetch_sub(1, Ordering::AcqRel) as u32 - 1;
-               assert_eq!(new_height, height - 1,
-                       "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
-               *self.last_block_hash.write().unwrap() = header.prev_blockhash;
+               let new_height = height - 1;
+               {
+                       let mut best_block = self.best_block.write().unwrap();
+                       assert_eq!(best_block.block_hash(), header.block_hash(),
+                               "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
+                       assert_eq!(best_block.height(), height,
+                               "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
+                       *best_block = BestBlock::new(header.prev_blockhash, new_height)
+               }
 
                self.do_chain_event(Some(new_height), |channel| channel.update_best_block(new_height, header.time));
        }
@@ -3513,8 +3534,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
 
-               self.latest_block_height.store(height as usize, Ordering::Release);
-               *self.last_block_hash.write().unwrap() = block_hash;
+               *self.best_block.write().unwrap() = BestBlock::new(block_hash, height);
 
                self.do_chain_event(Some(height), |channel| channel.update_best_block(height, header.time));
 
@@ -4147,8 +4167,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                writer.write_all(&[MIN_SERIALIZATION_VERSION; 1])?;
 
                self.genesis_hash.write(writer)?;
-               (self.latest_block_height.load(Ordering::Acquire) as u32).write(writer)?;
-               self.last_block_hash.read().unwrap().write(writer)?;
+               {
+                       let best_block = self.best_block.read().unwrap();
+                       best_block.height().write(writer)?;
+                       best_block.block_hash().write(writer)?;
+               }
 
                let channel_state = self.channel_state.lock().unwrap();
                let mut unfunded_channels = 0;
@@ -4340,8 +4363,8 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                }
 
                let genesis_hash: BlockHash = Readable::read(reader)?;
-               let latest_block_height: u32 = Readable::read(reader)?;
-               let last_block_hash: BlockHash = Readable::read(reader)?;
+               let best_block_height: u32 = Readable::read(reader)?;
+               let best_block_hash: BlockHash = Readable::read(reader)?;
 
                let mut failed_htlcs = Vec::new();
 
@@ -4449,8 +4472,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                        chain_monitor: args.chain_monitor,
                        tx_broadcaster: args.tx_broadcaster,
 
-                       latest_block_height: AtomicUsize::new(latest_block_height as usize),
-                       last_block_hash: RwLock::new(last_block_hash),
+                       best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)),
 
                        channel_state: Mutex::new(ChannelHolder {
                                by_id,
@@ -4484,7 +4506,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                //TODO: Broadcast channel update for closed channels, but only after we've made a
                //connection or two.
 
-               Ok((last_block_hash.clone(), channel_manager))
+               Ok((best_block_hash.clone(), channel_manager))
        }
 }
 
@@ -4545,7 +4567,7 @@ pub mod bench {
        use chain::chainmonitor::ChainMonitor;
        use chain::channelmonitor::Persist;
        use chain::keysinterface::{KeysManager, InMemorySigner};
-       use ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage};
+       use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage};
        use ln::features::InitFeatures;
        use ln::functional_test_utils::*;
        use ln::msgs::ChannelMessageHandler;
@@ -4597,8 +4619,7 @@ pub mod bench {
                let keys_manager_a = KeysManager::new(&seed_a, 42, 42);
                let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &logger_a, &keys_manager_a, config.clone(), ChainParameters {
                        network,
-                       latest_hash: genesis_hash,
-                       latest_height: 0,
+                       best_block: BestBlock::from_genesis(network),
                });
                let node_a_holder = NodeHolder { node: &node_a };
 
@@ -4608,8 +4629,7 @@ pub mod bench {
                let keys_manager_b = KeysManager::new(&seed_b, 42, 42);
                let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &logger_b, &keys_manager_b, config.clone(), ChainParameters {
                        network,
-                       latest_hash: genesis_hash,
-                       latest_height: 0,
+                       best_block: BestBlock::from_genesis(network),
                });
                let node_b_holder = NodeHolder { node: &node_b };
 
index e3f5003aaf67f58ccbf91e9b112dbf28525b387e..dee3c6ea3957b4ccf5b84c907df6bb2888bee40e 100644 (file)
@@ -13,7 +13,7 @@
 use chain::{Listen, Watch};
 use chain::channelmonitor::ChannelMonitor;
 use chain::transaction::OutPoint;
-use ln::channelmanager::{ChainParameters, ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
+use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
 use routing::router::{Route, get_route};
 use routing::network_graph::{NetGraphMsgHandler, NetworkGraph};
 use ln::features::InitFeatures;
@@ -122,21 +122,25 @@ pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block)
        do_connect_block(node, block, false);
 }
 
-fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, skip_manager: bool) {
-       let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, skip_intermediaries: bool) {
        let height = node.best_block_info().1 + 1;
-       node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height);
-       if !skip_manager {
+       if !skip_intermediaries {
+               let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
                match *node.connect_style.borrow() {
                        ConnectStyle::BestBlockFirst|ConnectStyle::BestBlockFirstSkippingBlocks => {
+                               node.chain_monitor.chain_monitor.update_best_block(&block.header, height);
+                               node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height);
                                node.node.update_best_block(&block.header, height);
-                               node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::<Vec<_>>());
+                               node.node.transactions_confirmed(&block.header, height, &txdata);
                        },
                        ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks => {
-                               node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::<Vec<_>>());
+                               node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height);
+                               node.chain_monitor.chain_monitor.update_best_block(&block.header, height);
+                               node.node.transactions_confirmed(&block.header, height, &txdata);
                                node.node.update_best_block(&block.header, height);
                        },
                        ConnectStyle::FullBlockViaListen => {
+                               node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height);
                                Listen::block_connected(node.node, &block, height);
                        }
                }
@@ -151,17 +155,19 @@ pub fn disconnect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, count: u32)
                assert!(orig_header.1 > 0); // Cannot disconnect genesis
                let prev_header = node.blocks.borrow().last().unwrap().clone();
 
-               node.chain_monitor.chain_monitor.block_disconnected(&orig_header.0, orig_header.1);
                match *node.connect_style.borrow() {
                        ConnectStyle::FullBlockViaListen => {
+                               node.chain_monitor.chain_monitor.block_disconnected(&orig_header.0, orig_header.1);
                                Listen::block_disconnected(node.node, &orig_header.0, orig_header.1);
                        },
                        ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => {
                                if i == count - 1 {
+                                       node.chain_monitor.chain_monitor.update_best_block(&prev_header.0, prev_header.1);
                                        node.node.update_best_block(&prev_header.0, prev_header.1);
                                }
                        },
                        _ => {
+                               node.chain_monitor.chain_monitor.update_best_block(&prev_header.0, prev_header.1);
                                node.node.update_best_block(&prev_header.0, prev_header.1);
                        },
                }
@@ -1279,8 +1285,7 @@ pub fn create_node_chanmgrs<'a, 'b>(node_count: usize, cfgs: &'a Vec<NodeCfg<'b>
                let network = Network::Testnet;
                let params = ChainParameters {
                        network,
-                       latest_hash: genesis_block(network).header.block_hash(),
-                       latest_height: 0,
+                       best_block: BestBlock::from_genesis(network),
                };
                let node = ChannelManager::new(cfgs[i].fee_estimator, &cfgs[i].chain_monitor, cfgs[i].tx_broadcaster, cfgs[i].logger, cfgs[i].keys_manager, if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }, params);
                chanmgrs.push(node);
index 2ab48831b2cb8256bdcf3247ccc9edab03573fc3..9a0c06e776cf4db266050b7986d08dc7fab67d2b 100644 (file)
@@ -51,7 +51,6 @@ use regex;
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::default::Default;
 use std::sync::Mutex;
-use std::sync::atomic::Ordering;
 
 use ln::functional_test_utils::*;
 use ln::chan_utils::CommitmentTransaction;
@@ -1585,7 +1584,7 @@ fn test_fee_spike_violation_fails_htlc() {
        let secp_ctx = Secp256k1::new();
        let session_priv = SecretKey::from_slice(&[42; 32]).expect("RNG is bad!");
 
-       let cur_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+       let cur_height = nodes[1].node.best_block.read().unwrap().height() + 1;
 
        let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route.paths[0], &session_priv).unwrap();
        let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route.paths[0], 3460001, &None, cur_height).unwrap();
@@ -1756,7 +1755,7 @@ fn test_chan_reserve_violation_inbound_htlc_outbound_channel() {
        // Need to manually create the update_add_htlc message to go around the channel reserve check in send_htlc()
        let secp_ctx = Secp256k1::new();
        let session_priv = SecretKey::from_slice(&[42; 32]).unwrap();
-       let cur_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+       let cur_height = nodes[1].node.best_block.read().unwrap().height() + 1;
        let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route.paths[0], &session_priv).unwrap();
        let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route.paths[0], 1000, &None, cur_height).unwrap();
        let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &payment_hash);
@@ -1879,7 +1878,7 @@ fn test_chan_reserve_violation_inbound_htlc_inbound_chan() {
        // Need to manually create the update_add_htlc message to go around the channel reserve check in send_htlc()
        let secp_ctx = Secp256k1::new();
        let session_priv = SecretKey::from_slice(&[42; 32]).unwrap();
-       let cur_height = nodes[0].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+       let cur_height = nodes[0].node.best_block.read().unwrap().height() + 1;
        let onion_keys = onion_utils::construct_onion_keys(&secp_ctx, &route_2.paths[0], &session_priv).unwrap();
        let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route_2.paths[0], recv_value_2, &None, cur_height).unwrap();
        let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &our_payment_hash_1);
@@ -2941,8 +2940,7 @@ fn test_htlc_on_chain_success() {
        check_tx_local_broadcast!(nodes[0], true, commitment_tx[0], chan_1.3);
 }
 
-#[test]
-fn test_htlc_on_chain_timeout() {
+fn do_test_htlc_on_chain_timeout(connect_style: ConnectStyle) {
        // Test that in case of a unilateral close onchain, we detect the state of output and
        // timeout the HTLC backward accordingly. So here we test that ChannelManager is
        // broadcasting the right event to other nodes in payment path.
@@ -2954,7 +2952,10 @@ fn test_htlc_on_chain_timeout() {
        let chanmon_cfgs = create_chanmon_cfgs(3);
        let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
-       let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+       let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+       *nodes[0].connect_style.borrow_mut() = connect_style;
+       *nodes[1].connect_style.borrow_mut() = connect_style;
+       *nodes[2].connect_style.borrow_mut() = connect_style;
 
        // Create some intial channels
        let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
@@ -3068,6 +3069,13 @@ fn test_htlc_on_chain_timeout() {
        assert_eq!(node_txn[2].clone().input[0].witness.last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT);
 }
 
+#[test]
+fn test_htlc_on_chain_timeout() {
+       do_test_htlc_on_chain_timeout(ConnectStyle::BestBlockFirstSkippingBlocks);
+       do_test_htlc_on_chain_timeout(ConnectStyle::TransactionsFirstSkippingBlocks);
+       do_test_htlc_on_chain_timeout(ConnectStyle::FullBlockViaListen);
+}
+
 #[test]
 fn test_simple_commitment_revoked_fail_backward() {
        // Test that in case of a revoked commitment tx, we detect the resolution of output by justice tx
@@ -3400,7 +3408,7 @@ fn fail_backward_pending_htlc_upon_channel_failure() {
 
                let secp_ctx = Secp256k1::new();
                let session_priv = SecretKey::from_slice(&[42; 32]).unwrap();
-               let current_height = nodes[1].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+               let current_height = nodes[1].node.best_block.read().unwrap().height() + 1;
                let net_graph_msg_handler = &nodes[1].net_graph_msg_handler;
                let route = get_route(&nodes[1].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[0].node.get_our_node_id(), None, None, &Vec::new(), 50_000, TEST_FINAL_CLTV, &logger).unwrap();
                let (onion_payloads, _amount_msat, cltv_expiry) = onion_utils::build_onion_payloads(&route.paths[0], 50_000, &None, current_height).unwrap();
@@ -6504,7 +6512,7 @@ fn test_update_add_htlc_bolt2_receiver_check_max_htlc_limit() {
        let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
        let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, None, &[], 3999999, TEST_FINAL_CLTV, &logger).unwrap();
 
-       let cur_height = nodes[0].node.latest_block_height.load(Ordering::Acquire) as u32 + 1;
+       let cur_height = nodes[0].node.best_block.read().unwrap().height() + 1;
        let onion_keys = onion_utils::construct_onion_keys(&Secp256k1::signing_only(), &route.paths[0], &session_priv).unwrap();
        let (onion_payloads, _htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(&route.paths[0], 3999999, &None, cur_height).unwrap();
        let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, [0; 32], &our_payment_hash);
index f3d8b2e9489d5078dc79dbc42be835fa54fe56f5..8e531b501c3c0ede233154596e08d340a39aef35 100644 (file)
@@ -32,16 +32,37 @@ use util::logger::Logger;
 use util::ser::{Readable, ReadableArgs, Writer, Writeable, VecWriter};
 use util::byte_utils;
 
-use std::collections::{HashMap, hash_map};
+use std::collections::HashMap;
 use std::cmp;
 use std::ops::Deref;
 use std::mem::replace;
 
 const MAX_ALLOC_SIZE: usize = 64*1024;
 
+/// An entry for an [`OnchainEvent`], stating the block height when the event was observed and the
+/// transaction causing it.
+///
+/// Used to determine when the on-chain event can be considered safe from a chain reorganization.
+#[derive(PartialEq)]
+struct OnchainEventEntry {
+       txid: Txid,
+       height: u32,
+       event: OnchainEvent,
+}
+
+impl OnchainEventEntry {
+       fn confirmation_threshold(&self) -> u32 {
+               self.height + ANTI_REORG_DELAY - 1
+       }
+
+       fn has_reached_confirmation_threshold(&self, height: u32) -> bool {
+               height >= self.confirmation_threshold()
+       }
+}
+
 /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it
 /// once they mature to enough confirmations (ANTI_REORG_DELAY)
-#[derive(Clone, PartialEq)]
+#[derive(PartialEq)]
 enum OnchainEvent {
        /// Outpoint under claim process by our own tx, once this one get enough confirmations, we remove it from
        /// bump-txn candidate buffer.
@@ -280,7 +301,7 @@ pub struct OnchainTxHandler<ChannelSigner: Sign> {
        #[cfg(not(test))]
        claimable_outpoints: HashMap<BitcoinOutPoint, (Txid, u32)>,
 
-       onchain_events_waiting_threshold_conf: HashMap<u32, Vec<OnchainEvent>>,
+       onchain_events_awaiting_threshold_conf: Vec<OnchainEventEntry>,
 
        latest_height: u32,
 
@@ -317,21 +338,19 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                        claim_and_height.1.write(writer)?;
                }
 
-               writer.write_all(&byte_utils::be64_to_array(self.onchain_events_waiting_threshold_conf.len() as u64))?;
-               for (ref target, ref events) in self.onchain_events_waiting_threshold_conf.iter() {
-                       writer.write_all(&byte_utils::be32_to_array(**target))?;
-                       writer.write_all(&byte_utils::be64_to_array(events.len() as u64))?;
-                       for ev in events.iter() {
-                               match *ev {
-                                       OnchainEvent::Claim { ref claim_request } => {
-                                               writer.write_all(&[0; 1])?;
-                                               claim_request.write(writer)?;
-                                       },
-                                       OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => {
-                                               writer.write_all(&[1; 1])?;
-                                               outpoint.write(writer)?;
-                                               input_material.write(writer)?;
-                                       }
+               writer.write_all(&byte_utils::be64_to_array(self.onchain_events_awaiting_threshold_conf.len() as u64))?;
+               for ref entry in self.onchain_events_awaiting_threshold_conf.iter() {
+                       entry.txid.write(writer)?;
+                       writer.write_all(&byte_utils::be32_to_array(entry.height))?;
+                       match entry.event {
+                               OnchainEvent::Claim { ref claim_request } => {
+                                       writer.write_all(&[0; 1])?;
+                                       claim_request.write(writer)?;
+                               },
+                               OnchainEvent::ContentiousOutpoint { ref outpoint, ref input_material } => {
+                                       writer.write_all(&[1; 1])?;
+                                       outpoint.write(writer)?;
+                                       input_material.write(writer)?;
                                }
                        }
                }
@@ -377,32 +396,28 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K> for OnchainTxHandler<K::Signer> {
                        claimable_outpoints.insert(outpoint, (ancestor_claim_txid, height));
                }
                let waiting_threshold_conf_len: u64 = Readable::read(reader)?;
-               let mut onchain_events_waiting_threshold_conf = HashMap::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
+               let mut onchain_events_awaiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128));
                for _ in 0..waiting_threshold_conf_len {
-                       let height_target = Readable::read(reader)?;
-                       let events_len: u64 = Readable::read(reader)?;
-                       let mut events = Vec::with_capacity(cmp::min(events_len as usize, MAX_ALLOC_SIZE / 128));
-                       for _ in 0..events_len {
-                               let ev = match <u8 as Readable>::read(reader)? {
-                                       0 => {
-                                               let claim_request = Readable::read(reader)?;
-                                               OnchainEvent::Claim {
-                                                       claim_request
-                                               }
-                                       },
-                                       1 => {
-                                               let outpoint = Readable::read(reader)?;
-                                               let input_material = Readable::read(reader)?;
-                                               OnchainEvent::ContentiousOutpoint {
-                                                       outpoint,
-                                                       input_material
-                                               }
+                       let txid = Readable::read(reader)?;
+                       let height = Readable::read(reader)?;
+                       let event = match <u8 as Readable>::read(reader)? {
+                               0 => {
+                                       let claim_request = Readable::read(reader)?;
+                                       OnchainEvent::Claim {
+                                               claim_request
                                        }
-                                       _ => return Err(DecodeError::InvalidValue),
-                               };
-                               events.push(ev);
-                       }
-                       onchain_events_waiting_threshold_conf.insert(height_target, events);
+                               },
+                               1 => {
+                                       let outpoint = Readable::read(reader)?;
+                                       let input_material = Readable::read(reader)?;
+                                       OnchainEvent::ContentiousOutpoint {
+                                               outpoint,
+                                               input_material
+                                       }
+                               }
+                               _ => return Err(DecodeError::InvalidValue),
+                       };
+                       onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { txid, height, event });
                }
                let latest_height = Readable::read(reader)?;
 
@@ -419,7 +434,7 @@ impl<'a, K: KeysInterface> ReadableArgs<&'a K> for OnchainTxHandler<K::Signer> {
                        channel_transaction_parameters: channel_parameters,
                        claimable_outpoints,
                        pending_claim_requests,
-                       onchain_events_waiting_threshold_conf,
+                       onchain_events_awaiting_threshold_conf,
                        latest_height,
                        secp_ctx,
                })
@@ -438,7 +453,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                        channel_transaction_parameters: channel_parameters,
                        pending_claim_requests: HashMap::new(),
                        claimable_outpoints: HashMap::new(),
-                       onchain_events_waiting_threshold_conf: HashMap::new(),
+                       onchain_events_awaiting_threshold_conf: Vec::new(),
                        latest_height: 0,
 
                        secp_ctx,
@@ -756,16 +771,13 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
 
                                                macro_rules! clean_claim_request_after_safety_delay {
                                                        () => {
-                                                               let new_event = OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() };
-                                                               match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                                                       hash_map::Entry::Occupied(mut entry) => {
-                                                                               if !entry.get().contains(&new_event) {
-                                                                                       entry.get_mut().push(new_event);
-                                                                               }
-                                                                       },
-                                                                       hash_map::Entry::Vacant(entry) => {
-                                                                               entry.insert(vec![new_event]);
-                                                                       }
+                                                               let entry = OnchainEventEntry {
+                                                                       txid: tx.txid(),
+                                                                       height,
+                                                                       event: OnchainEvent::Claim { claim_request: first_claim_txid_height.0.clone() }
+                                                               };
+                                                               if !self.onchain_events_awaiting_threshold_conf.contains(&entry) {
+                                                                       self.onchain_events_awaiting_threshold_conf.push(entry);
                                                                }
                                                        }
                                                }
@@ -799,24 +811,23 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                                }
                        }
                        for (outpoint, input_material) in claimed_outputs_material.drain(..) {
-                               let new_event = OnchainEvent::ContentiousOutpoint { outpoint, input_material };
-                               match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
-                                       hash_map::Entry::Occupied(mut entry) => {
-                                               if !entry.get().contains(&new_event) {
-                                                       entry.get_mut().push(new_event);
-                                               }
-                                       },
-                                       hash_map::Entry::Vacant(entry) => {
-                                               entry.insert(vec![new_event]);
-                                       }
+                               let entry = OnchainEventEntry {
+                                       txid: tx.txid(),
+                                       height,
+                                       event: OnchainEvent::ContentiousOutpoint { outpoint, input_material },
+                               };
+                               if !self.onchain_events_awaiting_threshold_conf.contains(&entry) {
+                                       self.onchain_events_awaiting_threshold_conf.push(entry);
                                }
                        }
                }
 
                // After security delay, either our claim tx got enough confs or outpoint is definetely out of reach
-               if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&height) {
-                       for ev in events {
-                               match ev {
+               let onchain_events_awaiting_threshold_conf =
+                       self.onchain_events_awaiting_threshold_conf.drain(..).collect::<Vec<_>>();
+               for entry in onchain_events_awaiting_threshold_conf {
+                       if entry.has_reached_confirmation_threshold(height) {
+                               match entry.event {
                                        OnchainEvent::Claim { claim_request } => {
                                                // We may remove a whole set of claim outpoints here, as these one may have
                                                // been aggregated in a single tx and claimed so atomically
@@ -830,13 +841,15 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                                                self.claimable_outpoints.remove(&outpoint);
                                        }
                                }
+                       } else {
+                               self.onchain_events_awaiting_threshold_conf.push(entry);
                        }
                }
 
                // Check if any pending claim request must be rescheduled
                for (first_claim_txid, ref claim_data) in self.pending_claim_requests.iter() {
-                       if let Some(h) = claim_data.height_timer {
-                               if h == height {
+                       if let Some(height_timer) = claim_data.height_timer {
+                               if height >= height_timer {
                                        bump_candidates.insert(*first_claim_txid, (*claim_data).clone());
                                }
                        }
@@ -856,17 +869,43 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                }
        }
 
+       pub(crate) fn transaction_unconfirmed<B: Deref, F: Deref, L: Deref>(
+               &mut self,
+               txid: &Txid,
+               broadcaster: B,
+               fee_estimator: F,
+               logger: L,
+       ) where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               let mut height = None;
+               for entry in self.onchain_events_awaiting_threshold_conf.iter() {
+                       if entry.txid == *txid {
+                               height = Some(entry.height);
+                               break;
+                       }
+               }
+
+               if let Some(height) = height {
+                       self.block_disconnected(height, broadcaster, fee_estimator, logger);
+               }
+       }
+
        pub(crate) fn block_disconnected<B: Deref, F: Deref, L: Deref>(&mut self, height: u32, broadcaster: B, fee_estimator: F, logger: L)
                where B::Target: BroadcasterInterface,
                      F::Target: FeeEstimator,
                                        L::Target: Logger,
        {
                let mut bump_candidates = HashMap::new();
-               if let Some(events) = self.onchain_events_waiting_threshold_conf.remove(&(height + ANTI_REORG_DELAY - 1)) {
-                       //- our claim tx on a commitment tx output
-                       //- resurect outpoint back in its claimable set and regenerate tx
-                       for ev in events {
-                               match ev {
+               let onchain_events_awaiting_threshold_conf =
+                       self.onchain_events_awaiting_threshold_conf.drain(..).collect::<Vec<_>>();
+               for entry in onchain_events_awaiting_threshold_conf {
+                       if entry.height >= height {
+                               //- our claim tx on a commitment tx output
+                               //- resurect outpoint back in its claimable set and regenerate tx
+                               match entry.event {
                                        OnchainEvent::ContentiousOutpoint { outpoint, input_material } => {
                                                if let Some(ancestor_claimable_txid) = self.claimable_outpoints.get(&outpoint) {
                                                        if let Some(claim_material) = self.pending_claim_requests.get_mut(&ancestor_claimable_txid.0) {
@@ -879,6 +918,8 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                                        },
                                        _ => {},
                                }
+                       } else {
+                               self.onchain_events_awaiting_threshold_conf.push(entry);
                        }
                }
                for (_, claim_material) in bump_candidates.iter_mut() {
@@ -895,7 +936,7 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                // right now if one of the outpoint get disconnected, just erase whole pending claim request.
                let mut remove_request = Vec::new();
                self.claimable_outpoints.retain(|_, ref v|
-                       if v.1 == height {
+                       if v.1 >= height {
                        remove_request.push(v.0.clone());
                        false
                        } else { true });
@@ -904,6 +945,16 @@ impl<ChannelSigner: Sign> OnchainTxHandler<ChannelSigner> {
                }
        }
 
+       pub(crate) fn get_relevant_txids(&self) -> Vec<Txid> {
+               let mut txids: Vec<Txid> = self.onchain_events_awaiting_threshold_conf
+                       .iter()
+                       .map(|entry| entry.txid)
+                       .collect();
+               txids.sort_unstable();
+               txids.dedup();
+               txids
+       }
+
        pub(crate) fn provide_latest_holder_tx(&mut self, tx: HolderCommitmentTransaction) {
                self.prev_holder_commitment = Some(replace(&mut self.holder_commitment, tx));
                self.holder_htlc_sigs = None;