Merge pull request #838 from TheBlueMatt/2021-03-skip-blocks
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 5 Apr 2021 22:12:45 +0000 (22:12 +0000)
committerGitHub <noreply@github.com>
Mon, 5 Apr 2021 22:12:45 +0000 (22:12 +0000)
Make `Channel`'s block connection API more electrum-friendly

1  2 
fuzz/src/chanmon_consistency.rs
lightning-persister/src/lib.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs

index e7cd990086163910ebd061f798325ac301ee9950,55c2ffd57ff56982898e55c801893f1099fbd4a2..87b95cf2a538a972e882a2d2ad0ca6fb2bd99c47
@@@ -234,7 -234,7 +234,7 @@@ fn check_api_err(api_err: APIError) 
                                _ if err.starts_with("Cannot send value that would put our balance under counterparty-announced channel reserve value") => {},
                                _ if err.starts_with("Cannot send value that would overdraw remaining funds.") => {},
                                _ if err.starts_with("Cannot send value that would not leave enough to pay for fees.") => {},
 -                              _ => panic!(err),
 +                              _ => panic!("{}", err),
                        }
                },
                APIError::MonitorUpdateFailed => {
@@@ -435,11 -435,11 +435,11 @@@ pub fn do_test<Out: test_logger::Output
                        let chain_hash = genesis_block(Network::Bitcoin).block_hash();
                        let mut header = BlockHeader { version: 0x20000000, prev_blockhash: chain_hash, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                        let txdata: Vec<_> = channel_txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect();
-                       $node.block_connected(&header, &txdata, 1);
-                       for i in 2..100 {
+                       $node.transactions_confirmed(&header, 1, &txdata);
+                       for _ in 2..100 {
                                header = BlockHeader { version: 0x20000000, prev_blockhash: header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-                               $node.block_connected(&header, &[], i);
                        }
+                       $node.update_best_block(&header, 99);
                } }
        }
  
index 44586422cfb48cd16131524770e62e3e69a67a4d,4976f929d631c1df6e76074d5e4c067bf166da39..af11dfd167b4d51f1c6a1bde4ea7ab5eb42308a9
@@@ -3,9 -3,6 +3,9 @@@
  #![deny(broken_intra_doc_links)]
  #![deny(missing_docs)]
  
 +#![cfg_attr(all(test, feature = "unstable"), feature(test))]
 +#[cfg(all(test, feature = "unstable"))] extern crate test;
 +
  mod util;
  
  extern crate lightning;
@@@ -244,7 -241,7 +244,7 @@@ mod tests 
                // Force close because cooperative close doesn't result in any persisted
                // updates.
                nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
-               check_closed_broadcast!(nodes[0], false);
+               check_closed_broadcast!(nodes[0], true);
                check_added_monitors!(nodes[0], 1);
  
                let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
  
                let header = BlockHeader { version: 0x20000000, prev_blockhash: nodes[0].best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                connect_block(&nodes[1], &Block { header, txdata: vec![node_txn[0].clone(), node_txn[0].clone()]});
-               check_closed_broadcast!(nodes[1], false);
+               check_closed_broadcast!(nodes[1], true);
                check_added_monitors!(nodes[1], 1);
  
                // Make sure everything is persisted as expected after close.
                added_monitors.clear();
        }
  }
 +
 +#[cfg(all(test, feature = "unstable"))]
 +pub mod bench {
 +      use test::Bencher;
 +
 +      #[bench]
 +      fn bench_sends(bench: &mut Bencher) {
 +              let persister_a = super::FilesystemPersister::new("bench_filesystem_persister_a".to_string());
 +              let persister_b = super::FilesystemPersister::new("bench_filesystem_persister_b".to_string());
 +              lightning::ln::channelmanager::bench::bench_two_sends(bench, persister_a, persister_b);
 +      }
 +}
index 19fdd97a65a586fabb9a22b2f482dca995facf97,31c8dccae637e6c93a9ae41affa3f94da129ea3e..efb2fe9c37bd9a8ec2c9a36d9a521072a6f8c6f3
@@@ -434,7 -434,6 +434,7 @@@ pub struct ChannelManager<Signer: Sign
        #[cfg(not(any(test, feature = "_test_utils")))]
        channel_state: Mutex<ChannelHolder<Signer>>,
        our_network_key: SecretKey,
 +      our_network_pubkey: PublicKey,
  
        /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
        /// value increases strictly since we don't assume access to a time source.
@@@ -823,6 -822,7 +823,6 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
  
                        latest_block_height: AtomicUsize::new(params.latest_height),
                        last_block_hash: RwLock::new(params.latest_hash),
 -                      secp_ctx,
  
                        channel_state: Mutex::new(ChannelHolder{
                                by_id: HashMap::new(),
                                pending_msg_events: Vec::new(),
                        }),
                        our_network_key: keys_manager.get_node_secret(),
 +                      our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()),
 +                      secp_ctx,
  
                        last_node_announcement_serial: AtomicUsize::new(0),
  
                }
        }
  
-       fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>) -> Result<(), APIError> {
+       fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>) -> Result<PublicKey, APIError> {
                let mut chan = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
                        if let hash_map::Entry::Occupied(chan) = channel_state.by_id.entry(channel_id.clone()) {
                                if let Some(node_id) = peer_node_id {
                                        if chan.get().get_counterparty_node_id() != *node_id {
-                                               // Error or Ok here doesn't matter - the result is only exposed publicly
-                                               // when peer_node_id is None anyway.
-                                               return Ok(());
+                                               return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()});
                                        }
                                }
                                if let Some(short_id) = chan.get().get_short_channel_id() {
                        });
                }
  
-               Ok(())
+               Ok(chan.get_counterparty_node_id())
        }
  
        /// Force closes a channel, immediately broadcasting the latest local commitment transaction to
        /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager.
        pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-               self.force_close_channel_with_peer(channel_id, None)
+               match self.force_close_channel_with_peer(channel_id, None) {
+                       Ok(counterparty_node_id) => {
+                               self.channel_state.lock().unwrap().pending_msg_events.push(
+                                       events::MessageSendEvent::HandleError {
+                                               node_id: counterparty_node_id,
+                                               action: msgs::ErrorAction::SendErrorMessage {
+                                                       msg: msgs::ErrorMessage { channel_id: *channel_id, data: "Channel force-closed".to_owned() }
+                                               },
+                                       }
+                               );
+                               Ok(())
+                       },
+                       Err(e) => Err(e)
+               }
        }
  
        /// Force close all channels, immediately broadcasting the latest local commitment transaction
  
        /// Gets the node_id held by this ChannelManager
        pub fn get_our_node_id(&self) -> PublicKey {
 -              PublicKey::from_secret_key(&self.secp_ctx, &self.our_network_key)
 +              self.our_network_pubkey.clone()
        }
  
        /// Restores a single, given channel to normal operation after a
                                                                        msg: update
                                                                });
                                                        }
+                                                       pending_msg_events.push(events::MessageSendEvent::HandleError {
+                                                               node_id: chan.get_counterparty_node_id(),
+                                                               action: msgs::ErrorAction::SendErrorMessage {
+                                                                       msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
+                                                               },
+                                                       });
                                                }
                                        },
                                }
@@@ -3278,12 -3293,26 +3295,26 @@@ wher
        L::Target: Logger,
  {
        fn block_connected(&self, block: &Block, height: u32) {
+               assert_eq!(*self.last_block_hash.read().unwrap(), block.header.prev_blockhash,
+                       "Blocks must be connected in chain-order - the connected header must build on the last connected header");
+               assert_eq!(self.latest_block_height.load(Ordering::Acquire) as u64, height as u64 - 1,
+                       "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
                let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
-               ChannelManager::block_connected(self, &block.header, &txdata, height);
+               self.transactions_confirmed(&block.header, height, &txdata);
+               self.update_best_block(&block.header, height);
        }
  
-       fn block_disconnected(&self, header: &BlockHeader, _height: u32) {
-               ChannelManager::block_disconnected(self, header);
+       fn block_disconnected(&self, header: &BlockHeader, height: u32) {
+               assert_eq!(*self.last_block_hash.read().unwrap(), header.block_hash(),
+                       "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
+               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               let new_height = self.latest_block_height.fetch_sub(1, Ordering::AcqRel) as u32 - 1;
+               assert_eq!(new_height, height - 1,
+                       "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
+               *self.last_block_hash.write().unwrap() = header.prev_blockhash;
+               self.do_chain_event(new_height, |channel| channel.update_best_block(new_height, header.time));
        }
  }
  
@@@ -3294,22 -3323,11 +3325,11 @@@ impl<Signer: Sign, M: Deref, T: Deref, 
          F::Target: FeeEstimator,
          L::Target: Logger,
  {
-       /// Updates channel state based on transactions seen in a connected block.
-       pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+       fn do_chain_event<FN: Fn(&mut Channel<Signer>) -> Result<(Option<msgs::FundingLocked>, Vec<(HTLCSource, PaymentHash)>), msgs::ErrorMessage>>
+                       (&self, height: u32, f: FN) {
                // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
                // during initialization prior to the chain_monitor being fully configured in some cases.
                // See the docs for `ChannelManagerReadArgs` for more.
-               let block_hash = header.block_hash();
-               log_trace!(self.logger, "Block {} at height {} connected", block_hash, height);
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-               assert_eq!(*self.last_block_hash.read().unwrap(), header.prev_blockhash,
-                       "Blocks must be connected in chain-order - the connected header must build on the last connected header");
-               assert_eq!(self.latest_block_height.load(Ordering::Acquire) as u64, height as u64 - 1,
-                       "Blocks must be connected in chain-order - the connected header must build on the last connected header");
-               self.latest_block_height.store(height as usize, Ordering::Release);
-               *self.last_block_hash.write().unwrap() = block_hash;
  
                let mut failed_channels = Vec::new();
                let mut timed_out_htlcs = Vec::new();
                        let short_to_id = &mut channel_state.short_to_id;
                        let pending_msg_events = &mut channel_state.pending_msg_events;
                        channel_state.by_id.retain(|_, channel| {
-                               let res = channel.block_connected(header, txdata, height);
+                               let res = f(channel);
                                if let Ok((chan_res, mut timed_out_pending_htlcs)) = res {
                                        for (source, payment_hash) in timed_out_pending_htlcs.drain(..) {
                                                let chan_update = self.get_channel_update(&channel).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe
                                                short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
                                        }
                                } else if let Err(e) = res {
+                                       if let Some(short_id) = channel.get_short_channel_id() {
+                                               short_to_id.remove(&short_id);
+                                       }
+                                       // It looks like our counterparty went on-chain or funding transaction was
+                                       // reorged out of the main chain. Close the channel.
+                                       failed_channels.push(channel.force_shutdown(true));
+                                       if let Ok(update) = self.get_channel_update(&channel) {
+                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                       msg: update
+                                               });
+                                       }
                                        pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                node_id: channel.get_counterparty_node_id(),
                                                action: msgs::ErrorAction::SendErrorMessage { msg: e },
                                        });
                                        return false;
                                }
-                               if let Some(funding_txo) = channel.get_funding_txo() {
-                                       for &(_, tx) in txdata.iter() {
-                                               for inp in tx.input.iter() {
-                                                       if inp.previous_output == funding_txo.into_bitcoin_outpoint() {
-                                                               log_trace!(self.logger, "Detected channel-closing tx {} spending {}:{}, closing channel {}", tx.txid(), inp.previous_output.txid, inp.previous_output.vout, log_bytes!(channel.channel_id()));
-                                                               if let Some(short_id) = channel.get_short_channel_id() {
-                                                                       short_to_id.remove(&short_id);
-                                                               }
-                                                               // It looks like our counterparty went on-chain. Close the channel.
-                                                               failed_channels.push(channel.force_shutdown(true));
-                                                               if let Ok(update) = self.get_channel_update(&channel) {
-                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                                               msg: update
-                                                                       });
-                                                               }
-                                                               return false;
-                                                       }
-                                               }
-                                       }
-                               }
                                true
                        });
  
                for (source, payment_hash, reason) in timed_out_htlcs.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, reason);
                }
+       }
+       /// Updates channel state to take note of transactions which were confirmed in the given block
+       /// at the given height.
+       ///
+       /// Note that you must still call (or have called) [`update_best_block`] with the block
+       /// information which is included here.
+       ///
+       /// This method may be called before or after [`update_best_block`] for a given block's
+       /// transaction data and may be called multiple times with additional transaction data for a
+       /// given block.
+       ///
+       /// This method may be called for a previous block after an [`update_best_block`] call has
+       /// been made for a later block, however it must *not* be called with transaction data from a
+       /// block which is no longer in the best chain (ie where [`update_best_block`] has already
+       /// been informed about a blockchain reorganization which no longer includes the block which
+       /// corresponds to `header`).
+       ///
+       /// [`update_best_block`]: `Self::update_best_block`
+       pub fn transactions_confirmed(&self, header: &BlockHeader, height: u32, txdata: &TransactionData) {
+               // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+               // during initialization prior to the chain_monitor being fully configured in some cases.
+               // See the docs for `ChannelManagerReadArgs` for more.
+               let block_hash = header.block_hash();
+               log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height);
+               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               self.do_chain_event(height, |channel| channel.transactions_confirmed(&block_hash, height, txdata, &self.logger).map(|a| (a, Vec::new())));
+       }
+       /// Updates channel state with the current best blockchain tip. You should attempt to call this
+       /// quickly after a new block becomes available, however if multiple new blocks become
+       /// available at the same time, only a single `update_best_block()` call needs to be made.
+       ///
+       /// This method should also be called immediately after any block disconnections, once at the
+       /// reorganization fork point, and once with the new chain tip. Calling this method at the
+       /// blockchain reorganization fork point ensures we learn when a funding transaction which was
+       /// previously confirmed is reorganized out of the blockchain, ensuring we do not continue to
+       /// accept payments which cannot be enforced on-chain.
+       ///
+       /// In both the block-connection and block-disconnection case, this method may be called either
+       /// once per block connected or disconnected, or simply at the fork point and new tip(s),
+       /// skipping any intermediary blocks.
+       pub fn update_best_block(&self, header: &BlockHeader, height: u32) {
+               // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+               // during initialization prior to the chain_monitor being fully configured in some cases.
+               // See the docs for `ChannelManagerReadArgs` for more.
+               let block_hash = header.block_hash();
+               log_trace!(self.logger, "New best block: {} at height {}", block_hash, height);
+               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+               self.latest_block_height.store(height as usize, Ordering::Release);
+               *self.last_block_hash.write().unwrap() = block_hash;
+               self.do_chain_event(height, |channel| channel.update_best_block(height, header.time));
  
                loop {
                        // Update last_node_announcement_serial to be the max of its current value and the
                }
        }
  
-       /// Updates channel state based on a disconnected block.
-       ///
-       /// If necessary, the channel may be force-closed without letting the counterparty participate
-       /// in the shutdown.
-       pub fn block_disconnected(&self, header: &BlockHeader) {
-               // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
-               // during initialization prior to the chain_monitor being fully configured in some cases.
-               // See the docs for `ChannelManagerReadArgs` for more.
-               let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
-               assert_eq!(*self.last_block_hash.read().unwrap(), header.block_hash(),
-                       "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
-               self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
-               *self.last_block_hash.write().unwrap() = header.prev_blockhash;
-               let mut failed_channels = Vec::new();
-               {
-                       let mut channel_lock = self.channel_state.lock().unwrap();
-                       let channel_state = &mut *channel_lock;
-                       let short_to_id = &mut channel_state.short_to_id;
-                       let pending_msg_events = &mut channel_state.pending_msg_events;
-                       channel_state.by_id.retain(|_,  v| {
-                               if v.block_disconnected(header) {
-                                       if let Some(short_id) = v.get_short_channel_id() {
-                                               short_to_id.remove(&short_id);
-                                       }
-                                       failed_channels.push(v.force_shutdown(true));
-                                       if let Ok(update) = self.get_channel_update(&v) {
-                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                                       msg: update
-                                               });
-                                       }
-                                       false
-                               } else {
-                                       true
-                               }
-                       });
-               }
-               self.handle_init_event_channel_failures(failed_channels);
-       }
        /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
        /// indicating whether persistence is necessary. Only one listener on
        /// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
@@@ -4320,6 -4345,7 +4347,6 @@@ impl<'a, Signer: Sign, M: Deref, T: Der
  
                        latest_block_height: AtomicUsize::new(latest_block_height as usize),
                        last_block_hash: RwLock::new(last_block_hash),
 -                      secp_ctx,
  
                        channel_state: Mutex::new(ChannelHolder {
                                by_id,
                                pending_msg_events: Vec::new(),
                        }),
                        our_network_key: args.keys_manager.get_node_secret(),
 +                      our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()),
 +                      secp_ctx,
  
                        last_node_announcement_serial: AtomicUsize::new(last_node_announcement_serial as usize),
  
@@@ -4407,154 -4431,3 +4434,154 @@@ mod tests 
                }
        }
  }
 +
 +#[cfg(all(any(test, feature = "_test_utils"), feature = "unstable"))]
 +pub mod bench {
 +      use chain::Listen;
 +      use chain::chainmonitor::ChainMonitor;
 +      use chain::channelmonitor::Persist;
 +      use chain::keysinterface::{KeysManager, InMemorySigner};
 +      use chain::transaction::OutPoint;
 +      use ln::channelmanager::{ChainParameters, ChannelManager, PaymentHash, PaymentPreimage};
 +      use ln::features::InitFeatures;
 +      use ln::functional_test_utils::*;
 +      use ln::msgs::ChannelMessageHandler;
 +      use routing::network_graph::NetworkGraph;
 +      use routing::router::get_route;
 +      use util::test_utils;
 +      use util::config::UserConfig;
 +      use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
 +
 +      use bitcoin::hashes::Hash;
 +      use bitcoin::hashes::sha256::Hash as Sha256;
 +      use bitcoin::{Block, BlockHeader, Transaction, TxOut};
 +
 +      use std::sync::Mutex;
 +
 +      use test::Bencher;
 +
 +      struct NodeHolder<'a, P: Persist<InMemorySigner>> {
 +              node: &'a ChannelManager<InMemorySigner,
 +                      &'a ChainMonitor<InMemorySigner, &'a test_utils::TestChainSource,
 +                              &'a test_utils::TestBroadcaster, &'a test_utils::TestFeeEstimator,
 +                              &'a test_utils::TestLogger, &'a P>,
 +                      &'a test_utils::TestBroadcaster, &'a KeysManager,
 +                      &'a test_utils::TestFeeEstimator, &'a test_utils::TestLogger>
 +      }
 +
 +      #[cfg(test)]
 +      #[bench]
 +      fn bench_sends(bench: &mut Bencher) {
 +              bench_two_sends(bench, test_utils::TestPersister::new(), test_utils::TestPersister::new());
 +      }
 +
 +      pub fn bench_two_sends<P: Persist<InMemorySigner>>(bench: &mut Bencher, persister_a: P, persister_b: P) {
 +              // Do a simple benchmark of sending a payment back and forth between two nodes.
 +              // Note that this is unrealistic as each payment send will require at least two fsync
 +              // calls per node.
 +              let network = bitcoin::Network::Testnet;
 +              let genesis_hash = bitcoin::blockdata::constants::genesis_block(network).header.block_hash();
 +
 +              let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())};
 +              let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
 +
 +              let mut config: UserConfig = Default::default();
 +              config.own_channel_config.minimum_depth = 1;
 +
 +              let logger_a = test_utils::TestLogger::with_id("node a".to_owned());
 +              let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a);
 +              let seed_a = [1u8; 32];
 +              let keys_manager_a = KeysManager::new(&seed_a, 42, 42);
 +              let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &logger_a, &keys_manager_a, config.clone(), ChainParameters {
 +                      network,
 +                      latest_hash: genesis_hash,
 +                      latest_height: 0,
 +              });
 +              let node_a_holder = NodeHolder { node: &node_a };
 +
 +              let logger_b = test_utils::TestLogger::with_id("node a".to_owned());
 +              let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b);
 +              let seed_b = [2u8; 32];
 +              let keys_manager_b = KeysManager::new(&seed_b, 42, 42);
 +              let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &logger_b, &keys_manager_b, config.clone(), ChainParameters {
 +                      network,
 +                      latest_hash: genesis_hash,
 +                      latest_height: 0,
 +              });
 +              let node_b_holder = NodeHolder { node: &node_b };
 +
 +              node_a.create_channel(node_b.get_our_node_id(), 8_000_000, 100_000_000, 42, None).unwrap();
 +              node_b.handle_open_channel(&node_a.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_a_holder, MessageSendEvent::SendOpenChannel, node_b.get_our_node_id()));
 +              node_a.handle_accept_channel(&node_b.get_our_node_id(), InitFeatures::known(), &get_event_msg!(node_b_holder, MessageSendEvent::SendAcceptChannel, node_a.get_our_node_id()));
 +
 +              let tx;
 +              if let Event::FundingGenerationReady { temporary_channel_id, output_script, .. } = get_event!(node_a_holder, Event::FundingGenerationReady) {
 +                      tx = Transaction { version: 2, lock_time: 0, input: Vec::new(), output: vec![TxOut {
 +                              value: 8_000_000, script_pubkey: output_script,
 +                      }]};
 +                      let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 };
 +                      node_a.funding_transaction_generated(&temporary_channel_id, funding_outpoint);
 +              } else { panic!(); }
 +
 +              node_b.handle_funding_created(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendFundingCreated, node_b.get_our_node_id()));
 +              node_a.handle_funding_signed(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendFundingSigned, node_a.get_our_node_id()));
 +
 +              get_event!(node_a_holder, Event::FundingBroadcastSafe);
 +
 +              let block = Block {
 +                      header: BlockHeader { version: 0x20000000, prev_blockhash: genesis_hash, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
 +                      txdata: vec![tx],
 +              };
 +              Listen::block_connected(&node_a, &block, 1);
 +              Listen::block_connected(&node_b, &block, 1);
 +
 +              node_a.handle_funding_locked(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendFundingLocked, node_a.get_our_node_id()));
 +              node_b.handle_funding_locked(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendFundingLocked, node_b.get_our_node_id()));
 +
 +              let dummy_graph = NetworkGraph::new(genesis_hash);
 +
 +              macro_rules! send_payment {
 +                      ($node_a: expr, $node_b: expr) => {
 +                              let usable_channels = $node_a.list_usable_channels();
 +                              let route = get_route(&$node_a.get_our_node_id(), &dummy_graph, &$node_b.get_our_node_id(), None, Some(&usable_channels.iter().map(|r| r).collect::<Vec<_>>()), &[], 10_000, TEST_FINAL_CLTV, &logger_a).unwrap();
 +
 +                              let payment_preimage = PaymentPreimage([0; 32]);
 +                              let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0[..]).into_inner());
 +
 +                              $node_a.send_payment(&route, payment_hash, &None).unwrap();
 +                              let payment_event = SendEvent::from_event($node_a.get_and_clear_pending_msg_events().pop().unwrap());
 +                              $node_b.handle_update_add_htlc(&$node_a.get_our_node_id(), &payment_event.msgs[0]);
 +                              $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &payment_event.commitment_msg);
 +                              let (raa, cs) = get_revoke_commit_msgs!(NodeHolder { node: &$node_b }, $node_a.get_our_node_id());
 +                              $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &raa);
 +                              $node_a.handle_commitment_signed(&$node_b.get_our_node_id(), &cs);
 +                              $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_a }, MessageSendEvent::SendRevokeAndACK, $node_b.get_our_node_id()));
 +
 +                              expect_pending_htlcs_forwardable!(NodeHolder { node: &$node_b });
 +                              expect_payment_received!(NodeHolder { node: &$node_b }, payment_hash, 10_000);
 +                              assert!($node_b.claim_funds(payment_preimage, &None, 10_000));
 +
 +                              match $node_b.get_and_clear_pending_msg_events().pop().unwrap() {
 +                                      MessageSendEvent::UpdateHTLCs { node_id, updates } => {
 +                                              assert_eq!(node_id, $node_a.get_our_node_id());
 +                                              $node_a.handle_update_fulfill_htlc(&$node_b.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
 +                                              $node_a.handle_commitment_signed(&$node_b.get_our_node_id(), &updates.commitment_signed);
 +                                      },
 +                                      _ => panic!("Failed to generate claim event"),
 +                              }
 +
 +                              let (raa, cs) = get_revoke_commit_msgs!(NodeHolder { node: &$node_a }, $node_b.get_our_node_id());
 +                              $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &raa);
 +                              $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &cs);
 +                              $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_b }, MessageSendEvent::SendRevokeAndACK, $node_a.get_our_node_id()));
 +
 +                              expect_payment_sent!(NodeHolder { node: &$node_a }, payment_preimage);
 +                      }
 +              }
 +
 +              bench.iter(|| {
 +                      send_payment!(node_a, node_b);
 +                      send_payment!(node_b, node_a);
 +              });
 +      }
 +}
index 3c641dd74af4114a05b6c5ec032d810ac40ec836,ed1585f68e4f19677b4c847bc1c7bf06c2e4404a..af11ef5d86baf01c98d0d815a753ac23ffc3535b
@@@ -10,7 -10,7 +10,7 @@@
  //! A bunch of useful utilities for building networks of nodes and exchanging messages between
  //! nodes for functional tests.
  
- use chain::Watch;
+ use chain::{Listen, Watch};
  use chain::channelmonitor::ChannelMonitor;
  use chain::transaction::OutPoint;
  use ln::channelmanager::{ChainParameters, ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
@@@ -60,21 -60,15 +60,15 @@@ pub fn mine_transaction<'a, 'b, 'c, 'd>
  /// Mine the given transaction at the given height, mining blocks as required to build to that
  /// height
  pub fn confirm_transaction_at<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, tx: &Transaction, conf_height: u32) {
-       let starting_block = node.best_block_info();
+       let first_connect_height = node.best_block_info().1 + 1;
+       assert!(first_connect_height <= conf_height);
+       if conf_height - first_connect_height >= 1 {
+               connect_blocks(node, conf_height - first_connect_height);
+       }
        let mut block = Block {
-               header: BlockHeader { version: 0x20000000, prev_blockhash: starting_block.0, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
+               header: BlockHeader { version: 0x20000000, prev_blockhash: node.best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
                txdata: Vec::new(),
        };
-       let height = starting_block.1 + 1;
-       assert!(height <= conf_height);
-       for _ in height..conf_height {
-               connect_block(node, &block);
-               block = Block {
-                       header: BlockHeader { version: 0x20000000, prev_blockhash: block.header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
-                       txdata: vec![],
-               };
-       }
        for _ in 0..*node.network_chan_count.borrow() { // Make sure we don't end up with channels at the same short id by offsetting by chan_count
                block.txdata.push(Transaction { version: 0, lock_time: 0, input: Vec::new(), output: Vec::new() });
        }
        connect_block(node, &block);
  }
  
+ /// The possible ways we may notify a ChannelManager of a new block
+ pub enum ConnectStyle {
+       /// Calls update_best_block first, detecting transactions in the block only after receiving the
+       /// header and height information.
+       BestBlockFirst,
+       /// The same as BestBlockFirst, however when we have multiple blocks to connect, we only
+       /// make a single update_best_block call.
+       BestBlockFirstSkippingBlocks,
+       /// Calls transactions_confirmed first, detecting transactions in the block before updating the
+       /// header and height information.
+       TransactionsFirst,
+       /// The same as TransactionsFirst, however when we have multiple blocks to connect, we only
+       /// make a single update_best_block call.
+       TransactionsFirstSkippingBlocks,
+       /// Provides the full block via the chain::Listen interface. In the current code this is
+       /// equivalent to TransactionsFirst with some additional assertions.
+       FullBlockViaListen,
+ }
  pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32) -> BlockHash {
+       let skip_intermediaries = match *node.connect_style.borrow() {
+               ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => true,
+               _ => false,
+       };
        let mut block = Block {
                header: BlockHeader { version: 0x2000000, prev_blockhash: node.best_block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
                txdata: vec![],
        };
-       connect_block(node, &block);
-       for _ in 2..depth + 1 {
+       assert!(depth >= 1);
+       for _ in 0..depth - 1 {
+               do_connect_block(node, &block, skip_intermediaries);
                block = Block {
                        header: BlockHeader { version: 0x20000000, prev_blockhash: block.header.block_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
                        txdata: vec![],
                };
-               connect_block(node, &block);
        }
+       connect_block(node, &block);
        block.header.block_hash()
  }
  
  pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block) {
+       do_connect_block(node, block, false);
+ }
+ fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, skip_manager: bool) {
        let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
        let height = node.best_block_info().1 + 1;
        node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height);
-       node.node.block_connected(&block.header, &txdata, height);
+       if !skip_manager {
+               match *node.connect_style.borrow() {
+                       ConnectStyle::BestBlockFirst|ConnectStyle::BestBlockFirstSkippingBlocks => {
+                               node.node.update_best_block(&block.header, height);
+                               node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::<Vec<_>>());
+                       },
+                       ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks => {
+                               node.node.transactions_confirmed(&block.header, height, &block.txdata.iter().enumerate().collect::<Vec<_>>());
+                               node.node.update_best_block(&block.header, height);
+                       },
+                       ConnectStyle::FullBlockViaListen => {
+                               Listen::block_connected(node.node, &block, height);
+                       }
+               }
+       }
        node.node.test_process_background_events();
        node.blocks.borrow_mut().push((block.header, height));
  }
  
  pub fn disconnect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, count: u32) {
-       for _ in 0..count {
+       for i in 0..count {
                let orig_header = node.blocks.borrow_mut().pop().unwrap();
                assert!(orig_header.1 > 0); // Cannot disconnect genesis
+               let prev_header = node.blocks.borrow().last().unwrap().clone();
                node.chain_monitor.chain_monitor.block_disconnected(&orig_header.0, orig_header.1);
-               node.node.block_disconnected(&orig_header.0);
+               match *node.connect_style.borrow() {
+                       ConnectStyle::FullBlockViaListen => {
+                               Listen::block_disconnected(node.node, &orig_header.0, orig_header.1);
+                       },
+                       ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => {
+                               if i == count - 1 {
+                                       node.node.update_best_block(&prev_header.0, prev_header.1);
+                               }
+                       },
+                       _ => {
+                               node.node.update_best_block(&prev_header.0, prev_header.1);
+                       },
+               }
        }
  }
  
@@@ -152,6 -203,7 +203,7 @@@ pub struct Node<'a, 'b: 'a, 'c: 'b> 
        pub network_chan_count: Rc<RefCell<u32>>,
        pub logger: &'c test_utils::TestLogger,
        pub blocks: RefCell<Vec<(BlockHeader, u32)>>,
+       pub connect_style: Rc<RefCell<ConnectStyle>>,
  }
  impl<'a, 'b, 'c> Node<'a, 'b, 'c> {
        pub fn best_block_hash(&self) -> BlockHash {
@@@ -313,24 -365,6 +365,24 @@@ macro_rules! get_event_msg 
        }
  }
  
 +/// Get a specific event from the pending events queue.
 +#[macro_export]
 +macro_rules! get_event {
 +      ($node: expr, $event_type: path) => {
 +              {
 +                      let mut events = $node.node.get_and_clear_pending_events();
 +                      assert_eq!(events.len(), 1);
 +                      let ev = events.pop().unwrap();
 +                      match ev {
 +                              $event_type { .. } => {
 +                                      ev
 +                              },
 +                              _ => panic!("Unexpected event"),
 +                      }
 +              }
 +      }
 +}
 +
  #[cfg(test)]
  macro_rules! get_htlc_update_msgs {
        ($node: expr, $node_id: expr) => {
@@@ -359,8 -393,7 +411,8 @@@ macro_rules! get_feerate 
        }
  }
  
 -#[cfg(test)]
 +/// Returns any local commitment transactions for the channel.
 +#[macro_export]
  macro_rules! get_local_commitment_txn {
        ($node: expr, $channel_id: expr) => {
                {
@@@ -866,7 -899,7 +918,7 @@@ macro_rules! expect_pending_htlcs_forwa
        }}
  }
  
 -#[cfg(test)]
 +#[cfg(any(test, feature = "unstable"))]
  macro_rules! expect_payment_received {
        ($node: expr, $expected_payment_hash: expr, $expected_recv_value: expr) => {
                let events = $node.node.get_and_clear_pending_events();
@@@ -1243,6 -1276,7 +1295,7 @@@ pub fn create_network<'a, 'b: 'a, 'c: '
        let mut nodes = Vec::new();
        let chan_count = Rc::new(RefCell::new(0));
        let payment_count = Rc::new(RefCell::new(0));
+       let connect_style = Rc::new(RefCell::new(ConnectStyle::FullBlockViaListen));
  
        for i in 0..node_count {
                let net_graph_msg_handler = NetGraphMsgHandler::new(cfgs[i].chain_source.genesis_hash, None, cfgs[i].logger);
                                 keys_manager: &cfgs[i].keys_manager, node: &chan_mgrs[i], net_graph_msg_handler,
                                 node_seed: cfgs[i].node_seed, network_chan_count: chan_count.clone(),
                                 network_payment_count: payment_count.clone(), logger: cfgs[i].logger,
-                                blocks: RefCell::new(vec![(genesis_block(Network::Testnet).header, 0)])
+                                blocks: RefCell::new(vec![(genesis_block(Network::Testnet).header, 0)]),
+                                connect_style: Rc::clone(&connect_style),
                })
        }
  
@@@ -1364,22 -1399,36 +1418,36 @@@ pub fn check_preimage_claim<'a, 'b, 'c>
  
  pub fn get_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec<Node<'a, 'b, 'c>>, a: usize, b: usize)  {
        let events_1 = nodes[a].node.get_and_clear_pending_msg_events();
-       assert_eq!(events_1.len(), 1);
+       assert_eq!(events_1.len(), 2);
        let as_update = match events_1[0] {
                MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                        msg.clone()
                },
                _ => panic!("Unexpected event"),
        };
+       match events_1[1] {
+               MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => {
+                       assert_eq!(node_id, nodes[b].node.get_our_node_id());
+                       assert_eq!(msg.data, "Commitment or closing transaction was confirmed on chain.");
+               },
+               _ => panic!("Unexpected event"),
+       }
  
        let events_2 = nodes[b].node.get_and_clear_pending_msg_events();
-       assert_eq!(events_2.len(), 1);
+       assert_eq!(events_2.len(), 2);
        let bs_update = match events_2[0] {
                MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                        msg.clone()
                },
                _ => panic!("Unexpected event"),
        };
+       match events_2[1] {
+               MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => {
+                       assert_eq!(node_id, nodes[a].node.get_our_node_id());
+                       assert_eq!(msg.data, "Commitment or closing transaction was confirmed on chain.");
+               },
+               _ => panic!("Unexpected event"),
+       }
  
        for node in nodes {
                node.net_graph_msg_handler.handle_channel_update(&as_update).unwrap();