Merge pull request #1115 from TheBlueMatt/2021-10-expose-addr-vec
[rust-lightning] / lightning / src / ln / channelmanager.rs
index b772c5e9ac75789f0876e2664bb87fdf2c8129a4..298f88ce0cc018162cf5f2a3201512641b1cd8fd 100644 (file)
@@ -43,7 +43,6 @@ use chain::transaction::{OutPoint, TransactionData};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use ln::{PaymentHash, PaymentPreimage, PaymentSecret};
-pub use ln::channel::CounterpartyForwardingInfo;
 use ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch};
 use ln::features::{InitFeatures, NodeFeatures};
 use routing::router::{Route, RouteHop};
@@ -53,9 +52,9 @@ use ln::onion_utils;
 use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField};
 use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner};
 use util::config::UserConfig;
-use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
+use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
 use util::{byte_utils, events};
-use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
+use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer};
 use util::chacha20::{ChaCha20, ChaChaReader};
 use util::logger::{Logger, Level};
 use util::errors::APIError;
@@ -173,6 +172,22 @@ struct ClaimableHTLC {
        onion_payload: OnionPayload,
 }
 
+/// A payment identifier used to uniquely identify a payment to LDK.
+#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
+pub struct PaymentId(pub [u8; 32]);
+
+impl Writeable for PaymentId {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+               self.0.write(w)
+       }
+}
+
+impl Readable for PaymentId {
+       fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
+               let buf: [u8; 32] = Readable::read(r)?;
+               Ok(PaymentId(buf))
+       }
+}
 /// Tracks the inbound corresponding to an outbound HTLC
 #[derive(Clone, PartialEq)]
 pub(crate) enum HTLCSource {
@@ -183,6 +198,7 @@ pub(crate) enum HTLCSource {
                /// Technically we can recalculate this from the route, but we cache it here to avoid
                /// doing a double-pass on route when we get a failure back
                first_hop_htlc_msat: u64,
+               payment_id: PaymentId,
        },
 }
 #[cfg(test)]
@@ -192,6 +208,7 @@ impl HTLCSource {
                        path: Vec::new(),
                        session_priv: SecretKey::from_slice(&[1; 32]).unwrap(),
                        first_hop_htlc_msat: 0,
+                       payment_id: PaymentId([2; 32]),
                }
        }
 }
@@ -225,6 +242,7 @@ type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource
 
 struct MsgHandleErrInternal {
        err: msgs::LightningError,
+       chan_id: Option<[u8; 32]>, // If Some a channel of ours has been closed
        shutdown_finish: Option<(ShutdownResult, Option<msgs::ChannelUpdate>)>,
 }
 impl MsgHandleErrInternal {
@@ -240,6 +258,7 @@ impl MsgHandleErrInternal {
                                        },
                                },
                        },
+                       chan_id: None,
                        shutdown_finish: None,
                }
        }
@@ -250,12 +269,13 @@ impl MsgHandleErrInternal {
                                err,
                                action: msgs::ErrorAction::IgnoreError,
                        },
+                       chan_id: None,
                        shutdown_finish: None,
                }
        }
        #[inline]
        fn from_no_close(err: msgs::LightningError) -> Self {
-               Self { err, shutdown_finish: None }
+               Self { err, chan_id: None, shutdown_finish: None }
        }
        #[inline]
        fn from_finish_shutdown(err: String, channel_id: [u8; 32], shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>) -> Self {
@@ -269,6 +289,7 @@ impl MsgHandleErrInternal {
                                        },
                                },
                        },
+                       chan_id: Some(channel_id),
                        shutdown_finish: Some((shutdown_res, channel_update)),
                }
        }
@@ -303,6 +324,7 @@ impl MsgHandleErrInternal {
                                        },
                                },
                        },
+                       chan_id: None,
                        shutdown_finish: None,
                }
        }
@@ -378,6 +400,65 @@ struct PendingInboundPayment {
        min_value_msat: Option<u64>,
 }
 
+/// Stores the session_priv for each part of a payment that is still pending. For versions 0.0.102
+/// and later, also stores information for retrying the payment.
+pub(crate) enum PendingOutboundPayment {
+       Legacy {
+               session_privs: HashSet<[u8; 32]>,
+       },
+       Retryable {
+               session_privs: HashSet<[u8; 32]>,
+               payment_hash: PaymentHash,
+               payment_secret: Option<PaymentSecret>,
+               pending_amt_msat: u64,
+               /// The total payment amount across all paths, used to verify that a retry is not overpaying.
+               total_msat: u64,
+               /// Our best known block height at the time this payment was initiated.
+               starting_block_height: u32,
+       },
+}
+
+impl PendingOutboundPayment {
+       fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool {
+               let remove_res = match self {
+                       PendingOutboundPayment::Legacy { session_privs } |
+                       PendingOutboundPayment::Retryable { session_privs, .. } => {
+                               session_privs.remove(session_priv)
+                       }
+               };
+               if remove_res {
+                       if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
+                               *pending_amt_msat -= part_amt_msat;
+                       }
+               }
+               remove_res
+       }
+
+       fn insert(&mut self, session_priv: [u8; 32], part_amt_msat: u64) -> bool {
+               let insert_res = match self {
+                       PendingOutboundPayment::Legacy { session_privs } |
+                       PendingOutboundPayment::Retryable { session_privs, .. } => {
+                               session_privs.insert(session_priv)
+                       }
+               };
+               if insert_res {
+                       if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
+                               *pending_amt_msat += part_amt_msat;
+                       }
+               }
+               insert_res
+       }
+
+       fn remaining_parts(&self) -> usize {
+               match self {
+                       PendingOutboundPayment::Legacy { session_privs } |
+                       PendingOutboundPayment::Retryable { session_privs, .. } => {
+                               session_privs.len()
+                       }
+               }
+       }
+}
+
 /// SimpleArcChannelManager is useful when you need a ChannelManager with a static lifetime, e.g.
 /// when you're using lightning-net-tokio (since tokio::spawn requires parameters with static
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
@@ -464,17 +545,19 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
        /// Locked *after* channel_state.
        pending_inbound_payments: Mutex<HashMap<PaymentHash, PendingInboundPayment>>,
 
-       /// The session_priv bytes of outbound payments which are pending resolution.
+       /// The session_priv bytes and retry metadata of outbound payments which are pending resolution.
        /// The authoritative state of these HTLCs resides either within Channels or ChannelMonitors
        /// (if the channel has been force-closed), however we track them here to prevent duplicative
-       /// PaymentSent/PaymentFailed events. Specifically, in the case of a duplicative
+       /// PaymentSent/PaymentPathFailed events. Specifically, in the case of a duplicative
        /// update_fulfill_htlc message after a reconnect, we may "claim" a payment twice.
        /// Additionally, because ChannelMonitors are often not re-serialized after connecting block(s)
        /// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
        /// after reloading from disk while replaying blocks against ChannelMonitors.
        ///
+       /// See `PendingOutboundPayment` documentation for more info.
+       ///
        /// Locked *after* channel_state.
-       pending_outbound_payments: Mutex<HashSet<[u8; 32]>>,
+       pending_outbound_payments: Mutex<HashMap<PaymentId, PendingOutboundPayment>>,
 
        our_network_key: SecretKey,
        our_network_pubkey: PublicKey,
@@ -626,6 +709,19 @@ const CHECK_CLTV_EXPIRY_SANITY: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRA
 #[allow(dead_code)]
 const CHECK_CLTV_EXPIRY_SANITY_2: u32 = MIN_CLTV_EXPIRY_DELTA as u32 - LATENCY_GRACE_PERIOD_BLOCKS - 2*CLTV_CLAIM_BUFFER;
 
+/// Information needed for constructing an invoice route hint for this channel.
+#[derive(Clone, Debug, PartialEq)]
+pub struct CounterpartyForwardingInfo {
+       /// Base routing fee in millisatoshis.
+       pub fee_base_msat: u32,
+       /// Amount in millionths of a satoshi the channel will charge per transferred satoshi.
+       pub fee_proportional_millionths: u32,
+       /// The minimum difference in cltv_expiry between an ingoing HTLC and its outgoing counterpart,
+       /// such that the outgoing HTLC is forwardable to this counterparty. See `msgs::ChannelUpdate`'s
+       /// `cltv_expiry_delta` for more details.
+       pub cltv_expiry_delta: u16,
+}
+
 /// Channel parameters which apply to our counterparty. These are split out from [`ChannelDetails`]
 /// to better separate parameters.
 #[derive(Clone, Debug, PartialEq)]
@@ -780,12 +876,13 @@ macro_rules! handle_error {
        ($self: ident, $internal: expr, $counterparty_node_id: expr) => {
                match $internal {
                        Ok(msg) => Ok(msg),
-                       Err(MsgHandleErrInternal { err, shutdown_finish }) => {
+                       Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => {
                                #[cfg(debug_assertions)]
                                {
                                        // In testing, ensure there are no deadlocks where the lock is already held upon
                                        // entering the macro.
                                        assert!($self.channel_state.try_lock().is_ok());
+                                       assert!($self.pending_events.try_lock().is_ok());
                                }
 
                                let mut msg_events = Vec::with_capacity(2);
@@ -797,6 +894,9 @@ macro_rules! handle_error {
                                                        msg: update
                                                });
                                        }
+                                       if let Some(channel_id) = chan_id {
+                                               $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { channel_id,  reason: ClosureReason::ProcessingError { err: err.err.clone() } });
+                                       }
                                }
 
                                log_error!($self.logger, "{}", err.err);
@@ -1138,7 +1238,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                pending_msg_events: Vec::new(),
                        }),
                        pending_inbound_payments: Mutex::new(HashMap::new()),
-                       pending_outbound_payments: Mutex::new(HashSet::new()),
+                       pending_outbound_payments: Mutex::new(HashMap::new()),
 
                        our_network_key: keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()),
@@ -1284,6 +1384,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.list_channels_with_filter(|&(_, ref channel)| channel.is_live())
        }
 
+       /// Helper function that issues the channel close events
+       fn issue_channel_close_events(&self, channel: &Channel<Signer>, closure_reason: ClosureReason) {
+               let mut pending_events_lock = self.pending_events.lock().unwrap();
+               match channel.unbroadcasted_funding() {
+                       Some(transaction) => {
+                               pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction })
+                       },
+                       None => {},
+               }
+               pending_events_lock.push(events::Event::ChannelClosed { channel_id: channel.channel_id(), reason: closure_reason });
+       }
+
        fn close_channel_internal(&self, channel_id: &[u8; 32], target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
@@ -1330,6 +1442,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                msg: channel_update
                                                        });
                                                }
+                                               self.issue_channel_close_events(&channel, ClosureReason::HolderForceClosed);
                                        }
                                        break Ok(());
                                },
@@ -1405,7 +1518,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
-       fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>) -> Result<PublicKey, APIError> {
+       /// `peer_node_id` should be set when we receive a message from a peer, but not set when the
+       /// user closes, which will be re-exposed as the `ChannelClosed` reason.
+       fn force_close_channel_with_peer(&self, channel_id: &[u8; 32], peer_node_id: Option<&PublicKey>, peer_msg: Option<&String>) -> Result<PublicKey, APIError> {
                let mut chan = {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
@@ -1418,6 +1533,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                if let Some(short_id) = chan.get().get_short_channel_id() {
                                        channel_state.short_to_id.remove(&short_id);
                                }
+                               if peer_node_id.is_some() {
+                                       if let Some(peer_msg) = peer_msg {
+                                               self.issue_channel_close_events(chan.get(),ClosureReason::CounterpartyForceClosed { peer_msg: peer_msg.to_string() });
+                                       }
+                               } else {
+                                       self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
+                               }
                                chan.remove_entry().1
                        } else {
                                return Err(APIError::ChannelUnavailable{err: "No such channel".to_owned()});
@@ -1439,7 +1561,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager.
        pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-               match self.force_close_channel_with_peer(channel_id, None) {
+               match self.force_close_channel_with_peer(channel_id, None, None) {
                        Ok(counterparty_node_id) => {
                                self.channel_state.lock().unwrap().pending_msg_events.push(
                                        events::MessageSendEvent::HandleError {
@@ -1820,7 +1942,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 
        // Only public for testing, this should otherwise never be called direcly
-       pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, keysend_preimage: &Option<PaymentPreimage>) -> Result<(), APIError> {
+       pub(crate) fn send_payment_along_path(&self, path: &Vec<RouteHop>, payment_hash: &PaymentHash, payment_secret: &Option<PaymentSecret>, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option<PaymentPreimage>) -> Result<(), APIError> {
                log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id);
                let prng_seed = self.keys_manager.get_secure_random_bytes();
                let session_priv_bytes = self.keys_manager.get_secure_random_bytes();
@@ -1835,7 +1957,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
 
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
-               assert!(self.pending_outbound_payments.lock().unwrap().insert(session_priv_bytes));
 
                let err: Result<(), _> = loop {
                        let mut channel_lock = self.channel_state.lock().unwrap();
@@ -1853,11 +1974,27 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if !chan.get().is_live() {
                                                return Err(APIError::ChannelUnavailable{err: "Peer for first hop currently disconnected/pending monitor update!".to_owned()});
                                        }
-                                       break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
-                                               path: path.clone(),
-                                               session_priv: session_priv.clone(),
-                                               first_hop_htlc_msat: htlc_msat,
-                                       }, onion_packet, &self.logger), channel_state, chan)
+                                       let send_res = break_chan_entry!(self, chan.get_mut().send_htlc_and_commit(
+                                               htlc_msat, payment_hash.clone(), htlc_cltv, HTLCSource::OutboundRoute {
+                                                       path: path.clone(),
+                                                       session_priv: session_priv.clone(),
+                                                       first_hop_htlc_msat: htlc_msat,
+                                                       payment_id,
+                                               }, onion_packet, &self.logger),
+                                       channel_state, chan);
+
+                                       let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
+                                       let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable {
+                                               session_privs: HashSet::new(),
+                                               pending_amt_msat: 0,
+                                               payment_hash: *payment_hash,
+                                               payment_secret: *payment_secret,
+                                               starting_block_height: self.best_block.read().unwrap().height(),
+                                               total_msat: total_value,
+                                       });
+                                       assert!(payment.insert(session_priv_bytes, path.last().unwrap().fee_msat));
+
+                                       send_res
                                } {
                                        Some((update_add, commitment_signed, monitor_update)) => {
                                                if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
@@ -1936,11 +2073,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// If a payment_secret *is* provided, we assume that the invoice had the payment_secret feature
        /// bit set (either as required or as available). If multiple paths are present in the Route,
        /// we assume the invoice had the basic_mpp feature set.
-       pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>) -> Result<(), PaymentSendFailure> {
-               self.send_payment_internal(route, payment_hash, payment_secret, None)
+       pub fn send_payment(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>) -> Result<PaymentId, PaymentSendFailure> {
+               self.send_payment_internal(route, payment_hash, payment_secret, None, None, None)
        }
 
-       fn send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, keysend_preimage: Option<PaymentPreimage>) -> Result<(), PaymentSendFailure> {
+       fn send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, payment_secret: &Option<PaymentSecret>, keysend_preimage: Option<PaymentPreimage>, payment_id: Option<PaymentId>, recv_value_msat: Option<u64>) -> Result<PaymentId, PaymentSendFailure> {
                if route.paths.len() < 1 {
                        return Err(PaymentSendFailure::ParameterError(APIError::RouteError{err: "There must be at least one path to send over"}));
                }
@@ -1956,6 +2093,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let mut total_value = 0;
                let our_node_id = self.get_our_node_id();
                let mut path_errs = Vec::with_capacity(route.paths.len());
+               let payment_id = if let Some(id) = payment_id { id } else { PaymentId(self.keys_manager.get_secure_random_bytes()) };
                'path_check: for path in route.paths.iter() {
                        if path.len() < 1 || path.len() > 20 {
                                path_errs.push(Err(APIError::RouteError{err: "Path didn't go anywhere/had bogus size"}));
@@ -1973,11 +2111,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                if path_errs.iter().any(|e| e.is_err()) {
                        return Err(PaymentSendFailure::PathParameterError(path_errs));
                }
+               if let Some(amt_msat) = recv_value_msat {
+                       debug_assert!(amt_msat >= total_value);
+                       total_value = amt_msat;
+               }
 
                let cur_height = self.best_block.read().unwrap().height() + 1;
                let mut results = Vec::new();
                for path in route.paths.iter() {
-                       results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height, &keysend_preimage));
+                       results.push(self.send_payment_along_path(&path, &payment_hash, payment_secret, total_value, cur_height, payment_id, &keysend_preimage));
                }
                let mut has_ok = false;
                let mut has_err = false;
@@ -1997,10 +2139,58 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                } else if has_err {
                        Err(PaymentSendFailure::AllFailedRetrySafe(results.drain(..).map(|r| r.unwrap_err()).collect()))
                } else {
-                       Ok(())
+                       Ok(payment_id)
                }
        }
 
+       /// Retries a payment along the given [`Route`].
+       ///
+       /// Errors returned are a superset of those returned from [`send_payment`], so see
+       /// [`send_payment`] documentation for more details on errors. This method will also error if the
+       /// retry amount puts the payment more than 10% over the payment's total amount, or if the payment
+       /// for the given `payment_id` cannot be found (likely due to timeout or success).
+       ///
+       /// [`send_payment`]: [`ChannelManager::send_payment`]
+       pub fn retry_payment(&self, route: &Route, payment_id: PaymentId) -> Result<(), PaymentSendFailure> {
+               const RETRY_OVERFLOW_PERCENTAGE: u64 = 10;
+               for path in route.paths.iter() {
+                       if path.len() == 0 {
+                               return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                       err: "length-0 path in route".to_string()
+                               }))
+                       }
+               }
+
+               let (total_msat, payment_hash, payment_secret) = {
+                       let outbounds = self.pending_outbound_payments.lock().unwrap();
+                       if let Some(payment) = outbounds.get(&payment_id) {
+                               match payment {
+                                       PendingOutboundPayment::Retryable {
+                                               total_msat, payment_hash, payment_secret, pending_amt_msat, ..
+                                       } => {
+                                               let retry_amt_msat: u64 = route.paths.iter().map(|path| path.last().unwrap().fee_msat).sum();
+                                               if retry_amt_msat + *pending_amt_msat > *total_msat * (100 + RETRY_OVERFLOW_PERCENTAGE) / 100 {
+                                                       return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                                               err: format!("retry_amt_msat of {} will put pending_amt_msat (currently: {}) more than 10% over total_payment_amt_msat of {}", retry_amt_msat, pending_amt_msat, total_msat).to_string()
+                                                       }))
+                                               }
+                                               (*total_msat, *payment_hash, *payment_secret)
+                                       },
+                                       PendingOutboundPayment::Legacy { .. } => {
+                                               return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                                       err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string()
+                                               }))
+                                       }
+                               }
+                       } else {
+                               return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
+                                       err: format!("Payment with ID {} not found", log_bytes!(payment_id.0)),
+                               }))
+                       }
+               };
+               return self.send_payment_internal(route, payment_hash, &payment_secret, None, Some(payment_id), Some(total_msat)).map(|_| ())
+       }
+
        /// Send a spontaneous payment, which is a payment that does not require the recipient to have
        /// generated an invoice. Optionally, you may specify the preimage. If you do choose to specify
        /// the preimage, it must be a cryptographically secure random value that no intermediate node
@@ -2015,14 +2205,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        /// Note that `route` must have exactly one path.
        ///
        /// [`send_payment`]: Self::send_payment
-       pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option<PaymentPreimage>) -> Result<PaymentHash, PaymentSendFailure> {
+       pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option<PaymentPreimage>) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> {
                let preimage = match payment_preimage {
                        Some(p) => p,
                        None => PaymentPreimage(self.keys_manager.get_secure_random_bytes()),
                };
                let payment_hash = PaymentHash(Sha256::hash(&preimage.0).into_inner());
-               match self.send_payment_internal(route, payment_hash, &None, Some(preimage)) {
-                       Ok(()) => Ok(payment_hash),
+               match self.send_payment_internal(route, payment_hash, &None, Some(preimage), None, None) {
+                       Ok(payment_id) => Ok((payment_hash, payment_id)),
                        Err(e) => Err(e)
                }
        }
@@ -2379,6 +2569,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                                                                channel_state.short_to_id.remove(&short_id);
                                                                                        }
+                                                                                       // ChannelClosed event is generated by handle_error for us.
                                                                                        Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
                                                                                },
                                                                                ChannelError::CloseDelayBroadcast(_) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
@@ -2812,23 +3003,27 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        self.fail_htlc_backwards_internal(channel_state,
                                                htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
                                },
-                               HTLCSource::OutboundRoute { session_priv, .. } => {
-                                       if {
-                                               let mut session_priv_bytes = [0; 32];
-                                               session_priv_bytes.copy_from_slice(&session_priv[..]);
-                                               self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
-                                       } {
-                                               self.pending_events.lock().unwrap().push(
-                                                       events::Event::PaymentFailed {
-                                                               payment_hash,
-                                                               rejected_by_dest: false,
-                                                               network_update: None,
-#[cfg(test)]
-                                                               error_code: None,
-#[cfg(test)]
-                                                               error_data: None,
-                                                       }
-                                               )
+                               HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
+                                       let mut session_priv_bytes = [0; 32];
+                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
+                                       let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+                                       if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
+                                               if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
+                                                       self.pending_events.lock().unwrap().push(
+                                                               events::Event::PaymentPathFailed {
+                                                                       payment_hash,
+                                                                       rejected_by_dest: false,
+                                                                       network_update: None,
+                                                                       all_paths_failed: payment.get().remaining_parts() == 0,
+                                                                       path: path.clone(),
+                                                                       short_channel_id: None,
+                                                                       #[cfg(test)]
+                                                                       error_code: None,
+                                                                       #[cfg(test)]
+                                                                       error_data: None,
+                                                               }
+                                                       );
+                                               }
                                        } else {
                                                log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                        }
@@ -2853,12 +3048,20 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                // from block_connected which may run during initialization prior to the chain_monitor
                // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
                match source {
-                       HTLCSource::OutboundRoute { ref path, session_priv, .. } => {
-                               if {
-                                       let mut session_priv_bytes = [0; 32];
-                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
-                                       !self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
-                               } {
+                       HTLCSource::OutboundRoute { ref path, session_priv, payment_id, .. } => {
+                               let mut session_priv_bytes = [0; 32];
+                               session_priv_bytes.copy_from_slice(&session_priv[..]);
+                               let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+                               let mut all_paths_failed = false;
+                               if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
+                                       if !sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
+                                               log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
+                                               return;
+                                       }
+                                       if sessions.get().remaining_parts() == 0 {
+                                               all_paths_failed = true;
+                                       }
+                               } else {
                                        log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                        return;
                                }
@@ -2867,17 +3070,20 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                match &onion_error {
                                        &HTLCFailReason::LightningError { ref err } => {
 #[cfg(test)]
-                                               let (network_update, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
+                                               let (network_update, short_channel_id, payment_retryable, onion_error_code, onion_error_data) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
 #[cfg(not(test))]
-                                               let (network_update, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
+                                               let (network_update, short_channel_id, payment_retryable, _, _) = onion_utils::process_onion_failure(&self.secp_ctx, &self.logger, &source, err.data.clone());
                                                // TODO: If we decided to blame ourselves (or one of our channels) in
                                                // process_onion_failure we should close that channel as it implies our
                                                // next-hop is needlessly blaming us!
                                                self.pending_events.lock().unwrap().push(
-                                                       events::Event::PaymentFailed {
+                                                       events::Event::PaymentPathFailed {
                                                                payment_hash: payment_hash.clone(),
                                                                rejected_by_dest: !payment_retryable,
                                                                network_update,
+                                                               all_paths_failed,
+                                                               path: path.clone(),
+                                                               short_channel_id,
 #[cfg(test)]
                                                                error_code: onion_error_code,
 #[cfg(test)]
@@ -2899,10 +3105,13 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                // TODO: For non-temporary failures, we really should be closing the
                                                // channel here as we apparently can't relay through them anyway.
                                                self.pending_events.lock().unwrap().push(
-                                                       events::Event::PaymentFailed {
+                                                       events::Event::PaymentPathFailed {
                                                                payment_hash: payment_hash.clone(),
                                                                rejected_by_dest: path.len() == 1,
                                                                network_update: None,
+                                                               all_paths_failed,
+                                                               path: path.clone(),
+                                                               short_channel_id: Some(path.first().unwrap().short_channel_id),
 #[cfg(test)]
                                                                error_code: Some(*failure_code),
 #[cfg(test)]
@@ -3099,17 +3308,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
        fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
                match source {
-                       HTLCSource::OutboundRoute { session_priv, .. } => {
+                       HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
                                mem::drop(channel_state_lock);
-                               if {
-                                       let mut session_priv_bytes = [0; 32];
-                                       session_priv_bytes.copy_from_slice(&session_priv[..]);
-                                       self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
-                               } {
-                                       let mut pending_events = self.pending_events.lock().unwrap();
-                                       pending_events.push(events::Event::PaymentSent {
-                                               payment_preimage
-                                       });
+                               let mut session_priv_bytes = [0; 32];
+                               session_priv_bytes.copy_from_slice(&session_priv[..]);
+                               let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+                               let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) {
+                                       sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
+                               } else { false };
+                               if found_payment {
+                                       let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
+                                       self.pending_events.lock().unwrap().push(
+                                               events::Event::PaymentSent {
+                                                       payment_preimage,
+                                                       payment_hash: payment_hash
+                                               }
+                                       );
                                } else {
                                        log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
                                }
@@ -3348,7 +3562,16 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                Err(e) => try_chan_entry!(self, Err(e), channel_state, chan),
                                        };
                                        if let Err(e) = self.chain_monitor.watch_channel(chan.get().get_funding_txo().unwrap(), monitor) {
-                                               return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, false, false);
+                                               let mut res = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, false, false);
+                                               if let Err(MsgHandleErrInternal { ref mut shutdown_finish, .. }) = res {
+                                                       // We weren't able to watch the channel to begin with, so no updates should be made on
+                                                       // it. Previously, full_stack_target found an (unreachable) panic when the
+                                                       // monitor update contained within `shutdown_finish` was applied.
+                                                       if let Some((ref mut shutdown_finish, _)) = shutdown_finish {
+                                                               shutdown_finish.0.take();
+                                                       }
+                                               }
+                                               return res
                                        }
                                        funding_tx
                                },
@@ -3491,6 +3714,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        msg: update
                                });
                        }
+                       self.issue_channel_close_events(&chan, ClosureReason::CooperativeClosure);
                }
                Ok(())
        }
@@ -3886,7 +4110,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
                                        }
                                },
-                               MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
+                               MonitorEvent::CommitmentTxConfirmed(funding_outpoint) => {
                                        let mut channel_lock = self.channel_state.lock().unwrap();
                                        let channel_state = &mut *channel_lock;
                                        let by_id = &mut channel_state.by_id;
@@ -3902,6 +4126,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                msg: update
                                                        });
                                                }
+                                               self.issue_channel_close_events(&chan, ClosureReason::CommitmentTxConfirmed);
                                                pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                        node_id: chan.get_counterparty_node_id(),
                                                        action: msgs::ErrorAction::SendErrorMessage {
@@ -3963,6 +4188,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        Err(e) => {
                                                let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
                                                handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
+                                               // ChannelClosed event is generated by handle_error for us
                                                !close_channel
                                        }
                                }
@@ -4016,6 +4242,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                });
                                                        }
 
+                                                       self.issue_channel_close_events(chan, ClosureReason::CooperativeClosure);
+
                                                        log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                        self.tx_broadcaster.broadcast_transaction(&tx);
                                                        false
@@ -4168,6 +4396,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                self.process_pending_events(&event_handler);
                events.into_inner()
        }
+
+       #[cfg(test)]
+       pub fn has_pending_payments(&self) -> bool {
+               !self.pending_outbound_payments.lock().unwrap().is_empty()
+       }
 }
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -4343,6 +4576,16 @@ where
                payment_secrets.retain(|_, inbound_payment| {
                        inbound_payment.expiry_time > header.time as u64
                });
+
+               let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+               outbounds.retain(|_, payment| {
+                       const PAYMENT_EXPIRY_BLOCKS: u32 = 3;
+                       if payment.remaining_parts() != 0 { return true }
+                       if let PendingOutboundPayment::Retryable { starting_block_height, .. } = payment {
+                               return *starting_block_height + PAYMENT_EXPIRY_BLOCKS > height
+                       }
+                       true
+               });
        }
 
        fn get_relevant_txids(&self) -> Vec<Txid> {
@@ -4436,6 +4679,7 @@ where
                                                        msg: update
                                                });
                                        }
+                                       self.issue_channel_close_events(channel, ClosureReason::CommitmentTxConfirmed);
                                        pending_msg_events.push(events::MessageSendEvent::HandleError {
                                                node_id: channel.get_counterparty_node_id(),
                                                action: msgs::ErrorAction::SendErrorMessage { msg: e },
@@ -4626,6 +4870,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                                                                msg: update
                                                        });
                                                }
+                                               self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer);
                                                false
                                        } else {
                                                true
@@ -4640,6 +4885,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                                                        if let Some(short_id) = chan.get_short_channel_id() {
                                                                short_to_id.remove(&short_id);
                                                        }
+                                                       self.issue_channel_close_events(chan, ClosureReason::DisconnectedPeer);
                                                        return false;
                                                } else {
                                                        no_channels_remain = false;
@@ -4730,12 +4976,12 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                        for chan in self.list_channels() {
                                if chan.counterparty.node_id == *counterparty_node_id {
                                        // Untrusted messages from peer, we throw away the error if id points to a non-existent channel
-                                       let _ = self.force_close_channel_with_peer(&chan.channel_id, Some(counterparty_node_id));
+                                       let _ = self.force_close_channel_with_peer(&chan.channel_id, Some(counterparty_node_id), Some(&msg.data));
                                }
                        }
                } else {
                        // Untrusted messages from peer, we throw away the error if id points to a non-existent channel
-                       let _ = self.force_close_channel_with_peer(&msg.channel_id, Some(counterparty_node_id));
+                       let _ = self.force_close_channel_with_peer(&msg.channel_id, Some(counterparty_node_id), Some(&msg.data));
                }
        }
 }
@@ -4837,10 +5083,74 @@ impl_writeable_tlv_based!(PendingHTLCInfo, {
        (8, outgoing_cltv_value, required)
 });
 
-impl_writeable_tlv_based_enum!(HTLCFailureMsg, ;
-       (0, Relay),
-       (1, Malformed),
-);
+
+impl Writeable for HTLCFailureMsg {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
+               match self {
+                       HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { channel_id, htlc_id, reason }) => {
+                               0u8.write(writer)?;
+                               channel_id.write(writer)?;
+                               htlc_id.write(writer)?;
+                               reason.write(writer)?;
+                       },
+                       HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
+                               channel_id, htlc_id, sha256_of_onion, failure_code
+                       }) => {
+                               1u8.write(writer)?;
+                               channel_id.write(writer)?;
+                               htlc_id.write(writer)?;
+                               sha256_of_onion.write(writer)?;
+                               failure_code.write(writer)?;
+                       },
+               }
+               Ok(())
+       }
+}
+
+impl Readable for HTLCFailureMsg {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               let id: u8 = Readable::read(reader)?;
+               match id {
+                       0 => {
+                               Ok(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC {
+                                       channel_id: Readable::read(reader)?,
+                                       htlc_id: Readable::read(reader)?,
+                                       reason: Readable::read(reader)?,
+                               }))
+                       },
+                       1 => {
+                               Ok(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC {
+                                       channel_id: Readable::read(reader)?,
+                                       htlc_id: Readable::read(reader)?,
+                                       sha256_of_onion: Readable::read(reader)?,
+                                       failure_code: Readable::read(reader)?,
+                               }))
+                       },
+                       // In versions prior to 0.0.101, HTLCFailureMsg objects were written with type 0 or 1 but
+                       // weren't length-prefixed and thus didn't support reading the TLV stream suffix of the network
+                       // messages contained in the variants.
+                       // In version 0.0.101, support for reading the variants with these types was added, and
+                       // we should migrate to writing these variants when UpdateFailHTLC or
+                       // UpdateFailMalformedHTLC get TLV fields.
+                       2 => {
+                               let length: BigSize = Readable::read(reader)?;
+                               let mut s = FixedLengthReader::new(reader, length.0);
+                               let res = Readable::read(&mut s)?;
+                               s.eat_remaining()?; // Return ShortRead if there's actually not enough bytes
+                               Ok(HTLCFailureMsg::Relay(res))
+                       },
+                       3 => {
+                               let length: BigSize = Readable::read(reader)?;
+                               let mut s = FixedLengthReader::new(reader, length.0);
+                               let res = Readable::read(&mut s)?;
+                               s.eat_remaining()?; // Return ShortRead if there's actually not enough bytes
+                               Ok(HTLCFailureMsg::Malformed(res))
+                       },
+                       _ => Err(DecodeError::UnknownRequiredFeature),
+               }
+       }
+}
+
 impl_writeable_tlv_based_enum!(PendingHTLCStatus, ;
        (0, Forward),
        (1, Fail),
@@ -4911,14 +5221,60 @@ impl Readable for ClaimableHTLC {
        }
 }
 
-impl_writeable_tlv_based_enum!(HTLCSource,
-       (0, OutboundRoute) => {
-               (0, session_priv, required),
-               (2, first_hop_htlc_msat, required),
-               (4, path, vec_type),
-       }, ;
-       (1, PreviousHopData)
-);
+impl Readable for HTLCSource {
+       fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+               let id: u8 = Readable::read(reader)?;
+               match id {
+                       0 => {
+                               let mut session_priv: ::util::ser::OptionDeserWrapper<SecretKey> = ::util::ser::OptionDeserWrapper(None);
+                               let mut first_hop_htlc_msat: u64 = 0;
+                               let mut path = Some(Vec::new());
+                               let mut payment_id = None;
+                               read_tlv_fields!(reader, {
+                                       (0, session_priv, required),
+                                       (1, payment_id, option),
+                                       (2, first_hop_htlc_msat, required),
+                                       (4, path, vec_type),
+                               });
+                               if payment_id.is_none() {
+                                       // For backwards compat, if there was no payment_id written, use the session_priv bytes
+                                       // instead.
+                                       payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref()));
+                               }
+                               Ok(HTLCSource::OutboundRoute {
+                                       session_priv: session_priv.0.unwrap(),
+                                       first_hop_htlc_msat: first_hop_htlc_msat,
+                                       path: path.unwrap(),
+                                       payment_id: payment_id.unwrap(),
+                               })
+                       }
+                       1 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
+                       _ => Err(DecodeError::UnknownRequiredFeature),
+               }
+       }
+}
+
+impl Writeable for HTLCSource {
+       fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::io::Error> {
+               match self {
+                       HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id } => {
+                               0u8.write(writer)?;
+                               let payment_id_opt = Some(payment_id);
+                               write_tlv_fields!(writer, {
+                                       (0, session_priv, required),
+                                       (1, payment_id_opt, option),
+                                       (2, first_hop_htlc_msat, required),
+                                       (4, path, vec_type),
+                                });
+                       }
+                       HTLCSource::PreviousHopData(ref field) => {
+                               1u8.write(writer)?;
+                               field.write(writer)?;
+                       }
+               }
+               Ok(())
+       }
+}
 
 impl_writeable_tlv_based_enum!(HTLCFailReason,
        (0, LightningError) => {
@@ -4951,6 +5307,20 @@ impl_writeable_tlv_based!(PendingInboundPayment, {
        (8, min_value_msat, required),
 });
 
+impl_writeable_tlv_based_enum!(PendingOutboundPayment,
+       (0, Legacy) => {
+               (0, session_privs, required),
+       },
+       (2, Retryable) => {
+               (0, session_privs, required),
+               (2, payment_hash, required),
+               (4, payment_secret, option),
+               (6, total_msat, required),
+               (8, pending_amt_msat, required),
+               (10, starting_block_height, required),
+       },
+;);
+
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
        where M::Target: chain::Watch<Signer>,
         T::Target: BroadcasterInterface,
@@ -5039,12 +5409,37 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                }
 
                let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
-               (pending_outbound_payments.len() as u64).write(writer)?;
-               for session_priv in pending_outbound_payments.iter() {
-                       session_priv.write(writer)?;
+               // For backwards compat, write the session privs and their total length.
+               let mut num_pending_outbounds_compat: u64 = 0;
+               for (_, outbound) in pending_outbound_payments.iter() {
+                       num_pending_outbounds_compat += outbound.remaining_parts() as u64;
+               }
+               num_pending_outbounds_compat.write(writer)?;
+               for (_, outbound) in pending_outbound_payments.iter() {
+                       match outbound {
+                               PendingOutboundPayment::Legacy { session_privs } |
+                               PendingOutboundPayment::Retryable { session_privs, .. } => {
+                                       for session_priv in session_privs.iter() {
+                                               session_priv.write(writer)?;
+                                       }
+                               }
+                       }
                }
 
-               write_tlv_fields!(writer, {});
+               // Encode without retry info for 0.0.101 compatibility.
+               let mut pending_outbound_payments_no_retry: HashMap<PaymentId, HashSet<[u8; 32]>> = HashMap::new();
+               for (id, outbound) in pending_outbound_payments.iter() {
+                       match outbound {
+                               PendingOutboundPayment::Legacy { session_privs } |
+                               PendingOutboundPayment::Retryable { session_privs, .. } => {
+                                       pending_outbound_payments_no_retry.insert(*id, session_privs.clone());
+                               }
+                       }
+               }
+               write_tlv_fields!(writer, {
+                       (1, pending_outbound_payments_no_retry, required),
+                       (3, pending_outbound_payments, required),
+               });
 
                Ok(())
        }
@@ -5181,6 +5576,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                let mut funding_txo_set = HashSet::with_capacity(cmp::min(channel_count as usize, 128));
                let mut by_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
                let mut short_to_id = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
+               let mut channel_closures = Vec::new();
                for _ in 0..channel_count {
                        let mut channel: Channel<Signer> = Channel::read(reader, &args.keys_manager)?;
                        let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
@@ -5211,6 +5607,10 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                        let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
                                        failed_htlcs.append(&mut new_failed_htlcs);
                                        monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
+                                       channel_closures.push(events::Event::ChannelClosed {
+                                               channel_id: channel.channel_id(),
+                                               reason: ClosureReason::OutdatedChannelManager
+                                       });
                                } else {
                                        if let Some(short_channel_id) = channel.get_short_channel_id() {
                                                short_to_id.insert(short_channel_id, channel.channel_id());
@@ -5276,6 +5676,16 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                None => continue,
                        }
                }
+               if forward_htlcs_count > 0 {
+                       // If we have pending HTLCs to forward, assume we either dropped a
+                       // `PendingHTLCsForwardable` or the user received it but never processed it as they
+                       // shut down before the timer hit. Either way, set the time_forwardable to a small
+                       // constant as enough time has likely passed that we should simply handle the forwards
+                       // now, or at least after the user gets a chance to reconnect to our peers.
+                       pending_events_read.push(events::Event::PendingHTLCsForwardable {
+                               time_forwardable: Duration::from_secs(2),
+                       });
+               }
 
                let background_event_count: u64 = Readable::read(reader)?;
                let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
@@ -5297,19 +5707,43 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                        }
                }
 
-               let pending_outbound_payments_count: u64 = Readable::read(reader)?;
-               let mut pending_outbound_payments: HashSet<[u8; 32]> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
-               for _ in 0..pending_outbound_payments_count {
-                       if !pending_outbound_payments.insert(Readable::read(reader)?) {
-                               return Err(DecodeError::InvalidValue);
-                       }
+               let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
+               let mut pending_outbound_payments_compat: HashMap<PaymentId, PendingOutboundPayment> =
+                       HashMap::with_capacity(cmp::min(pending_outbound_payments_count_compat as usize, MAX_ALLOC_SIZE/32));
+               for _ in 0..pending_outbound_payments_count_compat {
+                       let session_priv = Readable::read(reader)?;
+                       let payment = PendingOutboundPayment::Legacy {
+                               session_privs: [session_priv].iter().cloned().collect()
+                       };
+                       if pending_outbound_payments_compat.insert(PaymentId(session_priv), payment).is_some() {
+                               return Err(DecodeError::InvalidValue)
+                       };
                }
 
-               read_tlv_fields!(reader, {});
+               // pending_outbound_payments_no_retry is for compatibility with 0.0.101 clients.
+               let mut pending_outbound_payments_no_retry: Option<HashMap<PaymentId, HashSet<[u8; 32]>>> = None;
+               let mut pending_outbound_payments = None;
+               read_tlv_fields!(reader, {
+                       (1, pending_outbound_payments_no_retry, option),
+                       (3, pending_outbound_payments, option),
+               });
+               if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() {
+                       pending_outbound_payments = Some(pending_outbound_payments_compat);
+               } else if pending_outbound_payments.is_none() {
+                       let mut outbounds = HashMap::new();
+                       for (id, session_privs) in pending_outbound_payments_no_retry.unwrap().drain() {
+                               outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
+                       }
+                       pending_outbound_payments = Some(outbounds);
+               }
 
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
 
+               if !channel_closures.is_empty() {
+                       pending_events_read.append(&mut channel_closures);
+               }
+
                let channel_manager = ChannelManager {
                        genesis_hash,
                        fee_estimator: args.fee_estimator,
@@ -5326,7 +5760,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                pending_msg_events: Vec::new(),
                        }),
                        pending_inbound_payments: Mutex::new(pending_inbound_payments),
-                       pending_outbound_payments: Mutex::new(pending_outbound_payments),
+                       pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
 
                        our_network_key: args.keys_manager.get_node_secret(),
                        our_network_pubkey: PublicKey::from_secret_key(&secp_ctx, &args.keys_manager.get_node_secret()),
@@ -5364,7 +5798,7 @@ mod tests {
        use bitcoin::hashes::sha256::Hash as Sha256;
        use core::time::Duration;
        use ln::{PaymentPreimage, PaymentHash, PaymentSecret};
-       use ln::channelmanager::PaymentSendFailure;
+       use ln::channelmanager::{PaymentId, PaymentSendFailure};
        use ln::features::{InitFeatures, InvoiceFeatures};
        use ln::functional_test_utils::*;
        use ln::msgs;
@@ -5515,10 +5949,11 @@ mod tests {
                let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
                let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph, &nodes[1].node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
                let (payment_preimage, our_payment_hash, payment_secret) = get_payment_preimage_hash!(&nodes[1]);
+               let payment_id = PaymentId([42; 32]);
                // Use the utility function send_payment_along_path to send the payment with MPP data which
                // indicates there are more HTLCs coming.
                let cur_height = CHAN_CONFIRM_DEPTH + 1; // route_payment calls send_payment, which adds 1 to the current height. So we do the same here to match.
-               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, &None).unwrap();
+               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -5548,7 +5983,7 @@ mod tests {
                expect_payment_failed!(nodes[0], our_payment_hash, true);
 
                // Send the second half of the original MPP payment.
-               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, &None).unwrap();
+               nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -5586,17 +6021,13 @@ mod tests {
                nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_third_raa);
                check_added_monitors!(nodes[0], 1);
 
-               // There's an existing bug that generates a PaymentSent event for each MPP path, so handle that here.
+               // Note that successful MPP payments will generate 1 event upon the first path's success. No
+               // further events will be generated for subsequence path successes.
                let events = nodes[0].node.get_and_clear_pending_events();
                match events[0] {
-                       Event::PaymentSent { payment_preimage: ref preimage } => {
-                               assert_eq!(payment_preimage, *preimage);
-                       },
-                       _ => panic!("Unexpected event"),
-               }
-               match events[1] {
-                       Event::PaymentSent { payment_preimage: ref preimage } => {
+                       Event::PaymentSent { payment_preimage: ref preimage, payment_hash: ref hash } => {
                                assert_eq!(payment_preimage, *preimage);
+                               assert_eq!(our_payment_hash, *hash);
                        },
                        _ => panic!("Unexpected event"),
                }
@@ -5649,7 +6080,7 @@ mod tests {
                // To start (2), send a keysend payment but don't claim it.
                let payment_preimage = PaymentPreimage([42; 32]);
                let route = get_route(&nodes[0].node.get_our_node_id(), &nodes[0].net_graph_msg_handler.network_graph, &expected_route.last().unwrap().node.get_our_node_id(), Some(InvoiceFeatures::known()), None, &Vec::new(), 100_000, TEST_FINAL_CLTV, &logger).unwrap();
-               let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
+               let (payment_hash, _) = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage)).unwrap();
                check_added_monitors!(nodes[0], 1);
                let mut events = nodes[0].node.get_and_clear_pending_msg_events();
                assert_eq!(events.len(), 1);
@@ -5708,7 +6139,7 @@ mod tests {
 
                let test_preimage = PaymentPreimage([42; 32]);
                let mismatch_payment_hash = PaymentHash([43; 32]);
-               let _ = nodes[0].node.send_payment_internal(&route, mismatch_payment_hash, &None, Some(test_preimage)).unwrap();
+               let _ = nodes[0].node.send_payment_internal(&route, mismatch_payment_hash, &None, Some(test_preimage), None, None).unwrap();
                check_added_monitors!(nodes[0], 1);
 
                let updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id());
@@ -5745,7 +6176,7 @@ mod tests {
                let test_preimage = PaymentPreimage([42; 32]);
                let test_secret = PaymentSecret([43; 32]);
                let payment_hash = PaymentHash(Sha256::hash(&test_preimage.0).into_inner());
-               let _ = nodes[0].node.send_payment_internal(&route, payment_hash, &Some(test_secret), Some(test_preimage)).unwrap();
+               let _ = nodes[0].node.send_payment_internal(&route, payment_hash, &Some(test_secret), Some(test_preimage), None, None).unwrap();
                check_added_monitors!(nodes[0], 1);
 
                let updates = get_htlc_update_msgs!(nodes[0], nodes[1].node.get_our_node_id());