Inform ChannelManager when fulfilled HTLCs are finalized
[rust-lightning] / lightning / src / ln / channelmanager.rs
index e04217a9f29ca58860d0650e39babe35388d604f..019d61c57c09a2d3e8f0606a3353a49198cc5425 100644 (file)
@@ -36,9 +36,9 @@ use bitcoin::secp256k1::ecdh::SharedSecret;
 use bitcoin::secp256k1;
 
 use chain;
-use chain::{Confirm, Watch, BestBlock};
+use chain::{Confirm, ChannelMonitorUpdateErr, Watch, BestBlock};
 use chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, ChannelMonitorUpdateErr, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use chain::transaction::{OutPoint, TransactionData};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
@@ -242,7 +242,7 @@ type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource
 
 struct MsgHandleErrInternal {
        err: msgs::LightningError,
-       chan_id: Option<[u8; 32]>, // If Some a channel of ours has been closed
+       chan_id: Option<([u8; 32], u64)>, // If Some a channel of ours has been closed
        shutdown_finish: Option<(ShutdownResult, Option<msgs::ChannelUpdate>)>,
 }
 impl MsgHandleErrInternal {
@@ -278,7 +278,7 @@ impl MsgHandleErrInternal {
                Self { err, chan_id: None, shutdown_finish: None }
        }
        #[inline]
-       fn from_finish_shutdown(err: String, channel_id: [u8; 32], shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
+       fn from_finish_shutdown(err: String, channel_id: [u8; 32], user_channel_id: u64, shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
                Self {
                        err: LightningError {
                                err: err.clone(),
@@ -289,7 +289,7 @@ impl MsgHandleErrInternal {
                                        },
                                },
                        },
-                       chan_id: Some(channel_id),
+                       chan_id: Some((channel_id, user_channel_id)),
                        shutdown_finish: Some((shutdown_res, channel_update)),
                }
        }
@@ -402,7 +402,7 @@ struct PendingInboundPayment {
 
 /// Stores the session_priv for each part of a payment that is still pending. For versions 0.0.102
 /// and later, also stores information for retrying the payment.
-enum PendingOutboundPayment {
+pub(crate) enum PendingOutboundPayment {
        Legacy {
                session_privs: HashSet<[u8; 32]>,
        },
@@ -413,6 +413,8 @@ enum PendingOutboundPayment {
                pending_amt_msat: u64,
                /// The total payment amount across all paths, used to verify that a retry is not overpaying.
                total_msat: u64,
+               /// Our best known block height at the time this payment was initiated.
+               starting_block_height: u32,
        },
 }
 
@@ -774,8 +776,8 @@ pub struct ChannelDetails {
        ///
        /// [`outbound_capacity_msat`]: ChannelDetails::outbound_capacity_msat
        pub unspendable_punishment_reserve: Option<u64>,
-       /// The user_id passed in to create_channel, or 0 if the channel was inbound.
-       pub user_id: u64,
+       /// The `user_channel_id` passed in to create_channel, or 0 if the channel was inbound.
+       pub user_channel_id: u64,
        /// The available outbound capacity for sending HTLCs to the remote peer. This does not include
        /// any pending HTLCs which are not yet fully resolved (and, thus, who's balance is not
        /// available for inclusion in new outbound HTLCs). This further does not include any pending
@@ -892,8 +894,11 @@ macro_rules! handle_error {
                                                        msg: update
                                                });
                                        }
-                                       if let Some(channel_id) = chan_id {
-                                               $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id,  reason: ClosureReason::ProcessingError { err: err.err.clone() } });
+                                       if let Some((channel_id, user_channel_id)) = chan_id {
+                                               $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed {
+                                                       channel_id, user_channel_id,
+                                                       reason: ClosureReason::ProcessingError { err: err.err.clone() }
+                                               });
                                        }
                                }
 
@@ -935,7 +940,8 @@ macro_rules! convert_chan_err {
                                        $short_to_id.remove(&short_id);
                                }
                                let shutdown_res = $channel.force_shutdown(true);
-                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok()))
+                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel.get_user_id(),
+                                       shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok()))
                        },
                        ChannelError::CloseDelayBroadcast(msg) => {
                                log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($channel_id[..]), msg);
@@ -943,7 +949,8 @@ macro_rules! convert_chan_err {
                                        $short_to_id.remove(&short_id);
                                }
                                let shutdown_res = $channel.force_shutdown(false);
-                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok()))
+                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, $channel.get_user_id(),
+                                       shutdown_res, $self.get_channel_update_for_broadcast(&$channel).ok()))
                        }
                }
        }
@@ -995,7 +1002,7 @@ macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
        };
-       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
+       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
                match $err {
                        ChannelMonitorUpdateErr::PermanentFailure => {
                                log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
@@ -1011,12 +1018,12 @@ macro_rules! handle_monitor_err {
                                // splitting hairs we'd prefer to claim payments that were to us, but we haven't
                                // given up the preimage yet, so might as well just wait until the payment is
                                // retried, avoiding the on-chain fees.
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id,
+                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.get_user_id(),
                                                $chan.force_shutdown(true), $self.get_channel_update_for_broadcast(&$chan).ok() ));
                                (res, true)
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
-                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
+                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
                                                log_bytes!($chan_id[..]),
                                                if $resend_commitment && $resend_raa {
                                                                match $action_type {
@@ -1027,25 +1034,29 @@ macro_rules! handle_monitor_err {
                                                        else if $resend_raa { "RAA" }
                                                        else { "nothing" },
                                                (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
-                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len());
+                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
+                                               (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
                                if !$resend_commitment {
                                        debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
                                }
                                if !$resend_raa {
                                        debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
                                }
-                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
                                (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
                        },
                }
        };
-       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
-               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
+               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
                if drop {
                        $entry.remove_entry();
                }
                res
        } };
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+               handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, Vec::new());
+       }
 }
 
 macro_rules! return_monitor_err {
@@ -1265,22 +1276,31 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
        /// Creates a new outbound channel to the given remote node and with the given value.
        ///
-       /// user_id will be provided back as user_channel_id in FundingGenerationReady events to allow
-       /// tracking of which events correspond with which create_channel call. Note that the
-       /// user_channel_id defaults to 0 for inbound channels, so you may wish to avoid using 0 for
-       /// user_id here. user_id has no meaning inside of LDK, it is simply copied to events and
-       /// otherwise ignored.
-       ///
-       /// If successful, will generate a SendOpenChannel message event, so you should probably poll
-       /// PeerManager::process_events afterwards.
+       /// `user_channel_id` will be provided back as in
+       /// [`Event::FundingGenerationReady::user_channel_id`] to allow tracking of which events
+       /// correspond with which `create_channel` call. Note that the `user_channel_id` defaults to 0
+       /// for inbound channels, so you may wish to avoid using 0 for `user_channel_id` here.
+       /// `user_channel_id` has no meaning inside of LDK, it is simply copied to events and otherwise
+       /// ignored.
        ///
-       /// Raises APIError::APIMisuseError when channel_value_satoshis > 2**24 or push_msat is
-       /// greater than channel_value_satoshis * 1k or channel_value_satoshis is < 1000.
+       /// Raises [`APIError::APIMisuseError`] when `channel_value_satoshis` > 2**24 or `push_msat` is
+       /// greater than `channel_value_satoshis * 1k` or `channel_value_satoshis < 1000`.
        ///
        /// Note that we do not check if you are currently connected to the given peer. If no
        /// connection is available, the outbound `open_channel` message may fail to send, resulting in
-       /// the channel eventually being silently forgotten.
-       pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_id: u64, override_config: Option<UserConfig>) -> Result<(), APIError> {
+       /// the channel eventually being silently forgotten (dropped on reload).
+       ///
+       /// Returns the new Channel's temporary `channel_id`. This ID will appear as
+       /// [`Event::FundingGenerationReady::temporary_channel_id`] and in
+       /// [`ChannelDetails::channel_id`] until after
+       /// [`ChannelManager::funding_transaction_generated`] is called, swapping the Channel's ID for
+       /// one derived from the funding transaction's TXID. If the counterparty rejects the channel
+       /// immediately, this temporary ID will appear in [`Event::ChannelClosed::channel_id`].
+       ///
+       /// [`Event::FundingGenerationReady::user_channel_id`]: events::Event::FundingGenerationReady::user_channel_id
+       /// [`Event::FundingGenerationReady::temporary_channel_id`]: events::Event::FundingGenerationReady::temporary_channel_id
+       /// [`Event::ChannelClosed::channel_id`]: events::Event::ChannelClosed::channel_id
+       pub fn create_channel(&self, their_network_key: PublicKey, channel_value_satoshis: u64, push_msat: u64, user_channel_id: u64, override_config: Option<UserConfig>) -> Result<[u8; 32], APIError> {
                if channel_value_satoshis < 1000 {
                        return Err(APIError::APIMisuseError { err: format!("Channel value must be at least 1000 satoshis. It was {}", channel_value_satoshis) });
                }
@@ -1292,7 +1312,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        let peer_state = peer_state.lock().unwrap();
                                        let their_features = &peer_state.latest_features;
                                        let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
-                                       Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, their_features, channel_value_satoshis, push_msat, user_id, config)?
+                                       Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, their_features, channel_value_satoshis, push_msat, user_channel_id, config)?
                                },
                                None => return Err(APIError::ChannelUnavailable { err: format!("Not connected to node: {}", their_network_key) }),
                        }
@@ -1303,8 +1323,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                // We want to make sure the lock is actually acquired by PersistenceNotifierGuard.
                debug_assert!(&self.total_consistency_lock.try_write().is_err());
 
+               let temporary_channel_id = channel.channel_id();
                let mut channel_state = self.channel_state.lock().unwrap();
-               match channel_state.by_id.entry(channel.channel_id()) {
+               match channel_state.by_id.entry(temporary_channel_id) {
                        hash_map::Entry::Occupied(_) => {
                                if cfg!(feature = "fuzztarget") {
                                        return Err(APIError::APIMisuseError { err: "Fuzzy bad RNG".to_owned() });
@@ -1318,7 +1339,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        node_id: their_network_key,
                        msg: res,
                });
-               Ok(())
+               Ok(temporary_channel_id)
        }
 
        fn list_channels_with_filter<Fn: FnMut(&(&[u8; 32], &Channel<Signer>)) -> bool>(&self, f: Fn) -> Vec<ChannelDetails> {
@@ -1344,7 +1365,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        unspendable_punishment_reserve: to_self_reserve_satoshis,
                                        inbound_capacity_msat,
                                        outbound_capacity_msat,
-                                       user_id: channel.get_user_id(),
+                                       user_channel_id: channel.get_user_id(),
                                        confirmations_required: channel.minimum_depth(),
                                        force_close_spend_delay: channel.get_counterparty_selected_contest_delay(),
                                        is_outbound: channel.is_outbound(),
@@ -1382,6 +1403,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.list_channels_with_filter(|&(_, ref channel)| channel.is_live())
        }
 
+       /// Helper function that issues the channel close events
+       fn issue_channel_close_events(&self, channel: &Channel<Signer>, closure_reason: ClosureReason) {
+               let mut pending_events_lock = self.pending_events.lock().unwrap();
+               match channel.unbroadcasted_funding() {
+                       Some(transaction) => {
+                               pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction })
+                       },
+                       None => {},
+               }
+               pending_events_lock.push(events::Event::ChannelClosed {
+                       channel_id: channel.channel_id(),
+                       user_channel_id: channel.get_user_id(),
+                       reason: closure_reason
+               });
+       }
+
        fn close_channel_internal(&self, channel_id: &[u8; 32], target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
@@ -1408,7 +1445,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -1428,12 +1465,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                msg: channel_update
                                                        });
                                                }
-                                               if let Ok(mut pending_events_lock) = self.pending_events.lock() {
-                                                       pending_events_lock.push(events::Event::ChannelClosed {
-                                                               channel_id: *channel_id,
-                                                               reason: ClosureReason::HolderForceClosed
-                                                       });
-                                               }
+                                               self.issue_channel_close_events(&channel, ClosureReason::HolderForceClosed);
                                        }
                                        break Ok(());
                                },
@@ -1524,13 +1556,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                if let Some(short_id) = chan.get().get_short_channel_id() {
                                        channel_state.short_to_id.remove(&short_id);
                                }
-                               let mut pending_events_lock = self.pending_events.lock().unwrap();
                                if peer_node_id.is_some() {
                                        if let Some(peer_msg) = peer_msg {
-                                               pending_events_lock.push(events::Event::ChannelClosed { channel_id: *channel_id, reason: ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() } });
+                                               self.issue_channel_close_events(chan.get(),ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() });
                                        }
                                } else {
-                                       pending_events_lock.push(events::Event::ChannelClosed { channel_id: *channel_id, reason: ClosureReason::HolderForceClosed });
+                                       self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
                                }
                                chan.remove_entry().1
                        } else {
@@ -1949,15 +1980,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-               let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
-               let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable {
-                       session_privs: HashSet::new(),
-                       pending_amt_msat: 0,
-                       payment_hash: *payment_hash,
-                       payment_secret: *payment_secret,
-                       total_msat: total_value,
-               });
-               assert!(payment.insert(session_priv_bytes, path.last().unwrap().fee_msat));
 
                let err: Result<(), _> = loop {
                        let mut channel_lock = self.channel_state.lock().unwrap();
@@ -1975,12 +1997,27 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if !chan.get().is_live() {
                                                return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()});
                                        }
-                                       break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                               path: path.clone(),
-                                               session_priv: session_priv.clone(),
-                                               first_hop_htlc_msat: htlc_msat,
-                                               payment_id,
-                                       }, onion_packet, &self.logger), channel_state, chan)
+                                       let send_res = break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(
+                                               htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
+                                                       path: path.clone(),
+                                                       session_priv: session_priv.clone(),
+                                                       first_hop_htlc_msat: htlc_msat,
+                                                       payment_id,
+                                               }, onion_packet, &self.logger),
+                                       channel_state, chan);
+
+                                       let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
+                                       let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable {
+                                               session_privs: HashSet::new(),
+                                               pending_amt_msat: 0,
+                                               payment_hash: *payment_hash,
+                                               payment_secret: *payment_secret,
+                                               starting_block_height: self.best_block.read().unwrap().height(),
+                                               total_msat: total_value,
+                                       });
+                                       assert!(payment.insert(session_priv_bytes, path.last().unwrap().fee_msat));
+
+                                       send_res
                                } {
                                        Some((update_add, commitment_signed, monitor_update)) => {
                                                if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
@@ -2129,6 +2166,54 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
+       /// Retries a payment along the given [`Route`].
+       ///
+       /// Errors returned are a superset of those returned from [`send_payment`], so see
+       /// [`send_payment`] documentation for more details on errors. This method will also error if the
+       /// retry amount puts the payment more than 10% over the payment's total amount, or if the payment
+       /// for the given `payment_id` cannot be found (likely due to timeout or success).
+       ///
+       /// [`send_payment`]: [`ChannelManager::send_payment`]
+       pub fn retry_payment(&self, route: &Route, payment_id: PaymentId) -> Result<(), PaymentSendFailure> {
+               const RETRY_OVERFLOW_PERCENTAGE: u64 = 10;
+               for path in route.paths.iter() {
+                       if path.len() == 0 {
+                               return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                       err: "length-0 path in route".to_string()
+                               }))
+                       }
+               }
+
+               let (total_msat, payment_hash, payment_secret) = {
+                       let outbounds = self.pending_outbound_payments.lock().unwrap();
+                       if let Some(payment) = outbounds.get(&payment_id) {
+                               match payment {
+                                       PendingOutboundPayment::Retryable {
+                                               total_msat, payment_hash, payment_secret, pending_amt_msat, ..
+                                       } => {
+                                               let retry_amt_msat: u64 = route.paths.iter().map(|path| path.last().unwrap().fee_msat).sum();
+                                               if retry_amt_msat + *pending_amt_msat > *total_msat * (100 + RETRY_OVERFLOW_PERCENTAGE) / 100 {
+                                                       return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                                               err: format!("retry_amt_msat of {} will put pending_amt_msat (currently: {}) more than 10% over total_payment_amt_msat of {}", retry_amt_msat, pending_amt_msat, total_msat).to_string()
+                                                       }))
+                                               }
+                                               (*total_msat, *payment_hash, *payment_secret)
+                                       },
+                                       PendingOutboundPayment::Legacy { .. } => {
+                                               return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                                       err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string()
+                                               }))
+                                       }
+                               }
+                       } else {
+                               return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                       err: format!("Payment with ID {} not found", log_bytes!(payment_id.0)),
+                               }))
+                       }
+               };
+               return self.send_payment_internal(route, payment_hash, &payment_secret, None, Some(payment_id), Some(total_msat)).map(|_| ())
+       }
+
        /// Send a spontaneous payment, which is a payment that does not require the recipient to have
        /// generated an invoice. Optionally, you may specify the preimage. If you do choose to specify
        /// the preimage, it must be a cryptographically secure random value that no intermediate node
@@ -2166,7 +2251,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                                        (chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger)
                                                .map_err(|e| if let ChannelError::Close(msg) = e {
-                                                       MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.force_shutdown(true), None)
+                                                       MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.get_user_id(), chan.force_shutdown(true), None)
                                                } else { unreachable!(); })
                                        , chan)
                                },
@@ -2508,7 +2593,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                                                channel_state.short_to_id.remove(&short_id);
                                                                                        }
                                                                                        // ChannelClosed event is generated by handle_error for us.
-                                                                                       Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
+                                                                                       Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
                                                                                },
                                                                                ChannelError::CloseDelayBroadcast(_) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
                                                                        };
@@ -2765,7 +2850,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let ret_err = match res {
                        Ok(Some((update_fee, commitment_signed, monitor_update))) => {
                                if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
-                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), chan_id);
+                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), chan_id);
                                        if drop { retain_channel = false; }
                                        res
                                } else {
@@ -2954,15 +3039,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                        network_update: None,
                                                                        all_paths_failed: payment.get().remaining_parts() == 0,
                                                                        path: path.clone(),
+                                                                       short_channel_id: None,
                                                                        #[cfg(test)]
                                                                        error_code: None,
                                                                        #[cfg(test)]
                                                                        error_data: None,
                                                                }
                                                        );
-                                                       if payment.get().remaining_parts() == 0 {
-                                                               payment.remove();
-                                                       }
                                                }
                                        } else {
                                                log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
@@ -3000,7 +3083,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        }
                                        if sessions.get().remaining_parts() == 0 {
                                                all_paths_failed = true;
-                                               sessions.remove();
                                        }
                                } else {
                                        log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
@@ -3011,9 +3093,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                match &onion_error {
                                        &HTLCFailReason::LightningError { ref err } => {
 #[cfg(test)]
-                                               let (network_update, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
+                                               let (network_update, short_channel_id, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
 #[cfg(not(test))]
-                                               let (network_update, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
+                                               let (network_update, short_channel_id, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
                                                // TODO: If we decided to blame ourselves (or one of our channels) in
                                                // process_onion_failure we should close that channel as it implies our
                                                // next-hop is needlessly blaming us!
@@ -3024,6 +3106,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                network_update,
                                                                all_paths_failed,
                                                                path: path.clone(),
+                                                               short_channel_id,
 #[cfg(test)]
                                                                error_code: onion_error_code,
 #[cfg(test)]
@@ -3051,6 +3134,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                network_update: None,
                                                                all_paths_failed,
                                                                path: path.clone(),
+                                                               short_channel_id: Some(path.first().unwrap().short_channel_id),
 #[cfg(test)]
                                                                error_code: Some(*failure_code),
 #[cfg(test)]
@@ -3256,8 +3340,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
                                } else { false };
                                if found_payment {
+                                       let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
                                        self.pending_events.lock().unwrap().push(
-                                               events::Event::PaymentSent { payment_preimage }
+                                               events::Event::PaymentSent {
+                                                       payment_preimage,
+                                                       payment_hash: payment_hash
+                                               }
                                        );
                                } else {
                                        log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
@@ -3320,27 +3408,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.our_network_pubkey.clone()
        }
 
-       /// Restores a single, given channel to normal operation after a
-       /// ChannelMonitorUpdateErr::TemporaryFailure was returned from a channel monitor update
-       /// operation.
-       ///
-       /// All ChannelMonitor updates up to and including highest_applied_update_id must have been
-       /// fully committed in every copy of the given channels' ChannelMonitors.
-       ///
-       /// Note that there is no effect to calling with a highest_applied_update_id other than the
-       /// current latest ChannelMonitorUpdate and one call to this function after multiple
-       /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
-       /// exists largely only to prevent races between this and concurrent update_monitor calls.
-       ///
-       /// Thus, the anticipated use is, at a high level:
-       ///  1) You register a chain::Watch with this ChannelManager,
-       ///  2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of
-       ///     said ChannelMonitors as it can, returning ChannelMonitorUpdateErr::TemporaryFailures
-       ///     any time it cannot do so instantly,
-       ///  3) update(s) are applied to each remote copy of a ChannelMonitor,
-       ///  4) once all remote copies are updated, you call this function with the update_id that
-       ///     completed, and once it is the latest the Channel will be re-enabled.
-       pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
+       fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let chan_restoration_res;
@@ -3355,8 +3423,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                return;
                        }
 
-                       let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
-                       let channel_update = if funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
+                       let updates = channel.get_mut().monitor_updating_restored(&self.logger);
+                       let channel_update = if updates.funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
                                // We only send a channel_update in the case where we are just now sending a
                                // funding_locked and the channel is in a usable state. Further, we rely on the
                                // normal announcement_signatures process to send a channel_update for public
@@ -3366,11 +3434,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        msg: self.get_channel_update_for_unicast(channel.get()).unwrap(),
                                })
                        } else { None };
-                       chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked);
+                       // TODO: Handle updates.finalized_claimed_htlcs!
+                       chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, updates.raa, updates.commitment_update, updates.order, None, updates.accepted_htlcs, updates.funding_broadcastable, updates.funding_locked);
                        if let Some(upd) = channel_update {
                                channel_state.pending_msg_events.push(upd);
                        }
-                       pending_failures
+                       updates.failed_htlcs
                };
                post_handle_chan_restoration!(self, chan_restoration_res);
                for failure in pending_failures.drain(..) {
@@ -3461,7 +3530,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
                                        // accepted payment from yet. We do, however, need to wait to send our funding_locked
                                        // until we have persisted our monitor.
-                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new());
+                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new(), Vec::new());
                                },
                        }
                }
@@ -3497,7 +3566,16 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                Err(e) => try_chan_entry!(self, Err(e), channel_state, chan),
                                        };
                                        if let Err(e) = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
-                                               return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, false, false);
+                                               let mut res = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, false, false);
+                                               if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
+                                                       // We weren't able to watch the channel to begin with, so no updates should be made on
+                                                       // it. Previously, full_stack_target found an (unreachable) panic when the
+                                                       // monitor update contained within `shutdown_finish` was applied.
+                                                       if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
+                                                               shutdown_finish.0.take();
+                                                       }
+                                               }
+                                               return res
                                        }
                                        funding_tx
                                },
@@ -3570,7 +3648,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -3640,7 +3718,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        msg: update
                                });
                        }
-                       self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: msg.channel_id,  reason: ClosureReason::CooperativeClosure });
+                       self.issue_channel_close_events(&chan, ClosureReason::CooperativeClosure);
                }
                Ok(())
        }
@@ -3857,26 +3935,36 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
                                        let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                       let (commitment_update, pending_forwards, pending_failures, monitor_update, htlcs_to_fail_in) =
-                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
-                                       htlcs_to_fail = htlcs_to_fail_in;
-                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
+                                       let raa_updates = break_chan_entry!(self,
+                                               chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
+                                       htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
+                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update) {
                                                if was_frozen_for_monitor {
-                                                       assert!(commitment_update.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
+                                                       assert!(raa_updates.commitment_update.is_none());
+                                                       assert!(raa_updates.accepted_htlcs.is_empty());
+                                                       assert!(raa_updates.failed_htlcs.is_empty());
+                                                       assert!(raa_updates.finalized_claimed_htlcs.is_empty());
                                                        break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
                                                } else {
-                                                       if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) {
+                                                       if let Err(e) = handle_monitor_err!(self, e, channel_state, chan,
+                                                                       RAACommitmentOrder::CommitmentFirst, false,
+                                                                       raa_updates.commitment_update.is_some(),
+                                                                       raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+                                                                       raa_updates.finalized_claimed_htlcs) {
                                                                break Err(e);
                                                        } else { unreachable!(); }
                                                }
                                        }
-                                       if let Some(updates) = commitment_update {
+                                       if let Some(updates) = raa_updates.commitment_update {
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                        node_id: counterparty_node_id.clone(),
                                                        updates,
                                                });
                                        }
-                                       break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap()))
+                                       break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+                                                       chan.get().get_short_channel_id()
+                                                               .expect("RAA should only work on a short-id-available channel"),
+                                                       chan.get().get_funding_txo().unwrap()))
                                },
                                hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
@@ -4036,7 +4124,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                                        }
                                },
-                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => {
+                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
+                               MonitorEvent::UpdateFailed(funding_outpoint) => {
                                        let mut channel_lock = self.channel_state.lock().unwrap();
                                        let channel_state = &mut *channel_lock;
                                        let by_id = &mut channel_state.by_id;
@@ -4052,7 +4141,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                msg: update
                                                        });
                                                }
-                                               self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: chan.channel_id(),  reason: ClosureReason::CommitmentTxConfirmed });
+                                               let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
+                                                       ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
+                                               } else {
+                                                       ClosureReason::CommitmentTxConfirmed
+                                               };
+                                               self.issue_channel_close_events(&chan, reason);
                                                pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                        node_id: chan.get_counterparty_node_id(),
                                                        action: msgs::ErrorAction::SendErrorMessage {
@@ -4061,6 +4155,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                });
                                        }
                                },
+                               MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
+                                       self.channel_monitor_updated(&funding_txo, monitor_update_id);
+                               },
                        }
                }
 
@@ -4071,6 +4168,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                has_pending_monitor_events
        }
 
+       /// In chanmon_consistency_target, we'd like to be able to restore monitor updating without
+       /// handling all pending events (i.e. not PendingHTLCsForwardable). Thus, we expose monitor
+       /// update events as a separate process method here.
+       #[cfg(feature = "fuzztarget")]
+       pub fn process_monitor_events(&self) {
+               self.process_pending_monitor_events();
+       }
+
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
        /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
        /// update was applied.
@@ -4099,7 +4204,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                if let Some((commitment_update, monitor_update)) = commitment_opt {
                                                        if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
                                                                has_monitor_update = true;
-                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), channel_id);
                                                                handle_errors.push((chan.get_counterparty_node_id(), res));
                                                                if close_channel { return false; }
                                                        } else {
@@ -4168,12 +4273,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                });
                                                        }
 
-                                                       if let Ok(mut pending_events_lock) = self.pending_events.lock() {
-                                                               pending_events_lock.push(events::Event::ChannelClosed {
-                                                                       channel_id: *channel_id,
-                                                                       reason: ClosureReason::CooperativeClosure
-                                                               });
-                                                       }
+                                                       self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure);
 
                                                        log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                        self.tx_broadcaster.broadcast_transaction(&tx);
@@ -4327,6 +4427,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
+
+       #[cfg(test)]
+       pub fn has_pending_payments(&self) -> bool {
+               !self.pending_outbound_payments.lock().unwrap().is_empty()
+       }
 }
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -4502,6 +4607,16 @@ where
                payment_secrets.retain(|_, inbound_payment| {
                        inbound_payment.expiry_time > header.time as u64
                });
+
+               let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+               outbounds.retain(|_, payment| {
+                       const PAYMENT_EXPIRY_BLOCKS: u32 = 3;
+                       if payment.remaining_parts() != 0 { return true }
+                       if let PendingOutboundPayment::Retryable { starting_block_height, .. } = payment {
+                               return *starting_block_height + PAYMENT_EXPIRY_BLOCKS > height
+                       }
+                       true
+               });
        }
 
        fn get_relevant_txids(&self) -> Vec<Txid> {
@@ -4595,7 +4710,7 @@ where
                                                        msg: update
                                                });
                                        }
-                                       self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: channel.channel_id(),  reason: ClosureReason::CommitmentTxConfirmed });
+                                       self.issue_channel_close_events(channel, ClosureReason::CommitmentTxConfirmed);
                                        pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                node_id: channel.get_counterparty_node_id(),
                                                action: msgs::ErrorAction::SendErrorMessage { msg: e },
@@ -4786,7 +4901,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                                                                msg: update
                                                        });
                                                }
-                                               self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: chan.channel_id(),  reason: ClosureReason::DisconnectedPeer });
+                                               self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer);
                                                false
                                        } else {
                                                true
@@ -4801,7 +4916,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                                                        if let Some(short_id) = chan.get_short_channel_id() {
                                                                short_to_id.remove(&short_id);
                                                        }
-                                                       self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id: chan.channel_id(),  reason: ClosureReason::DisconnectedPeer });
+                                                       self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer);
                                                        return false;
                                                } else {
                                                        no_channels_remain = false;
@@ -5233,6 +5348,7 @@ impl_writeable_tlv_based_enum!(PendingOutboundPayment,
                (4, payment_secret, option),
                (6, total_msat, required),
                (8, pending_amt_msat, required),
+               (10, starting_block_height, required),
        },
 ;);
 
@@ -5364,20 +5480,25 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
 /// is:
-/// 1) Deserialize all stored ChannelMonitors.
-/// 2) Deserialize the ChannelManager by filling in this struct and calling:
-///    <(BlockHash, ChannelManager)>::read(reader, args)
-///    This may result in closing some Channels if the ChannelMonitor is newer than the stored
-///    ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
-/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same
-///    way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and
-///    ChannelMonitor::get_funding_txo().
-/// 4) Reconnect blocks on your ChannelMonitors.
-/// 5) Disconnect/connect blocks on the ChannelManager.
-/// 6) Move the ChannelMonitors into your local chain::Watch.
+/// 1) Deserialize all stored [`ChannelMonitor`]s.
+/// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling:
+///    `<(BlockHash, ChannelManager)>::read(reader, args)`
+///    This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored
+///    [`ChannelManager`] state to ensure no loss of funds. Thus, transactions may be broadcasted.
+/// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the
+///    same way you would handle a [`chain::Filter`] call using
+///    [`ChannelMonitor::get_outputs_to_watch`] and [`ChannelMonitor::get_funding_txo`].
+/// 4) Reconnect blocks on your [`ChannelMonitor`]s.
+/// 5) Disconnect/connect blocks on the [`ChannelManager`].
+/// 6) Re-persist the [`ChannelMonitor`]s to ensure the latest state is on disk.
+///    Note that if you're using a [`ChainMonitor`] for your [`chain::Watch`] implementation, you
+///    will likely accomplish this as a side-effect of calling [`chain::Watch::watch_channel`] in
+///    the next step.
+/// 7) Move the [`ChannelMonitor`]s into your local [`chain::Watch`]. If you're using a
+///    [`ChainMonitor`], this is done by calling [`chain::Watch::watch_channel`].
 ///
-/// Note that the ordering of #4-6 is not of importance, however all three must occur before you
-/// call any other methods on the newly-deserialized ChannelManager.
+/// Note that the ordering of #4-7 is not of importance, however all four must occur before you
+/// call any other methods on the newly-deserialized [`ChannelManager`].
 ///
 /// Note that because some channels may be closed during deserialization, it is critical that you
 /// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
@@ -5385,6 +5506,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
 /// broadcast), and then later deserialize a newer version of the same ChannelManager (which will
 /// not force-close the same channels but consider them live), you may end up revoking a state for
 /// which you've already broadcasted the transaction.
+///
+/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
 pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
        where M::Target: chain::Watch<Signer>,
         T::Target: BroadcasterInterface,
@@ -5524,6 +5647,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                        monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
                                        channel_closures.push(events::Event::ChannelClosed {
                                                channel_id: channel.channel_id(),
+                                               user_channel_id: channel.get_user_id(),
                                                reason: ClosureReason::OutdatedChannelManager
                                        });
                                } else {
@@ -5591,6 +5715,16 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                None => continue,
                        }
                }
+               if forward_htlcs_count > 0 {
+                       // If we have pending HTLCs to forward, assume we either dropped a
+                       // `PendingHTLCsForwardable` or the user received it but never processed it as they
+                       // shut down before the timer hit. Either way, set the time_forwardable to a small
+                       // constant as enough time has likely passed that we should simply handle the forwards
+                       // now, or at least after the user gets a chance to reconnect to our peers.
+                       pending_events_read.push(events::Event::PendingHTLCsForwardable {
+                               time_forwardable: Duration::from_secs(2),
+                       });
+               }
 
                let background_event_count: u64 = Readable::read(reader)?;
                let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
@@ -5709,6 +5843,7 @@ mod tests {
        use ln::msgs;
        use ln::msgs::ChannelMessageHandler;
        use routing::router::{get_keysend_route, get_route};
+       use routing::scorer::Scorer;
        use util::errors::APIError;
        use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
        use util::test_utils;
@@ -5848,12 +5983,9 @@ mod tests {
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
                create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
-               let logger = test_utils::TestLogger::new();
 
                // First, send a partial MPP payment.
-               let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
-               let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph, &nodes[1].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
-               let (payment_preimage, our_payment_hash, payment_secret) = get_payment_preimage_hash!(&nodes[1]);
+               let (route, our_payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(&nodes[0], nodes[1], 100_000);
                let payment_id = PaymentId([42; 32]);
                // Use the utility function send_payment_along_path to send the payment with MPP data which
                // indicates there are more HTLCs coming.
@@ -5930,8 +6062,9 @@ mod tests {
                // further events will be generated for subsequence path successes.
                let events = nodes[0].node.get_and_clear_pending_events();
                match events[0] {
-                       Event::PaymentSent { payment_preimage: ref preimage } => {
+                       Event::PaymentSent { payment_preimage: ref preimage, payment_hash: ref hash } => {
                                assert_eq!(payment_preimage, *preimage);
+                               assert_eq!(our_payment_hash, *hash);
                        },
                        _ => panic!("Unexpected event"),
                }
@@ -5949,13 +6082,14 @@ mod tests {
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
                create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
                let logger = test_utils::TestLogger::new();
+               let scorer = Scorer::new(0);
 
                // To start (1), send a regular payment but don't claim it.
                let expected_route = [&nodes[1]];
                let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000);
 
                // Next, attempt a keysend payment and make sure it fails.
-               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
+               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger, &scorer).unwrap();
                nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
@@ -5983,7 +6117,7 @@ mod tests {
 
                // To start (2), send a keysend payment but don't claim it.
                let payment_preimage = PaymentPreimage([42; 32]);
-               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
+               let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger, &scorer).unwrap();
                let (payment_hash, _) = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
@@ -6037,9 +6171,10 @@ mod tests {
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], InitFeatures::known(), InitFeatures::known());
                let network_graph = &nodes[0].net_graph_msg_handler.network_graph;
                let first_hops = nodes[0].node.list_usable_channels();
+               let scorer = Scorer::new(0);
                let route = get_keysend_route(&payer_pubkey, network_graph, &payee_pubkey,
                                   Some(&first_hops.iter().collect::<Vec<_>>()), &vec![], 10000, 40,
-                                  nodes[0].logger).unwrap();
+                                  nodes[0].logger, &scorer).unwrap();
 
                let test_preimage = PaymentPreimage([42; 32]);
                let mismatch_payment_hash = PaymentHash([43; 32]);
@@ -6073,9 +6208,10 @@ mod tests {
                let _chan = create_chan_between_nodes(&nodes[0], &nodes[1], InitFeatures::known(), InitFeatures::known());
                let network_graph = &nodes[0].net_graph_msg_handler.network_graph;
                let first_hops = nodes[0].node.list_usable_channels();
+               let scorer = Scorer::new(0);
                let route = get_keysend_route(&payer_pubkey, network_graph, &payee_pubkey,
                                   Some(&first_hops.iter().collect::<Vec<_>>()), &vec![], 10000, 40,
-                                  nodes[0].logger).unwrap();
+                                  nodes[0].logger, &scorer).unwrap();
 
                let test_preimage = PaymentPreimage([42; 32]);
                let test_secret = PaymentSecret([43; 32]);
@@ -6105,12 +6241,9 @@ mod tests {
                let chan_2_id = create_announced_chan_between_nodes(&nodes, 0, 2, InitFeatures::known(), InitFeatures::known()).0.contents.short_channel_id;
                let chan_3_id = create_announced_chan_between_nodes(&nodes, 1, 3, InitFeatures::known(), InitFeatures::known()).0.contents.short_channel_id;
                let chan_4_id = create_announced_chan_between_nodes(&nodes, 2, 3, InitFeatures::known(), InitFeatures::known()).0.contents.short_channel_id;
-               let logger = test_utils::TestLogger::new();
 
                // Marshall an MPP route.
-               let (_, payment_hash, _) = get_payment_preimage_hash!(&nodes[3]);
-               let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
-               let mut route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph, &nodes[3].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &[], 100000, TEST_FINAL_CLTV, &logger).unwrap();
+               let (mut route, payment_hash, _, _) = get_route_and_payment_hash!(&nodes[0], nodes[3], 100000);
                let path = route.paths[0].clone();
                route.paths.push(path);
                route.paths[0][0].pubkey = nodes[1].node.get_our_node_id();
@@ -6131,8 +6264,7 @@ mod tests {
 #[cfg(all(any(test, feature = "_test_utils"), feature = "unstable"))]
 pub mod bench {
        use chain::Listen;
-       use chain::chainmonitor::ChainMonitor;
-       use chain::channelmonitor::Persist;
+       use chain::chainmonitor::{ChainMonitor, Persist};
        use chain::keysinterface::{KeysManager, InMemorySigner};
        use ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage};
        use ln::features::{InitFeatures, InvoiceFeatures};
@@ -6140,6 +6272,7 @@ pub mod bench {
        use ln::msgs::{ChannelMessageHandler, Init};
        use routing::network_graph::NetworkGraph;
        use routing::router::get_route;
+       use routing::scorer::Scorer;
        use util::test_utils;
        use util::config::UserConfig;
        use util::events::{Event, MessageSendEvent, MessageSendEventsProvider, PaymentPurpose};
@@ -6247,8 +6380,9 @@ pub mod bench {
                macro_rules! send_payment {
                        ($node_a: expr, $node_b: expr) => {
                                let usable_channels = $node_a.list_usable_channels();
+                               let scorer = Scorer::new(0);
                                let route = get_route(&$node_a.get_our_node_id(), &dummy_graph, &$node_b.get_our_node_id(), Some(InvoiceFeatures::known()),
-                                       Some(&usable_channels.iter().map(|r| r).collect::<Vec<_>>()), &[], 10_000, TEST_FINAL_CLTV, &logger_a).unwrap();
+                                       Some(&usable_channels.iter().map(|r| r).collect::<Vec<_>>()), &[], 10_000, TEST_FINAL_CLTV, &logger_a, &scorer).unwrap();
 
                                let mut payment_preimage = PaymentPreimage([0; 32]);
                                payment_preimage.0[0..8].copy_from_slice(&payment_count.to_le_bytes());