Merge pull request #2411 from valentinewallace/2023-07-blinded-onion-keys
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index 8dba235f2127fd98faf400fa3b83978047b11bdd..16a02b54a3bbe4e848f194f376a541c6bafa860a 100644 (file)
@@ -31,12 +31,13 @@ use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use crate::chain::transaction::{OutPoint, TransactionData};
-use crate::chain::keysinterface::Sign;
+use crate::sign::WriteableEcdsaChannelSigner;
+use crate::events;
+use crate::events::{Event, EventHandler};
 use crate::util::atomic_counter::AtomicCounter;
 use crate::util::logger::Logger;
 use crate::util::errors::APIError;
-use crate::util::events;
-use crate::util::events::{Event, EventHandler};
+use crate::util::wakers::{Future, Notifier};
 use crate::ln::channelmanager::ChannelDetails;
 
 use crate::prelude::*;
@@ -68,7 +69,7 @@ impl MonitorUpdateId {
        pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self {
                Self { contents: UpdateOrigin::OffChain(update.update_id) }
        }
-       pub(crate) fn from_new_monitor<ChannelSigner: Sign>(monitor: &ChannelMonitor<ChannelSigner>) -> Self {
+       pub(crate) fn from_new_monitor<ChannelSigner: WriteableEcdsaChannelSigner>(monitor: &ChannelMonitor<ChannelSigner>) -> Self {
                Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) }
        }
 }
@@ -93,7 +94,7 @@ impl MonitorUpdateId {
 ///    [`ChannelMonitorUpdateStatus::PermanentFailure`], in which case the channel will likely be
 ///    closed without broadcasting the latest state. See
 ///    [`ChannelMonitorUpdateStatus::PermanentFailure`] for more details.
-pub trait Persist<ChannelSigner: Sign> {
+pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
        /// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
        /// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
        ///
@@ -144,10 +145,10 @@ pub trait Persist<ChannelSigner: Sign> {
        /// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
        ///
        /// [`Writeable::write`]: crate::util::ser::Writeable::write
-       fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+       fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
 }
 
-struct MonitorHolder<ChannelSigner: Sign> {
+struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
        monitor: ChannelMonitor<ChannelSigner>,
        /// The full set of pending monitor updates for this Channel.
        ///
@@ -182,7 +183,7 @@ struct MonitorHolder<ChannelSigner: Sign> {
        last_chain_persist_height: AtomicUsize,
 }
 
-impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
+impl<ChannelSigner: WriteableEcdsaChannelSigner> MonitorHolder<ChannelSigner> {
        fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
                pending_monitor_updates_lock.iter().any(|update_id|
                        if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
@@ -197,12 +198,12 @@ impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
 ///
 /// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is
 /// released.
-pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> {
+pub struct LockedChannelMonitor<'a, ChannelSigner: WriteableEcdsaChannelSigner> {
        lock: RwLockReadGuard<'a, HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
        funding_txo: OutPoint,
 }
 
-impl<ChannelSigner: Sign> Deref for LockedChannelMonitor<'_, ChannelSigner> {
+impl<ChannelSigner: WriteableEcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
        type Target = ChannelMonitor<ChannelSigner>;
        fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
                &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor
@@ -216,9 +217,16 @@ impl<ChannelSigner: Sign> Deref for LockedChannelMonitor<'_, ChannelSigner> {
 /// or used independently to monitor channels remotely. See the [module-level documentation] for
 /// details.
 ///
+/// Note that `ChainMonitor` should regularly trigger rebroadcasts/fee bumps of pending claims from
+/// a force-closed channel. This is crucial in preventing certain classes of pinning attacks,
+/// detecting substantial mempool feerate changes between blocks, and ensuring reliability if
+/// broadcasting fails. We recommend invoking this every 30 seconds, or lower if running in an
+/// environment with spotty connections, like on mobile.
+///
 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
 /// [module-level documentation]: crate::chain::chainmonitor
-pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
+pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
        where C::Target: chain::Filter,
         T::Target: BroadcasterInterface,
         F::Target: FeeEstimator,
@@ -240,9 +248,11 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
        pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
        /// The best block height seen, used as a proxy for the passage of time.
        highest_chain_height: AtomicUsize,
+
+       event_notifier: Notifier,
 }
 
-impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
+impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
 where C::Target: chain::Filter,
            T::Target: BroadcasterInterface,
            F::Target: FeeEstimator,
@@ -294,12 +304,13 @@ where C::Target: chain::Filter,
                                }
 
                                log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-                               match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+                               match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
                                        ChannelMonitorUpdateStatus::Completed =>
                                                log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
                                        ChannelMonitorUpdateStatus::PermanentFailure => {
                                                monitor_state.channel_perm_failed.store(true, Ordering::Release);
                                                self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+                                               self.event_notifier.notify();
                                        },
                                        ChannelMonitorUpdateStatus::InProgress => {
                                                log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
@@ -345,6 +356,7 @@ where C::Target: chain::Filter,
                        persister,
                        pending_monitor_events: Mutex::new(Vec::new()),
                        highest_chain_height: AtomicUsize::new(0),
+                       event_notifier: Notifier::new(),
                }
        }
 
@@ -352,8 +364,7 @@ where C::Target: chain::Filter,
        /// claims which are awaiting confirmation.
        ///
        /// Includes the balances from each [`ChannelMonitor`] *except* those included in
-       /// `ignored_channels`, allowing you to filter out balances from channels which are still open
-       /// (and whose balance should likely be pulled from the [`ChannelDetails`]).
+       /// `ignored_channels`.
        ///
        /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for
        /// inclusion in the return value.
@@ -472,6 +483,7 @@ where C::Target: chain::Filter,
                                }
                        },
                }
+               self.event_notifier.notify();
                Ok(())
        }
 
@@ -486,11 +498,12 @@ where C::Target: chain::Filter,
                        funding_txo,
                        monitor_update_id,
                }], counterparty_node_id));
+               self.event_notifier.notify();
        }
 
-       #[cfg(any(test, fuzzing, feature = "_test_utils"))]
+       #[cfg(any(test, feature = "_test_utils"))]
        pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
-               use crate::util::events::EventsProvider;
+               use crate::events::EventsProvider;
                let events = core::cell::RefCell::new(Vec::new());
                let event_handler = |event: events::Event| events.borrow_mut().push(event);
                self.process_pending_events(&event_handler);
@@ -502,21 +515,48 @@ where C::Target: chain::Filter,
        ///
        /// See the trait-level documentation of [`EventsProvider`] for requirements.
        ///
-       /// [`EventsProvider`]: crate::util::events::EventsProvider
+       /// [`EventsProvider`]: crate::events::EventsProvider
        pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
                &self, handler: H
        ) {
-               let mut pending_events = Vec::new();
-               for monitor_state in self.monitors.read().unwrap().values() {
-                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
+               // Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
+               // crazy dance to process a monitor's events then only remove them once we've done so.
+               let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
+               for funding_txo in mons_to_process {
+                       let mut ev;
+                       super::channelmonitor::process_events_body!(
+                               self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
                }
-               for event in pending_events {
-                       handler(event).await;
+       }
+
+       /// Gets a [`Future`] that completes when an event is available either via
+       /// [`chain::Watch::release_pending_monitor_events`] or
+       /// [`EventsProvider::process_pending_events`].
+       ///
+       /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
+       /// [`ChainMonitor`] and should instead register actions to be taken later.
+       ///
+       /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
+       pub fn get_update_future(&self) -> Future {
+               self.event_notifier.get_future()
+       }
+
+       /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
+       /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
+       /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
+       /// invoking this every 30 seconds, or lower if running in an environment with spotty
+       /// connections, like on mobile.
+       pub fn rebroadcast_pending_claims(&self) {
+               let monitors = self.monitors.read().unwrap();
+               for (_, monitor_holder) in &*monitors {
+                       monitor_holder.monitor.rebroadcast_pending_claims(
+                               &*self.broadcaster, &*self.fee_estimator, &*self.logger
+                       )
                }
        }
 }
 
-impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
 where
        C::Target: chain::Filter,
@@ -543,7 +583,7 @@ where
        }
 }
 
-impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
+impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
 where
        C::Target: chain::Filter,
@@ -592,7 +632,7 @@ where
        }
 }
 
-impl<ChannelSigner: Sign, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
+impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
 chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
 where C::Target: chain::Filter,
            T::Target: BroadcasterInterface,
@@ -646,7 +686,7 @@ where C::Target: chain::Filter,
 
        /// Note that we persist the given `ChannelMonitor` update while holding the
        /// `ChainMonitor` monitors lock.
-       fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
+       fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
                // Update the monitor that watches the channel referred to by the given outpoint.
                let monitors = self.monitors.read().unwrap();
                match monitors.get(&funding_txo) {
@@ -664,15 +704,15 @@ where C::Target: chain::Filter,
                        Some(monitor_state) => {
                                let monitor = &monitor_state.monitor;
                                log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
-                               let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger);
+                               let update_res = monitor.update_monitor(update, &self.broadcaster, &*self.fee_estimator, &self.logger);
                                if update_res.is_err() {
                                        log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor));
                                }
                                // Even if updating the monitor returns an error, the monitor's state will
                                // still be changed. So, persist the updated monitor despite the error.
-                               let update_id = MonitorUpdateId::from_monitor_update(&update);
+                               let update_id = MonitorUpdateId::from_monitor_update(update);
                                let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-                               let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
+                               let persist_res = self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id);
                                match persist_res {
                                        ChannelMonitorUpdateStatus::InProgress => {
                                                pending_monitor_updates.push(update_id);
@@ -735,37 +775,20 @@ where C::Target: chain::Filter,
        }
 }
 
-impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
+impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
        where C::Target: chain::Filter,
              T::Target: BroadcasterInterface,
              F::Target: FeeEstimator,
              L::Target: Logger,
              P::Target: Persist<ChannelSigner>,
 {
-       #[cfg(not(anchors))]
-       /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
-       ///
-       /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
-       /// order to handle these events.
-       ///
-       /// [`SpendableOutputs`]: events::Event::SpendableOutputs
-       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
-               let mut pending_events = Vec::new();
-               for monitor_state in self.monitors.read().unwrap().values() {
-                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-               }
-               for event in pending_events {
-                       handler.handle_event(event);
-               }
-       }
-       #[cfg(anchors)]
        /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
        ///
        /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
        /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
        /// within each channel. As the confirmation of a commitment transaction may be critical to the
-       /// safety of funds, this method must be invoked frequently, ideally once for every chain tip
-       /// update (block connected or disconnected).
+       /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an
+       /// environment with spotty connections, like on mobile.
        ///
        /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
        /// order to handle these events.
@@ -773,30 +796,24 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
        /// [`SpendableOutputs`]: events::Event::SpendableOutputs
        /// [`BumpTransaction`]: events::Event::BumpTransaction
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
-               let mut pending_events = Vec::new();
                for monitor_state in self.monitors.read().unwrap().values() {
-                       pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-               }
-               for event in pending_events {
-                       handler.handle_event(event);
+                       monitor_state.monitor.process_pending_events(&handler);
                }
        }
 }
 
 #[cfg(test)]
 mod tests {
-       use bitcoin::{BlockHeader, TxMerkleNode};
-       use bitcoin::hashes::Hash;
        use crate::{check_added_monitors, check_closed_broadcast, check_closed_event};
-       use crate::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+       use crate::{expect_payment_claimed, expect_payment_path_successful, get_event_msg};
        use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
        use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
        use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
-       use crate::ln::channelmanager::{self, PaymentSendFailure, PaymentId};
+       use crate::events::{Event, ClosureReason, MessageSendEvent, MessageSendEventsProvider};
+       use crate::ln::channelmanager::{PaymentSendFailure, PaymentId, RecipientOnionFields};
        use crate::ln::functional_test_utils::*;
        use crate::ln::msgs::ChannelMessageHandler;
        use crate::util::errors::APIError;
-       use crate::util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
 
        #[test]
        fn test_async_ooo_offchain_updates() {
@@ -807,7 +824,7 @@ mod tests {
                let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
+               create_announced_chan_between_nodes(&nodes, 0, 1);
 
                // Route two payments to be claimed at the same time.
                let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
@@ -819,10 +836,8 @@ mod tests {
 
                nodes[1].node.claim_funds(payment_preimage_1);
                check_added_monitors!(nodes[1], 1);
-               expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
                nodes[1].node.claim_funds(payment_preimage_2);
                check_added_monitors!(nodes[1], 1);
-               expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
 
                let persistences = chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clone();
                assert_eq!(persistences.len(), 1);
@@ -850,14 +865,30 @@ mod tests {
                        .find(|(txo, _)| txo == funding_txo).unwrap().1.contains(&next_update));
                assert!(nodes[1].chain_monitor.release_pending_monitor_events().is_empty());
                assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+               assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
                nodes[1].chain_monitor.chain_monitor.channel_monitor_updated(*funding_txo, update_iter.next().unwrap().clone()).unwrap();
 
+               let claim_events = nodes[1].node.get_and_clear_pending_events();
+               assert_eq!(claim_events.len(), 2);
+               match claim_events[0] {
+                       Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
+                               assert_eq!(payment_hash_1, *payment_hash);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+               match claim_events[1] {
+                       Event::PaymentClaimed { ref payment_hash, amount_msat: 1_000_000, .. } => {
+                               assert_eq!(payment_hash_2, *payment_hash);
+                       },
+                       _ => panic!("Unexpected event"),
+               }
+
                // Now manually walk the commitment signed dance - because we claimed two payments
                // back-to-back it doesn't fit into the neat walk commitment_signed_dance does.
 
                let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
                nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
-               expect_payment_sent_without_paths!(nodes[0], payment_preimage_1);
+               expect_payment_sent(&nodes[0], payment_preimage_1, None, false, false);
                nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &updates.commitment_signed);
                check_added_monitors!(nodes[0], 1);
                let (as_first_raa, as_first_update) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
@@ -870,7 +901,7 @@ mod tests {
                let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id());
 
                nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_second_updates.update_fulfill_htlcs[0]);
-               expect_payment_sent_without_paths!(nodes[0], payment_preimage_2);
+               expect_payment_sent(&nodes[0], payment_preimage_2, None, false, false);
                nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_second_updates.commitment_signed);
                check_added_monitors!(nodes[0], 1);
                nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_first_raa);
@@ -897,8 +928,7 @@ mod tests {
                let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               let channel = create_announced_chan_between_nodes(
-                       &nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
+               let channel = create_announced_chan_between_nodes(&nodes, 0, 1);
 
                // Get a route for later and rebalance the channel somewhat
                send_payment(&nodes[0], &[&nodes[1]], 10_000_000);
@@ -919,10 +949,7 @@ mod tests {
 
                // Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
                // channel is now closed, but the ChannelManager doesn't know that yet.
-               let new_header = BlockHeader {
-                       version: 2, time: 0, bits: 0, nonce: 0,
-                       prev_blockhash: nodes[0].best_block_info().0,
-                       merkle_root: TxMerkleNode::all_zeros() };
+               let new_header = create_dummy_header(nodes[0].best_block_info().0, 0);
                nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
                        &[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1);
                assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
@@ -932,12 +959,14 @@ mod tests {
                // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
                // the update through to the ChannelMonitor which will refuse it (as the channel is closed).
                chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-               unwrap_send_err!(nodes[0].node.send_payment(&route, second_payment_hash, &Some(second_payment_secret), PaymentId(second_payment_hash.0)),
-                       true, APIError::ChannelUnavailable { ref err },
+               unwrap_send_err!(nodes[0].node.send_payment_with_route(&route, second_payment_hash,
+                               RecipientOnionFields::secret_only(second_payment_secret), PaymentId(second_payment_hash.0)
+                       ), true, APIError::ChannelUnavailable { ref err },
                        assert!(err.contains("ChannelMonitor storage failure")));
                check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
                check_closed_broadcast!(nodes[0], true);
-               check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
+               check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }, 
+                       [nodes[1].node.get_our_node_id()], 100000);
 
                // However, as the ChainMonitor is still waiting for the original persistence to complete,
                // it won't yet release the MonitorEvents.
@@ -945,10 +974,7 @@ mod tests {
 
                if block_timeout {
                        // After three blocks, pending MontiorEvents should be released either way.
-                       let latest_header = BlockHeader {
-                               version: 2, time: 0, bits: 0, nonce: 0,
-                               prev_blockhash: nodes[0].best_block_info().0,
-                               merkle_root: TxMerkleNode::all_zeros() };
+                       let latest_header = create_dummy_header(nodes[0].best_block_info().0, 0);
                        nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS);
                } else {
                        let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();
@@ -959,7 +985,7 @@ mod tests {
                        }
                }
 
-               expect_payment_sent!(nodes[0], payment_preimage);
+               expect_payment_sent(&nodes[0], payment_preimage, None, true, false);
        }
 
        #[test]
@@ -974,7 +1000,7 @@ mod tests {
                let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               create_announced_chan_between_nodes(&nodes, 0, 1, channelmanager::provided_init_features(), channelmanager::provided_init_features());
+               create_announced_chan_between_nodes(&nodes, 0, 1);
 
                chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
                chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);
@@ -987,7 +1013,8 @@ mod tests {
                // ... however once we get events once, the channel will close, creating a channel-closed
                // ChannelMonitorUpdate.
                check_closed_broadcast!(nodes[0], true);
-               check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() });
+               check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() },
+                       [nodes[1].node.get_our_node_id()], 100000);
                check_added_monitors!(nodes[0], 1);
        }
 }