chaininterface: add BlockNotifier struct
authorValentine Wallace <vwallace@protonmail.com>
Sat, 9 Nov 2019 01:12:13 +0000 (20:12 -0500)
committerValentine Wallace <vwallace@protonmail.com>
Fri, 22 Nov 2019 01:34:00 +0000 (20:34 -0500)
Adding this struct will allow us to remove the circular reference
between ChainListeners and the ChainWatchInterface, because it
separates out the responsibility of notifying listeners about new
blocks from the responsibility of storing and retrieving watched
transactions.

lightning/src/chain/chaininterface.rs
lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/channelmonitor.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs

index 24c9d7100808c7ebfc6b0ab6a7dc7a011b4083e0..32fef48eb44a54acaa7a576502ad1e0f1e56311d 100644 (file)
@@ -193,6 +193,75 @@ impl ChainWatchedUtil {
        }
 }
 
+/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
+/// data is registered.
+pub struct BlockNotifier<'a> {
+       listeners: Mutex<Vec<Weak<ChainListener + 'a>>>, //TODO(vmw): try removing Weak
+       chain_monitor: Arc<ChainWatchInterface>,
+}
+
+impl<'a> BlockNotifier<'a> {
+       /// Constructs a new BlockNotifier without any listeners.
+       pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a> {
+               BlockNotifier {
+                       listeners: Mutex::new(Vec::new()),
+                       chain_monitor,
+               }
+       }
+
+       /// Register the given listener to receive events. Only a weak pointer is provided and
+       /// the registration should be freed once that pointer expires.
+       // TODO: unregister
+       pub fn register_listener(&self, listener: Weak<ChainListener + 'a>) {
+               let mut vec = self.listeners.lock().unwrap();
+               vec.push(listener);
+       }
+
+       /// Notify listeners that a block was connected given a full, unfiltered block.
+       ///
+       /// Handles re-scanning the block and calling block_connected again if listeners register new
+       /// watch data during the callbacks for you (see ChainListener::block_connected for more info).
+       pub fn block_connected<'b>(&self, block: &'b Block, height: u32) {
+               let mut reentered = true;
+               while reentered {
+                       let (matched, matched_index) = self.chain_monitor.filter_block(block);
+                       reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
+               }
+       }
+
+       /// Notify listeners that a block was connected, given pre-filtered list of transactions in the
+       /// block which matched the filter (probably using does_match_tx).
+       ///
+       /// Returns true if notified listeners registered additional watch data (implying that the
+       /// block must be re-scanned and this function called again prior to further block_connected
+       /// calls, see ChainListener::block_connected for more info).
+       pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
+               let last_seen = self.chain_monitor.reentered();
+
+               let listeners = self.listeners.lock().unwrap().clone();
+               for listener in listeners.iter() {
+                       match listener.upgrade() {
+                               Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
+                               None => ()
+                       }
+               }
+               return last_seen != self.chain_monitor.reentered();
+       }
+
+
+       /// Notify listeners that a block was disconnected.
+       pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
+               let listeners = self.listeners.lock().unwrap().clone();
+               for listener in listeners.iter() {
+                       match listener.upgrade() {
+                               Some(arc) => arc.block_disconnected(&header, disconnected_height),
+                               None => ()
+                       }
+               }
+       }
+
+}
+
 /// Utility to capture some common parts of ChainWatchInterface implementors.
 ///
 /// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful.
@@ -240,63 +309,11 @@ impl ChainWatchInterfaceUtil {
                ChainWatchInterfaceUtil {
                        network: network,
                        watched: Mutex::new(ChainWatchedUtil::new()),
-                       listeners: Mutex::new(Vec::new()),
                        reentered: AtomicUsize::new(1),
                        logger: logger,
                }
        }
 
-       /// Notify listeners that a block was connected given a full, unfiltered block.
-       ///
-       /// Handles re-scanning the block and calling block_connected again if listeners register new
-       /// watch data during the callbacks for you (see ChainListener::block_connected for more info).
-       pub fn block_connected_with_filtering(&self, block: &Block, height: u32) {
-               let mut reentered = true;
-               while reentered {
-                       let mut matched = Vec::new();
-                       let mut matched_index = Vec::new();
-                       {
-                               let watched = self.watched.lock().unwrap();
-                               for (index, transaction) in block.txdata.iter().enumerate() {
-                                       if self.does_match_tx_unguarded(transaction, &watched) {
-                                               matched.push(transaction);
-                                               matched_index.push(index as u32);
-                                       }
-                               }
-                       }
-                       reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
-               }
-       }
-
-       /// Notify listeners that a block was disconnected.
-       pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
-               let listeners = self.listeners.lock().unwrap().clone();
-               for listener in listeners.iter() {
-                       match listener.upgrade() {
-                               Some(arc) => arc.block_disconnected(&header, disconnected_height),
-                               None => ()
-                       }
-               }
-       }
-
-       /// Notify listeners that a block was connected, given pre-filtered list of transactions in the
-       /// block which matched the filter (probably using does_match_tx).
-       ///
-       /// Returns true if notified listeners registered additional watch data (implying that the
-       /// block must be re-scanned and this function called again prior to further block_connected
-       /// calls, see ChainListener::block_connected for more info).
-       pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
-               let last_seen = self.reentered.load(Ordering::Relaxed);
-
-               let listeners = self.listeners.lock().unwrap().clone();
-               for listener in listeners.iter() {
-                       match listener.upgrade() {
-                               Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
-                               None => ()
-                       }
-               }
-               return last_seen != self.reentered.load(Ordering::Relaxed);
-       }
 
        /// Checks if a given transaction matches the current filter.
        pub fn does_match_tx(&self, tx: &Transaction) -> bool {
index c1fe6fbdd6d7c54002d31f277481a200f5cb5b7b..205f1a4bd34fe23bd94ffe36a58b4dd609bceb39 100644 (file)
@@ -1627,11 +1627,11 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
        };
 
        if confirm_a_first {
-               confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
+               confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
                nodes[1].node.handle_funding_locked(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingLocked, nodes[1].node.get_our_node_id())).unwrap();
        } else {
                assert!(!restore_b_before_conf);
-               confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
+               confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
                assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        }
 
@@ -1643,7 +1643,7 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
        if !restore_b_before_conf {
-               confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
+               confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
                assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
                assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
        }
@@ -1655,12 +1655,12 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
        let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first {
                nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())).unwrap();
 
-               confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
+               confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
                let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[1], &nodes[0]);
                (channel_id, create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked))
        } else {
                if restore_b_before_conf {
-                       confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
+                       confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
                }
                let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[0], &nodes[1]);
                (channel_id, create_chan_between_nodes_with_value_b(&nodes[1], &nodes[0], &funding_locked))
index f368a3a203967bc73af40ece260d38b36205bcd4..0414cc5594363cf62ccd008ac679e9e5d64298fb 100644 (file)
@@ -585,8 +585,15 @@ impl ChannelManager {
        ///
        /// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`!
        ///
-       /// User must provide the current blockchain height from which to track onchain channel
+       /// Users must provide the current blockchain height from which to track onchain channel
        /// funding outpoints and send payments with reliable timelocks.
+       ///
+       /// Users need to notify the new ChannelManager when a new block is connected or
+       /// disconnected using its `block_connected` and `block_disconnected` methods.
+       /// However, rather than calling these methods directly, the user should register
+       /// the ChannelManager as a listener to the BlockNotifier and call the BlockNotifier's
+       /// `block_(dis)connected` methods, which will notify all registered listeners in one
+       /// go.
        pub fn new(network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, chain_monitor: Arc<ChainWatchInterface>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>,keys_manager: Arc<KeysInterface>, config: UserConfig, current_blockchain_height: usize) -> Result<Arc<ChannelManager>, secp256k1::Error> {
                let secp_ctx = Secp256k1::new();
 
@@ -3122,8 +3129,7 @@ impl Writeable for ChannelManager {
 /// 4) Reconnect blocks on your ChannelMonitors.
 /// 5) Move the ChannelMonitors into your local ManyChannelMonitor.
 /// 6) Disconnect/connect blocks on the ChannelManager.
-/// 7) Register the new ChannelManager with your ChainWatchInterface (this does not happen
-///    automatically as it does in ChannelManager::new()).
+/// 7) Register the new ChannelManager with your ChainWatchInterface.
 pub struct ChannelManagerReadArgs<'a> {
        /// The keys provider which will give us relevant keys. Some keys will be loaded during
        /// deserialization.
index dc385f6b63f1f15f5815836f267bbac7f68e2927..dfa56d6f2555cab8c59520a5183d1dc6e5f1f7fe 100644 (file)
@@ -109,6 +109,12 @@ pub struct HTLCUpdate {
 /// channel's monitor everywhere (including remote watchtowers) *before* this function returns. If
 /// an update occurs and a remote watchtower is left with old state, it may broadcast transactions
 /// which we have revoked, allowing our counterparty to claim all funds in the channel!
+///
+/// User needs to notify implementors of ManyChannelMonitor when a new block is connected or
+/// disconnected using their `block_connected` and `block_disconnected` methods. However, rather
+/// than calling these methods directly, the user should register implementors as listeners to the
+/// BlockNotifier and call the BlockNotifier's `block_(dis)connected` methods, which will notify
+/// all registered listeners in one go.
 pub trait ManyChannelMonitor: Send + Sync {
        /// Adds or updates a monitor for the given `funding_txo`.
        ///
index 303cde08d60500668204b1bc902d93c07d28dd8e..2745bae1dbfe39d4008b7b5c997cc50bbfdd038c 100644 (file)
@@ -34,27 +34,28 @@ use std::sync::{Arc, Mutex};
 use std::mem;
 
 pub const CHAN_CONFIRM_DEPTH: u32 = 100;
-pub fn confirm_transaction(chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
+pub fn confirm_transaction(notifier: &chaininterface::BlockNotifier, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
        assert!(chain.does_match_tx(tx));
        let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       chain.block_connected_checked(&header, 1, &[tx; 1], &[chan_id; 1]);
+       notifier.block_connected_checked(&header, 1, &[tx; 1], &[chan_id; 1]);
        for i in 2..CHAN_CONFIRM_DEPTH {
                header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               chain.block_connected_checked(&header, i, &[tx; 0], &[0; 0]);
+               notifier.block_connected_checked(&header, i, &vec![], &[0; 0]);
        }
 }
 
-pub fn connect_blocks(chain: &chaininterface::ChainWatchInterfaceUtil, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
+pub fn connect_blocks(notifier: &chaininterface::BlockNotifier, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
        let mut header = BlockHeader { version: 0x2000000, prev_blockhash: if parent { prev_blockhash } else { Default::default() }, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       chain.block_connected_checked(&header, height + 1, &Vec::new(), &Vec::new());
+       notifier.block_connected_checked(&header, height + 1, &Vec::new(), &Vec::new());
        for i in 2..depth + 1 {
                header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               chain.block_connected_checked(&header, height + i, &Vec::new(), &Vec::new());
+               notifier.block_connected_checked(&header, height + i, &Vec::new(), &Vec::new());
        }
        header.bitcoin_hash()
 }
 
 pub struct Node {
+       pub block_notifier: Arc<chaininterface::BlockNotifier<'a, 'b>>,
        pub chain_monitor: Arc<chaininterface::ChainWatchInterfaceUtil>,
        pub tx_broadcaster: Arc<test_utils::TestBroadcaster>,
        pub chan_monitor: Arc<test_utils::TestChannelMonitor>,
@@ -220,7 +221,7 @@ pub fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, c
 }
 
 pub fn create_chan_between_nodes_with_value_confirm_first(node_recv: &Node, node_conf: &Node, tx: &Transaction) {
-       confirm_transaction(&node_conf.chain_monitor, &tx, tx.version);
+       confirm_transaction(&node_conf.block_notifier, &node_conf.chain_monitor, &tx, tx.version);
        node_recv.node.handle_funding_locked(&node_conf.node.get_our_node_id(), &get_event_msg!(node_conf, MessageSendEvent::SendFundingLocked, node_recv.node.get_our_node_id())).unwrap();
 }
 
@@ -246,7 +247,7 @@ pub fn create_chan_between_nodes_with_value_confirm_second(node_recv: &Node, nod
 
 pub fn create_chan_between_nodes_with_value_confirm(node_a: &Node, node_b: &Node, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
        create_chan_between_nodes_with_value_confirm_first(node_a, node_b, tx);
-       confirm_transaction(&node_a.chain_monitor, &tx, tx.version);
+       confirm_transaction(&node_a.block_notifier, &node_a.chain_monitor, &tx, tx.version);
        create_chan_between_nodes_with_value_confirm_second(node_b, node_a)
 }
 
@@ -836,19 +837,25 @@ pub fn create_network(node_count: usize, node_config: &[Option<UserConfig>]) ->
                let logger: Arc<Logger> = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
                let feeest = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
                let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, Arc::clone(&logger)));
+               let block_notifier = Arc::new(chaininterface::BlockNotifier::new(chain_monitor.clone()));
                let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
                let mut seed = [0; 32];
                rng.fill_bytes(&mut seed);
                let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet, Arc::clone(&logger)));
                let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), feeest.clone()));
+               let weak_res = Arc::downgrade(&chan_monitor.simple_monitor);
+               block_notifier.register_listener(weak_res);
                let mut default_config = UserConfig::new();
                default_config.channel_options.announced_channel = true;
                default_config.peer_channel_config_limits.force_announced_channel_preference = false;
                let node = ChannelManager::new(Network::Testnet, feeest.clone(), chan_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone(), if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }, 0).unwrap();
+               let weak_res = Arc::downgrade(&node);
+               block_notifier.register_listener(weak_res);
                let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger));
                nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, keys_manager, node_seed: seed,
                        network_payment_count: payment_count.clone(),
                        network_chan_count: chan_count.clone(),
+                       block_notifier,
                });
        }
 
index 91829db81d40db2e06dd5e571998761fea492370..83ca1e422ec7104f1433fc28becdb30510d7c698 100644 (file)
@@ -3,7 +3,7 @@
 //! claim outputs on-chain.
 
 use chain::transaction::OutPoint;
-use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchInterfaceUtil};
+use chain::chaininterface::{ChainListener, ChainWatchInterfaceUtil};
 use chain::keysinterface::{KeysInterface, SpendableOutputDescriptor, KeysManager};
 use chain::keysinterface;
 use ln::channel::{COMMITMENT_TX_BASE_WEIGHT, COMMITMENT_TX_WEIGHT_PER_HTLC};
@@ -656,8 +656,8 @@ fn pre_funding_lock_shutdown_test() {
        let nodes = create_network(2, &[None, None]);
        let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 8000000, 0, LocalFeatures::new(), LocalFeatures::new());
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[0].chain_monitor.block_connected_checked(&header, 1, &[&tx; 1], &[1; 1]);
-       nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&tx; 1], &[1; 1]);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![tx.clone()]}, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![tx.clone()]}, 1);
 
        nodes[0].node.close_channel(&OutPoint::new(tx.txid(), 0).to_channel_id()).unwrap();
        let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
@@ -1281,8 +1281,7 @@ fn test_duplicate_htlc_different_direction_onchain() {
        assert_eq!(has_both_htlcs, 2);
 
        let header = BlockHeader { version: 0x2000_0000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![remote_txn[0].clone()] }, 1);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![remote_txn[0].clone()] }, 1);
 
        // Check we only broadcast 1 timeout tx
        let claim_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clone();
@@ -1739,7 +1738,7 @@ fn channel_monitor_network_test() {
        {
                let mut node_txn = test_txn_broadcast(&nodes[1], &chan_1, None, HTLCType::NONE);
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
                test_txn_broadcast(&nodes[0], &chan_1, None, HTLCType::NONE);
        }
        get_announce_close_broadcast_events(&nodes, 0, 1);
@@ -1754,7 +1753,7 @@ fn channel_monitor_network_test() {
        {
                let mut node_txn = test_txn_broadcast(&nodes[1], &chan_2, None, HTLCType::TIMEOUT);
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
+               nodes[2].block_notifier.block_connected(&Block { header, txdata: vec![node_txn.drain(..).next().unwrap()] }, 1);
                test_txn_broadcast(&nodes[2], &chan_2, None, HTLCType::NONE);
        }
        get_announce_close_broadcast_events(&nodes, 1, 2);
@@ -1791,7 +1790,7 @@ fn channel_monitor_network_test() {
                claim_funds!(nodes[3], nodes[2], payment_preimage_1, 3_000_000);
 
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[3].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn[0].clone()] }, 1);
+               nodes[3].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone()] }, 1);
 
                check_preimage_claim(&nodes[3], &node_txn);
        }
@@ -1801,7 +1800,7 @@ fn channel_monitor_network_test() {
 
        { // Cheat and reset nodes[4]'s height to 1
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[4].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![] }, 1);
+               nodes[4].block_notifier.block_connected(&Block { header, txdata: vec![] }, 1);
        }
 
        assert_eq!(nodes[3].node.latest_block_height.load(Ordering::Acquire), 1);
@@ -1813,10 +1812,10 @@ fn channel_monitor_network_test() {
 
        {
                let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[3].chain_monitor.block_connected_checked(&header, 2, &Vec::new()[..], &[0; 0]);
+               nodes[3].block_notifier.block_connected_checked(&header, 2, &Vec::new()[..], &[0; 0]);
                for i in 3..TEST_FINAL_CLTV + 2 + LATENCY_GRACE_PERIOD_BLOCKS + 1 {
                        header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-                       nodes[3].chain_monitor.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
+                       nodes[3].block_notifier.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
                }
 
                let node_txn = test_txn_broadcast(&nodes[3], &chan_4, None, HTLCType::TIMEOUT);
@@ -1825,16 +1824,17 @@ fn channel_monitor_network_test() {
                claim_funds!(nodes[4], nodes[3], payment_preimage_2, 3_000_000);
 
                header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[4].chain_monitor.block_connected_checked(&header, 2, &Vec::new()[..], &[0; 0]);
+
+               nodes[4].block_notifier.block_connected_checked(&header, 2, &Vec::new()[..], &[0; 0]);
                for i in 3..TEST_FINAL_CLTV + 2 - CLTV_CLAIM_BUFFER + 1 {
                        header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-                       nodes[4].chain_monitor.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
+                       nodes[4].block_notifier.block_connected_checked(&header, i, &Vec::new()[..], &[0; 0]);
                }
 
                test_txn_broadcast(&nodes[4], &chan_4, None, HTLCType::SUCCESS);
 
                header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[4].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn[0].clone()] }, TEST_FINAL_CLTV - 5);
+               nodes[4].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone()] }, TEST_FINAL_CLTV - 5);
 
                check_preimage_claim(&nodes[4], &node_txn);
        }
@@ -1846,7 +1846,6 @@ fn channel_monitor_network_test() {
 #[test]
 fn test_justice_tx() {
        // Test justice txn built on revoked HTLC-Success tx, against both sides
-
        let mut alice_config = UserConfig::new();
        alice_config.channel_options.announced_channel = true;
        alice_config.peer_channel_config_limits.force_announced_channel_preference = false;
@@ -1855,7 +1854,8 @@ fn test_justice_tx() {
        bob_config.channel_options.announced_channel = true;
        bob_config.peer_channel_config_limits.force_announced_channel_preference = false;
        bob_config.own_channel_config.our_to_self_delay = 6 * 24 * 3;
-       let nodes = create_network(2, &[Some(alice_config), Some(bob_config)]);
+       let cfgs = [Some(alice_config), Some(bob_config)];
+       let nodes = create_network(2, &cfgs);
        // Create some new channels:
        let chan_5 = create_announced_chan_between_nodes(&nodes, 0, 1, LocalFeatures::new(), LocalFeatures::new());
 
@@ -1875,7 +1875,7 @@ fn test_justice_tx() {
 
        {
                let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+               nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                {
                        let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
                        assert_eq!(node_txn.len(), 3);
@@ -1887,10 +1887,10 @@ fn test_justice_tx() {
                }
                test_txn_broadcast(&nodes[1], &chan_5, None, HTLCType::NONE);
 
-               nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                let node_txn = test_txn_broadcast(&nodes[0], &chan_5, Some(revoked_local_txn[0].clone()), HTLCType::TIMEOUT);
                header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn[1].clone()] }, 1);
+               nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[1].clone()] }, 1);
                test_revoked_htlc_claim_txn_broadcast(&nodes[1], node_txn[1].clone());
        }
        get_announce_close_broadcast_events(&nodes, 0, 1);
@@ -1914,7 +1914,7 @@ fn test_justice_tx() {
        claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage_4, 3_000_000);
        {
                let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                {
                        let mut node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
                        assert_eq!(node_txn.len(), 3);
@@ -1926,10 +1926,10 @@ fn test_justice_tx() {
                }
                test_txn_broadcast(&nodes[0], &chan_6, None, HTLCType::NONE);
 
-               nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+               nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
                let node_txn = test_txn_broadcast(&nodes[1], &chan_6, Some(revoked_local_txn[0].clone()), HTLCType::SUCCESS);
                header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn[1].clone()] }, 1);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[1].clone()] }, 1);
                test_revoked_htlc_claim_txn_broadcast(&nodes[0], node_txn[1].clone());
        }
        get_announce_close_broadcast_events(&nodes, 0, 1);
@@ -1953,7 +1953,7 @@ fn revoked_output_claim() {
 
        // Inform nodes[1] that nodes[0] broadcast a stale tx
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
        let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
        assert_eq!(node_txn.len(), 3); // nodes[1] will broadcast justice tx twice, and its own local state once
 
@@ -1963,7 +1963,7 @@ fn revoked_output_claim() {
        check_spends!(node_txn[1], chan_1.3.clone());
 
        // Inform nodes[0] that a watchtower cheated on its behalf, so it will force-close the chan
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
        get_announce_close_broadcast_events(&nodes, 0, 1);
 }
 
@@ -1996,9 +1996,9 @@ fn claim_htlc_outputs_shared_tx() {
 
        {
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
-               nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
-               connect_blocks(&nodes[1].chain_monitor, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+               nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+               connect_blocks(&nodes[1].block_notifier, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
 
                let events = nodes[1].node.get_and_clear_pending_events();
                assert_eq!(events.len(), 1);
@@ -2064,9 +2064,9 @@ fn claim_htlc_outputs_single_tx() {
 
        {
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 200);
-               nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 200);
-               connect_blocks(&nodes[1].chain_monitor, ANTI_REORG_DELAY - 1, 200, true, header.bitcoin_hash());
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 200);
+               nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 200);
+               connect_blocks(&nodes[1].block_notifier, ANTI_REORG_DELAY - 1, 200, true, header.bitcoin_hash());
 
                let events = nodes[1].node.get_and_clear_pending_events();
                assert_eq!(events.len(), 1);
@@ -2175,7 +2175,7 @@ fn test_htlc_on_chain_success() {
        assert!(updates.update_fail_malformed_htlcs.is_empty());
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
 
-       nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
+       nodes[2].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
        check_closed_broadcast!(nodes[2]);
        let node_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().clone(); // ChannelManager : 1 (commitment tx), ChannelMonitor : 4 (2*2 * HTLC-Success tx)
        assert_eq!(node_txn.len(), 5);
@@ -2192,7 +2192,7 @@ fn test_htlc_on_chain_success() {
        assert_eq!(node_txn[1].lock_time, 0);
 
        // Verify that B's ChannelManager is able to extract preimage from HTLC Success tx and pass it backward
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: node_txn}, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: node_txn}, 1);
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        {
                let mut added_monitors = nodes[1].chan_monitor.added_monitors.lock().unwrap();
@@ -2260,7 +2260,7 @@ fn test_htlc_on_chain_success() {
        // Broadcast preimage tx by B on offered output from A commitment tx  on A's chain
        let commitment_tx = nodes[0].node.channel_state.lock().unwrap().by_id.get(&chan_1.2).unwrap().last_local_commitment_txn.clone();
        check_spends!(commitment_tx[0], chan_1.3.clone());
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
        check_closed_broadcast!(nodes[1]);
        let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().clone(); // ChannelManager : 1 (commitment tx), ChannelMonitor : 1 (HTLC-Success) * 2 (block-rescan)
        assert_eq!(node_txn.len(), 3);
@@ -2277,7 +2277,7 @@ fn test_htlc_on_chain_success() {
        // we already checked the same situation with A.
 
        // Verify that A's ChannelManager is able to extract preimage from preimage tx and generate PaymentSent
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone(), node_txn[0].clone()] }, 1);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone(), node_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[0]);
        let events = nodes[0].node.get_and_clear_pending_events();
        assert_eq!(events.len(), 2);
@@ -2341,7 +2341,7 @@ fn test_htlc_on_chain_timeout() {
                },
                _ => panic!("Unexpected event"),
        };
-       nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
+       nodes[2].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
        check_closed_broadcast!(nodes[2]);
        let node_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().clone(); // ChannelManager : 1 (commitment tx)
        assert_eq!(node_txn.len(), 1);
@@ -2350,7 +2350,7 @@ fn test_htlc_on_chain_timeout() {
 
        // Broadcast timeout transaction by B on received output from C's commitment tx on B's chain
        // Verify that B's ChannelManager is able to detect that HTLC is timeout by its own tx and react backward in consequence
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 200);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 200);
        let timeout_tx;
        {
                let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
@@ -2372,8 +2372,8 @@ fn test_htlc_on_chain_timeout() {
                node_txn.clear();
        }
 
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![timeout_tx]}, 1);
-       connect_blocks(&nodes[1].chain_monitor, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![timeout_tx]}, 1);
+       connect_blocks(&nodes[1].block_notifier, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
        check_added_monitors!(nodes[1], 0);
        check_closed_broadcast!(nodes[1]);
 
@@ -2398,7 +2398,7 @@ fn test_htlc_on_chain_timeout() {
        let commitment_tx = nodes[1].node.channel_state.lock().unwrap().by_id.get(&chan_1.2).unwrap().last_local_commitment_txn.clone();
        check_spends!(commitment_tx[0], chan_1.3.clone());
 
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 200);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 200);
        check_closed_broadcast!(nodes[0]);
        let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clone(); // ChannelManager : 2 (commitment tx, HTLC-Timeout tx), ChannelMonitor : 2 (timeout tx) * 2 block-rescan
        assert_eq!(node_txn.len(), 4);
@@ -2431,8 +2431,8 @@ fn test_simple_commitment_revoked_fail_backward() {
        route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 3000000);
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42};
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
-       connect_blocks(&nodes[1].chain_monitor, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       connect_blocks(&nodes[1].block_notifier, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
        check_added_monitors!(nodes[1], 0);
        check_closed_broadcast!(nodes[1]);
 
@@ -2584,8 +2584,8 @@ fn do_test_commitment_revoked_fail_backward_exhaustive(deliver_bs_raa: bool, use
        assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42};
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
-       connect_blocks(&nodes[1].chain_monitor, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       connect_blocks(&nodes[1].block_notifier, ANTI_REORG_DELAY - 1, 1, true, header.bitcoin_hash());
 
        let events = nodes[1].node.get_and_clear_pending_events();
        assert_eq!(events.len(), if deliver_bs_raa { 1 } else { 2 });
@@ -2706,12 +2706,12 @@ fn test_htlc_ignore_latest_remote_commitment() {
        assert_eq!(node_txn.len(), 2);
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone(), node_txn[1].clone()]}, 1);
        check_closed_broadcast!(nodes[1]);
 
        // Duplicate the block_connected call since this may happen due to other listeners
        // registering new transactions
-       nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&node_txn[0], &node_txn[1]], &[1; 2]);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone(), node_txn[1].clone()]}, 1);
 }
 
 #[test]
@@ -2766,7 +2766,7 @@ fn test_force_close_fail_back() {
        };
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]);
+       nodes[1].block_notifier.block_connected_checked(&header, 1, &[&tx], &[1]);
 
        // Note no UpdateHTLCs event here from nodes[1] to nodes[0]!
        check_closed_broadcast!(nodes[1]);
@@ -2777,7 +2777,7 @@ fn test_force_close_fail_back() {
                monitors.get_mut(&OutPoint::new(Sha256dHash::from_slice(&payment_event.commitment_msg.channel_id[..]).unwrap(), 0)).unwrap()
                        .provide_payment_preimage(&our_payment_hash, &our_payment_preimage);
        }
-       nodes[2].chain_monitor.block_connected_checked(&header, 1, &[&tx], &[1]);
+       nodes[2].block_notifier.block_connected_checked(&header, 1, &[&tx], &[1]);
        let node_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap();
        assert_eq!(node_txn.len(), 1);
        assert_eq!(node_txn[0].input.len(), 1);
@@ -3085,7 +3085,7 @@ fn test_funding_peer_disconnect() {
        nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
        nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 
-       confirm_transaction(&nodes[0].chain_monitor, &tx, tx.version);
+       confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &tx, tx.version);
        let events_1 = nodes[0].node.get_and_clear_pending_msg_events();
        assert_eq!(events_1.len(), 1);
        match events_1[0] {
@@ -3100,7 +3100,7 @@ fn test_funding_peer_disconnect() {
        nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
        nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
 
-       confirm_transaction(&nodes[1].chain_monitor, &tx, tx.version);
+       confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &tx, tx.version);
        let events_2 = nodes[1].node.get_and_clear_pending_msg_events();
        assert_eq!(events_2.len(), 2);
        let funding_locked = match events_2[0] {
@@ -3408,7 +3408,7 @@ fn test_no_txn_manager_serialize_deserialize() {
        assert!(nodes[0].chan_monitor.add_update_monitor(chan_0_monitor.get_funding_txo().unwrap(), chan_0_monitor).is_ok());
        nodes[0].node = Arc::new(nodes_0_deserialized);
        let nodes_0_as_listener: Arc<ChainListener> = nodes[0].node.clone();
-       nodes[0].chain_monitor.register_listener(Arc::downgrade(&nodes_0_as_listener));
+       nodes[0].block_notifier.register_listener(Arc::downgrade(&nodes_0_as_listener));
        assert_eq!(nodes[0].node.list_channels().len(), 1);
        check_added_monitors!(nodes[0], 1);
 
@@ -3462,7 +3462,6 @@ fn test_simple_manager_serialize_deserialize() {
                        keys_manager,
                        fee_estimator: Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }),
                        monitor: nodes[0].chan_monitor.clone(),
-                       chain_monitor: nodes[0].chain_monitor.clone(),
                        tx_broadcaster: nodes[0].tx_broadcaster.clone(),
                        logger: Arc::new(test_utils::TestLogger::new()),
                        channel_monitors: &channel_monitors,
@@ -3682,7 +3681,7 @@ fn test_claim_sizeable_push_msat() {
        assert_eq!(node_txn[0].output.len(), 2); // We can't force trimming of to_remote output as channel_reserve_satoshis block us to do so at channel opening
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn[0].clone()] }, 0);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone()] }, 0);
        let spend_txn = check_spendable_outputs!(nodes[1], 1);
        assert_eq!(spend_txn.len(), 1);
        check_spends!(spend_txn[0], node_txn[0].clone());
@@ -3692,7 +3691,6 @@ fn test_claim_sizeable_push_msat() {
 fn test_claim_on_remote_sizeable_push_msat() {
        // Same test as previous, just test on remote commitment tx, as per_commitment_point registration changes following you're funder/fundee and
        // to_remote output is encumbered by a P2WPKH
-
        let nodes = create_network(2, &[None, None]);
 
        let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 99000000, LocalFeatures::new(), LocalFeatures::new());
@@ -3705,7 +3703,7 @@ fn test_claim_on_remote_sizeable_push_msat() {
        assert_eq!(node_txn[0].output.len(), 2); // We can't force trimming of to_remote output as channel_reserve_satoshis block us to do so at channel opening
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn[0].clone()] }, 0);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone()] }, 0);
        check_closed_broadcast!(nodes[1]);
        let spend_txn = check_spendable_outputs!(nodes[1], 1);
        assert_eq!(spend_txn.len(), 2);
@@ -3728,7 +3726,7 @@ fn test_claim_on_remote_revoked_sizeable_push_msat() {
 
        claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage, 3_000_000);
        let  header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[1]);
 
        let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
@@ -3757,7 +3755,7 @@ fn test_static_spendable_outputs_preimage_tx() {
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        assert!(nodes[1].node.claim_funds(payment_preimage, 3_000_000));
        check_added_monitors!(nodes[1], 1);
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()] }, 1);
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        match events[0] {
                MessageSendEvent::UpdateHTLCs { .. } => {},
@@ -3796,7 +3794,7 @@ fn test_static_spendable_outputs_justice_tx_revoked_commitment_tx() {
        claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage, 3_000_000);
 
        let  header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[1]);
 
        let mut node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
@@ -3827,7 +3825,7 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_timeout_tx() {
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        // A will generate HTLC-Timeout from revoked commitment tx
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[0]);
 
        let revoked_htlc_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
@@ -3839,7 +3837,7 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_timeout_tx() {
        check_spends!(revoked_htlc_txn[1], chan_1.3.clone());
 
        // B will generate justice tx from A's revoked commitment/HTLC tx
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone(), revoked_htlc_txn[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone(), revoked_htlc_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[1]);
 
        let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
@@ -3871,7 +3869,7 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() {
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        // B will generate HTLC-Success from revoked commitment tx
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[1]);
        let revoked_htlc_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
 
@@ -3882,7 +3880,7 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() {
        check_spends!(revoked_htlc_txn[0], revoked_local_txn[0].clone());
 
        // A will generate justice tx from B's revoked commitment/HTLC tx
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![revoked_local_txn[0].clone(), revoked_htlc_txn[0].clone()] }, 1);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![revoked_local_txn[0].clone(), revoked_htlc_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[0]);
 
        let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
@@ -3932,7 +3930,7 @@ fn test_onchain_to_onchain_claim() {
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        assert!(updates.update_fail_malformed_htlcs.is_empty());
 
-       nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
+       nodes[2].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
        check_closed_broadcast!(nodes[2]);
 
        let c_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().clone(); // ChannelManager : 2 (commitment tx, HTLC-Success tx), ChannelMonitor : 1 (HTLC-Success tx)
@@ -3947,7 +3945,7 @@ fn test_onchain_to_onchain_claim() {
        assert_eq!(c_txn[0].lock_time, 0); // Success tx
 
        // So we broadcast C's commitment tx and HTLC-Success on B's chain, we should successfully be able to extract preimage and update downstream monitor
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![c_txn[1].clone(), c_txn[2].clone()]}, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![c_txn[1].clone(), c_txn[2].clone()]}, 1);
        {
                let mut b_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
                assert_eq!(b_txn.len(), 4);
@@ -3981,7 +3979,7 @@ fn test_onchain_to_onchain_claim() {
        };
        // Broadcast A's commitment tx on B's chain to see if we are able to claim inbound HTLC with our HTLC-Success tx
        let commitment_tx = nodes[0].node.channel_state.lock().unwrap().by_id.get(&chan_1.2).unwrap().last_local_commitment_txn.clone();
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![commitment_tx[0].clone()]}, 1);
        let b_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
        assert_eq!(b_txn.len(), 3);
        check_spends!(b_txn[1], chan_1.3); // Local commitment tx, issued by ChannelManager
@@ -4012,7 +4010,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
        check_spends!(commitment_txn[0], chan_2.3.clone());
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_txn[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![commitment_txn[0].clone()] }, 1);
        check_closed_broadcast!(nodes[1]);
 
        let htlc_timeout_tx;
@@ -4033,7 +4031,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
        }
 
        nodes[2].node.claim_funds(our_payment_preimage, 900_000);
-       nodes[2].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![commitment_txn[0].clone()] }, 1);
+       nodes[2].block_notifier.block_connected(&Block { header, txdata: vec![commitment_txn[0].clone()] }, 1);
        check_added_monitors!(nodes[2], 2);
        let events = nodes[2].node.get_and_clear_pending_msg_events();
        match events[0] {
@@ -4057,8 +4055,8 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
        check_spends!(htlc_success_txn[0], commitment_txn[0].clone());
        check_spends!(htlc_success_txn[1], commitment_txn[0].clone());
 
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![htlc_timeout_tx] }, 200);
-       connect_blocks(&nodes[1].chain_monitor, ANTI_REORG_DELAY - 1, 200, true, header.bitcoin_hash());
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![htlc_timeout_tx] }, 200);
+       connect_blocks(&nodes[1].block_notifier, ANTI_REORG_DELAY - 1, 200, true, header.bitcoin_hash());
        expect_pending_htlcs_forwardable!(nodes[1]);
        let htlc_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        assert!(htlc_updates.update_add_htlcs.is_empty());
@@ -4089,7 +4087,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
        }
 
        // Solve 2nd HTLC by broadcasting on B's chain HTLC-Success Tx from C
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![htlc_success_txn[0].clone()] }, 200);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![htlc_success_txn[0].clone()] }, 200);
        let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        assert!(updates.update_add_htlcs.is_empty());
        assert!(updates.update_fail_htlcs.is_empty());
@@ -4126,7 +4124,7 @@ fn test_dynamic_spendable_outputs_local_htlc_success_tx() {
        nodes[1].node.claim_funds(payment_preimage, 9_000_000);
        check_added_monitors!(nodes[1], 1);
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![local_txn[0].clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![local_txn[0].clone()] }, 1);
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        match events[0] {
                MessageSendEvent::UpdateHTLCs { .. } => {},
@@ -4273,11 +4271,11 @@ fn do_test_fail_backwards_unrevoked_remote_announce(deliver_last_raa: bool, anno
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        if announce_latest {
-               nodes[2].chain_monitor.block_connected_checked(&header, 1, &[&ds_last_commitment_tx[0]], &[1; 1]);
+               nodes[2].block_notifier.block_connected(&Block { header, txdata: vec![ds_last_commitment_tx[0].clone()]}, 1);
        } else {
-               nodes[2].chain_monitor.block_connected_checked(&header, 1, &[&ds_prev_commitment_tx[0]], &[1; 1]);
+               nodes[2].block_notifier.block_connected(&Block { header, txdata: vec![ds_prev_commitment_tx[0].clone()]}, 1);
        }
-       connect_blocks(&nodes[2].chain_monitor, ANTI_REORG_DELAY - 1, 1, true,  header.bitcoin_hash());
+       connect_blocks(&nodes[2].block_notifier, ANTI_REORG_DELAY - 1, 1, true,  header.bitcoin_hash());
        check_closed_broadcast!(nodes[2]);
        expect_pending_htlcs_forwardable!(nodes[2]);
        check_added_monitors!(nodes[2], 2);
@@ -4412,7 +4410,7 @@ fn test_dynamic_spendable_outputs_local_htlc_timeout_tx() {
 
        // Timeout HTLC on A's chain and so it can generate a HTLC-Timeout tx
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![local_txn[0].clone()] }, 200);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![local_txn[0].clone()] }, 200);
        check_closed_broadcast!(nodes[0]);
 
        let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
@@ -4443,12 +4441,12 @@ fn test_static_output_closing_tx() {
        let closing_tx = close_channel(&nodes[0], &nodes[1], &chan.2, chan.3, true).2;
 
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![closing_tx.clone()] }, 1);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![closing_tx.clone()] }, 1);
        let spend_txn = check_spendable_outputs!(nodes[0], 2);
        assert_eq!(spend_txn.len(), 1);
        check_spends!(spend_txn[0], closing_tx.clone());
 
-       nodes[1].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![closing_tx.clone()] }, 1);
+       nodes[1].block_notifier.block_connected(&Block { header, txdata: vec![closing_tx.clone()] }, 1);
        let spend_txn = check_spendable_outputs!(nodes[1], 2);
        assert_eq!(spend_txn.len(), 1);
        check_spends!(spend_txn[0], closing_tx);
@@ -4484,7 +4482,7 @@ fn do_htlc_claim_local_commitment_only(use_dust: bool) {
 
        let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        for i in 1..TEST_FINAL_CLTV - CLTV_CLAIM_BUFFER + CHAN_CONFIRM_DEPTH + 1 {
-               nodes[1].chain_monitor.block_connected_checked(&header, i, &Vec::new(), &Vec::new());
+               nodes[1].block_notifier.block_connected_checked(&header, i, &Vec::new(), &Vec::new());
                header.prev_blockhash = header.bitcoin_hash();
        }
        test_txn_broadcast(&nodes[1], &chan, None, if use_dust { HTLCType::NONE } else { HTLCType::SUCCESS });
@@ -4507,8 +4505,9 @@ fn do_htlc_claim_current_remote_commitment_only(use_dust: bool) {
        // to "time out" the HTLC.
 
        let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+
        for i in 1..TEST_FINAL_CLTV + LATENCY_GRACE_PERIOD_BLOCKS + CHAN_CONFIRM_DEPTH + 1 {
-               nodes[0].chain_monitor.block_connected_checked(&header, i, &Vec::new(), &Vec::new());
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: Vec::new()}, i);
                header.prev_blockhash = header.bitcoin_hash();
        }
        test_txn_broadcast(&nodes[0], &chan, None, HTLCType::NONE);
@@ -4547,7 +4546,7 @@ fn do_htlc_claim_previous_remote_commitment_only(use_dust: bool, check_revoke_no
 
        let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        for i in 1..TEST_FINAL_CLTV + LATENCY_GRACE_PERIOD_BLOCKS + CHAN_CONFIRM_DEPTH + 1 {
-               nodes[0].chain_monitor.block_connected_checked(&header, i, &Vec::new(), &Vec::new());
+               nodes[0].block_notifier.block_connected_checked(&header, i, &Vec::new(), &Vec::new());
                header.prev_blockhash = header.bitcoin_hash();
        }
        if !check_revoke_no_close {
@@ -4617,7 +4616,7 @@ fn run_onion_failure_test_with_fail_intercept<F1,F2,F3>(_name: &str, test_case:
        // reset block height
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
        for ix in 0..nodes.len() {
-               nodes[ix].chain_monitor.block_connected_checked(&header, 1, &Vec::new()[..], &[0; 0]);
+               nodes[ix].block_notifier.block_connected_checked(&header, 1, &[], &[]);
        }
 
        macro_rules! expect_event {
@@ -4950,7 +4949,8 @@ fn test_onion_failure() {
        run_onion_failure_test("expiry_too_soon", 0, &nodes, &route, &payment_hash, |msg| {
                let height = msg.cltv_expiry - CLTV_CLAIM_BUFFER - LATENCY_GRACE_PERIOD_BLOCKS + 1;
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[1].chain_monitor.block_connected_checked(&header, height, &Vec::new()[..], &[0; 0]);
+
+               nodes[1].block_notifier.block_connected_checked(&header, height, &[], &[]);
        }, ||{}, true, Some(UPDATE|14), Some(msgs::HTLCFailChannelUpdate::ChannelUpdateMessage{msg: ChannelUpdate::dummy()}));
 
        run_onion_failure_test("unknown_payment_hash", 2, &nodes, &route, &payment_hash, |_| {}, || {
@@ -4960,7 +4960,8 @@ fn test_onion_failure() {
        run_onion_failure_test("final_expiry_too_soon", 1, &nodes, &route, &payment_hash, |msg| {
                let height = msg.cltv_expiry - CLTV_CLAIM_BUFFER - LATENCY_GRACE_PERIOD_BLOCKS + 1;
                let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               nodes[2].chain_monitor.block_connected_checked(&header, height, &Vec::new()[..], &[0; 0]);
+
+               nodes[2].block_notifier.block_connected_checked(&header, height, &[], &[]);
        }, || {}, true, Some(17), None);
 
        run_onion_failure_test("final_incorrect_cltv_expiry", 1, &nodes, &route, &payment_hash, |_| {}, || {
@@ -5721,10 +5722,11 @@ fn do_test_failure_delay_dust_htlc_local_commitment(announce_latest: bool) {
        assert_ne!(as_prev_commitment_tx, as_last_commitment_tx);
        // Fail the 2 dust-HTLCs, move their failure in maturation buffer (htlc_updated_waiting_threshold_conf)
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
+
        if announce_latest {
-               nodes[0].chain_monitor.block_connected_checked(&header, 1, &[&as_last_commitment_tx[0]], &[1; 1]);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![as_last_commitment_tx[0].clone()]}, 1);
        } else {
-               nodes[0].chain_monitor.block_connected_checked(&header, 1, &[&as_prev_commitment_tx[0]], &[1; 1]);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![as_prev_commitment_tx[0].clone()]}, 1);
        }
 
        let events = nodes[0].node.get_and_clear_pending_msg_events();
@@ -5735,7 +5737,7 @@ fn do_test_failure_delay_dust_htlc_local_commitment(announce_latest: bool) {
        }
 
        assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
-       connect_blocks(&nodes[0].chain_monitor, ANTI_REORG_DELAY - 1, 1, true,  header.bitcoin_hash());
+       connect_blocks(&nodes[0].block_notifier, ANTI_REORG_DELAY - 1, 1, true,  header.bitcoin_hash());
        let events = nodes[0].node.get_and_clear_pending_events();
        // Only 2 PaymentFailed events should show up, over-dust HTLC has to be failed by timeout tx
        assert_eq!(events.len(), 2);
@@ -5804,7 +5806,7 @@ fn test_no_failure_dust_htlc_local_commitment() {
        assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
        assert_eq!(nodes[0].node.get_and_clear_pending_msg_events().len(), 0);
        // We broadcast a few more block to check everything is all right
-       connect_blocks(&nodes[0].chain_monitor, 20, 1, true,  header.bitcoin_hash());
+       connect_blocks(&nodes[0].block_notifier, 20, 1, true,  header.bitcoin_hash());
        assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
        assert_eq!(nodes[0].node.get_and_clear_pending_msg_events().len(), 0);
 
@@ -5841,7 +5843,7 @@ fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
        let mut timeout_tx = Vec::new();
        if local {
                // We fail dust-HTLC 1 by broadcast of local commitment tx
-               nodes[0].chain_monitor.block_connected_checked(&header, 1, &[&as_commitment_tx[0]], &[1; 1]);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![as_commitment_tx[0].clone()]}, 1);
                let events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
                match events[0] {
@@ -5850,7 +5852,7 @@ fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
                }
                assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
                timeout_tx.push(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap()[0].clone());
-               let parent_hash  = connect_blocks(&nodes[0].chain_monitor, ANTI_REORG_DELAY - 1, 2, true, header.bitcoin_hash());
+               let parent_hash  = connect_blocks(&nodes[0].block_notifier, ANTI_REORG_DELAY - 1, 2, true, header.bitcoin_hash());
                let events = nodes[0].node.get_and_clear_pending_events();
                assert_eq!(events.len(), 1);
                match events[0] {
@@ -5863,9 +5865,9 @@ fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
                // We fail non-dust-HTLC 2 by broadcast of local HTLC-timeout tx on local commitment tx
                let header_2 = BlockHeader { version: 0x20000000, prev_blockhash: parent_hash, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
-               nodes[0].chain_monitor.block_connected_checked(&header_2, 7, &[&timeout_tx[0]], &[1; 1]);
+               nodes[0].block_notifier.block_connected(&Block { header: header_2, txdata: vec![timeout_tx[0].clone()]}, 7);
                let header_3 = BlockHeader { version: 0x20000000, prev_blockhash: header_2.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-               connect_blocks(&nodes[0].chain_monitor, ANTI_REORG_DELAY - 1, 8, true, header_3.bitcoin_hash());
+               connect_blocks(&nodes[0].block_notifier, ANTI_REORG_DELAY - 1, 8, true, header_3.bitcoin_hash());
                let events = nodes[0].node.get_and_clear_pending_events();
                assert_eq!(events.len(), 1);
                match events[0] {
@@ -5876,7 +5878,7 @@ fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
                }
        } else {
                // We fail dust-HTLC 1 by broadcast of remote commitment tx. If revoked, fail also non-dust HTLC
-               nodes[0].chain_monitor.block_connected_checked(&header, 1, &[&bs_commitment_tx[0]], &[1; 1]);
+               nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![bs_commitment_tx[0].clone()]}, 1);
                assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
                let events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -5885,7 +5887,7 @@ fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
                        _ => panic!("Unexpected event"),
                }
                timeout_tx.push(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap()[0].clone());
-               let parent_hash  = connect_blocks(&nodes[0].chain_monitor, ANTI_REORG_DELAY - 1, 2, true, header.bitcoin_hash());
+               let parent_hash  = connect_blocks(&nodes[0].block_notifier, ANTI_REORG_DELAY - 1, 2, true, header.bitcoin_hash());
                let header_2 = BlockHeader { version: 0x20000000, prev_blockhash: parent_hash, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
                if !revoked {
                        let events = nodes[0].node.get_and_clear_pending_events();
@@ -5898,10 +5900,10 @@ fn do_test_sweep_outbound_htlc_failure_update(revoked: bool, local: bool) {
                        }
                        assert_eq!(timeout_tx[0].input[0].witness.last().unwrap().len(), ACCEPTED_HTLC_SCRIPT_WEIGHT);
                        // We fail non-dust-HTLC 2 by broadcast of local timeout tx on remote commitment tx
-                       nodes[0].chain_monitor.block_connected_checked(&header_2, 7, &[&timeout_tx[0]], &[1; 1]);
+                       nodes[0].block_notifier.block_connected(&Block { header: header_2, txdata: vec![timeout_tx[0].clone()]}, 7);
                        assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
                        let header_3 = BlockHeader { version: 0x20000000, prev_blockhash: header_2.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
-                       connect_blocks(&nodes[0].chain_monitor, ANTI_REORG_DELAY - 1, 8, true, header_3.bitcoin_hash());
+                       connect_blocks(&nodes[0].block_notifier, ANTI_REORG_DELAY - 1, 8, true, header_3.bitcoin_hash());
                        let events = nodes[0].node.get_and_clear_pending_events();
                        assert_eq!(events.len(), 1);
                        match events[0] {
@@ -5950,7 +5952,8 @@ fn test_upfront_shutdown_script() {
        config.channel_options.announced_channel = true;
        config.peer_channel_config_limits.force_announced_channel_preference = false;
        config.channel_options.commit_upfront_shutdown_pubkey = false;
-       let nodes = create_network(3, &[None, Some(config), None]);
+       let cfgs = [None, Some(config), None];
+       let nodes = create_network(3, &cfgs);
 
        // We test that in case of peer committing upfront to a script, if it changes at closing, we refuse to sign
        let flags = LocalFeatures::new();
@@ -6047,7 +6050,8 @@ fn test_user_configurable_csv_delay() {
        low_our_to_self_config.own_channel_config.our_to_self_delay = 6;
        let mut high_their_to_self_config = UserConfig::new();
        high_their_to_self_config.peer_channel_config_limits.their_to_self_delay = 100;
-       let nodes = create_network(2, &[Some(high_their_to_self_config.clone()), None]);
+       let cfgs = [Some(high_their_to_self_config.clone()), None];
+       let nodes = create_network(2, &cfgs);
 
        // We test config.our_to_self > BREAKDOWN_TIMEOUT is enforced in Channel::new_outbound()
        let keys_manager: Arc<KeysInterface> = Arc::new(KeysManager::new(&nodes[0].node_seed, Network::Testnet, Arc::new(test_utils::TestLogger::new()), 10, 20));
@@ -6138,6 +6142,12 @@ fn test_data_loss_protect() {
        monitor.add_update_monitor(OutPoint { txid: chan.3.txid(), index: 0 }, chan_monitor.clone()).is_ok();
        nodes[0].chan_monitor = monitor;
        nodes[0].chain_monitor = chain_monitor;
+
+       let weak_res = Arc::downgrade(&nodes[0].chan_monitor.simple_monitor);
+       nodes[0].block_notifier.register_listener(weak_res);
+       let weak_res = Arc::downgrade(&nodes[0].node);
+       nodes[0].block_notifier.register_listener(weak_res);
+
        check_added_monitors!(nodes[0], 1);
 
        nodes[0].node.peer_connected(&nodes[1].node.get_our_node_id());
@@ -6194,7 +6204,7 @@ fn test_data_loss_protect() {
        check_spends!(node_txn[0], chan.3.clone());
        assert_eq!(node_txn[0].output.len(), 2);
        let header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42};
-       nodes[0].chain_monitor.block_connected_with_filtering(&Block { header, txdata: vec![node_txn[0].clone()]}, 1);
+       nodes[0].block_notifier.block_connected(&Block { header, txdata: vec![node_txn[0].clone()]}, 1);
        let spend_txn = check_spendable_outputs!(nodes[0], 1);
        assert_eq!(spend_txn.len(), 1);
        check_spends!(spend_txn[0], node_txn[0].clone());