Merge pull request #1134 from jkczyz/2021-10-payee-arg
[rust-lightning] / lightning / src / ln / channelmanager.rs
index 385a031cabbec5db8cb1b78e6a5f88aaf1e4b441..63c4bcc6b13813df6ef93472532ab4e67bb5ef4b 100644 (file)
@@ -145,7 +145,7 @@ pub(super) enum HTLCForwardInfo {
 }
 
 /// Tracks the inbound corresponding to an outbound HTLC
-#[derive(Clone, PartialEq)]
+#[derive(Clone, Hash, PartialEq, Eq)]
 pub(crate) struct HTLCPreviousHopData {
        short_channel_id: u64,
        htlc_id: u64,
@@ -189,7 +189,8 @@ impl Readable for PaymentId {
        }
 }
 /// Tracks the inbound corresponding to an outbound HTLC
-#[derive(Clone, PartialEq)]
+#[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
+#[derive(Clone, PartialEq, Eq)]
 pub(crate) enum HTLCSource {
        PreviousHopData(HTLCPreviousHopData),
        OutboundRoute {
@@ -199,8 +200,28 @@ pub(crate) enum HTLCSource {
                /// doing a double-pass on route when we get a failure back
                first_hop_htlc_msat: u64,
                payment_id: PaymentId,
+               payment_secret: Option<PaymentSecret>,
        },
 }
+#[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
+impl core::hash::Hash for HTLCSource {
+       fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
+               match self {
+                       HTLCSource::PreviousHopData(prev_hop_data) => {
+                               0u8.hash(hasher);
+                               prev_hop_data.hash(hasher);
+                       },
+                       HTLCSource::OutboundRoute { path, session_priv, payment_id, payment_secret, first_hop_htlc_msat } => {
+                               1u8.hash(hasher);
+                               path.hash(hasher);
+                               session_priv[..].hash(hasher);
+                               payment_id.hash(hasher);
+                               payment_secret.hash(hasher);
+                               first_hop_htlc_msat.hash(hasher);
+                       },
+               }
+       }
+}
 #[cfg(test)]
 impl HTLCSource {
        pub fn dummy() -> Self {
@@ -209,6 +230,7 @@ impl HTLCSource {
                        session_priv: SecretKey::from_slice(&[1; 32]).unwrap(),
                        first_hop_htlc_msat: 0,
                        payment_id: PaymentId([2; 32]),
+                       payment_secret: None,
                }
        }
 }
@@ -416,19 +438,51 @@ pub(crate) enum PendingOutboundPayment {
                /// Our best known block height at the time this payment was initiated.
                starting_block_height: u32,
        },
+       /// When a pending payment is fulfilled, we continue tracking it until all pending HTLCs have
+       /// been resolved. This ensures we don't look up pending payments in ChannelMonitors on restart
+       /// and add a pending payment that was already fulfilled.
+       Fulfilled {
+               session_privs: HashSet<[u8; 32]>,
+       },
 }
 
 impl PendingOutboundPayment {
-       fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool {
+       fn is_retryable(&self) -> bool {
+               match self {
+                       PendingOutboundPayment::Retryable { .. } => true,
+                       _ => false,
+               }
+       }
+       fn is_fulfilled(&self) -> bool {
+               match self {
+                       PendingOutboundPayment::Fulfilled { .. } => true,
+                       _ => false,
+               }
+       }
+
+       fn mark_fulfilled(&mut self) {
+               let mut session_privs = HashSet::new();
+               core::mem::swap(&mut session_privs, match self {
+                       PendingOutboundPayment::Legacy { session_privs } |
+                       PendingOutboundPayment::Retryable { session_privs, .. } |
+                       PendingOutboundPayment::Fulfilled { session_privs }
+                               => session_privs
+               });
+               *self = PendingOutboundPayment::Fulfilled { session_privs };
+       }
+
+       /// panics if part_amt_msat is None and !self.is_fulfilled
+       fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: Option<u64>) -> bool {
                let remove_res = match self {
                        PendingOutboundPayment::Legacy { session_privs } |
-                       PendingOutboundPayment::Retryable { session_privs, .. } => {
+                       PendingOutboundPayment::Retryable { session_privs, .. } |
+                       PendingOutboundPayment::Fulfilled { session_privs } => {
                                session_privs.remove(session_priv)
                        }
                };
                if remove_res {
                        if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
-                               *pending_amt_msat -= part_amt_msat;
+                               *pending_amt_msat -= part_amt_msat.expect("We must only not provide an amount if the payment was already fulfilled");
                        }
                }
                remove_res
@@ -440,6 +494,7 @@ impl PendingOutboundPayment {
                        PendingOutboundPayment::Retryable { session_privs, .. } => {
                                session_privs.insert(session_priv)
                        }
+                       PendingOutboundPayment::Fulfilled { .. } => false
                };
                if insert_res {
                        if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
@@ -452,7 +507,8 @@ impl PendingOutboundPayment {
        fn remaining_parts(&self) -> usize {
                match self {
                        PendingOutboundPayment::Legacy { session_privs } |
-                       PendingOutboundPayment::Retryable { session_privs, .. } => {
+                       PendingOutboundPayment::Retryable { session_privs, .. } |
+                       PendingOutboundPayment::Fulfilled { session_privs } => {
                                session_privs.len()
                        }
                }
@@ -1002,7 +1058,7 @@ macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
        };
-       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
+       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
                match $err {
                        ChannelMonitorUpdateErr::PermanentFailure => {
                                log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
@@ -1023,7 +1079,7 @@ macro_rules! handle_monitor_err {
                                (res, true)
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
-                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
+                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
                                                log_bytes!($chan_id[..]),
                                                if $resend_commitment && $resend_raa {
                                                                match $action_type {
@@ -1034,25 +1090,29 @@ macro_rules! handle_monitor_err {
                                                        else if $resend_raa { "RAA" }
                                                        else { "nothing" },
                                                (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
-                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len());
+                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
+                                               (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
                                if !$resend_commitment {
                                        debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
                                }
                                if !$resend_raa {
                                        debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
                                }
-                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
                                (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
                        },
                }
        };
-       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
-               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
+               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
                if drop {
                        $entry.remove_entry();
                }
                res
        } };
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+               handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, Vec::new());
+       }
 }
 
 macro_rules! return_monitor_err {
@@ -1441,7 +1501,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -1979,6 +2039,17 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 
                let err: Result<(), _> = loop {
                        let mut channel_lock = self.channel_state.lock().unwrap();
+
+                       let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
+                       let payment_entry = pending_outbounds.entry(payment_id);
+                       if let hash_map::Entry::Occupied(payment) = &payment_entry {
+                               if !payment.get().is_retryable() {
+                                       return Err(APIError::RouteError {
+                                               err: "Payment already completed"
+                                       });
+                               }
+                       }
+
                        let id = match channel_lock.short_to_id.get(&path.first().unwrap().short_channel_id) {
                                None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}),
                                Some(id) => id.clone(),
@@ -1999,11 +2070,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        session_priv: session_priv.clone(),
                                                        first_hop_htlc_msat: htlc_msat,
                                                        payment_id,
+                                                       payment_secret: payment_secret.clone(),
                                                }, onion_packet, &self.logger),
                                        channel_state, chan);
 
-                                       let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
-                                       let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable {
+                                       let payment = payment_entry.or_insert_with(|| PendingOutboundPayment::Retryable {
                                                session_privs: HashSet::new(),
                                                pending_amt_msat: 0,
                                                payment_hash: *payment_hash,
@@ -2199,7 +2270,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
                                                        err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string()
                                                }))
-                                       }
+                                       },
+                                       PendingOutboundPayment::Fulfilled { .. } => {
+                                               return Err(PaymentSendFailure::ParameterError(APIError::RouteError {
+                                                       err: "Payment already completed"
+                                               }));
+                                       },
                                }
                        } else {
                                return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
@@ -2846,7 +2922,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let ret_err = match res {
                        Ok(Some((update_fee, commitment_signed, monitor_update))) => {
                                if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
-                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), chan_id);
+                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), chan_id);
                                        if drop { retain_channel = false; }
                                        res
                                } else {
@@ -3027,7 +3103,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        session_priv_bytes.copy_from_slice(&session_priv[..]);
                                        let mut outbounds = self.pending_outbound_payments.lock().unwrap();
                                        if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
-                                               if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
+                                               if payment.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat)) &&
+                                                       !payment.get().is_fulfilled()
+                                               {
                                                        self.pending_events.lock().unwrap().push(
                                                                events::Event::PaymentPathFailed {
                                                                        payment_hash,
@@ -3073,12 +3151,16 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                session_priv_bytes.copy_from_slice(&session_priv[..]);
                                let mut outbounds = self.pending_outbound_payments.lock().unwrap();
                                let mut all_paths_failed = false;
-                               if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
-                                       if !sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
+                               if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
+                                       if !payment.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat)) {
                                                log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
                                                return;
                                        }
-                                       if sessions.get().remaining_parts() == 0 {
+                                       if payment.get().is_fulfilled() {
+                                               log_trace!(self.logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0));
+                                               return;
+                                       }
+                                       if payment.get().remaining_parts() == 0 {
                                                all_paths_failed = true;
                                        }
                                } else {
@@ -3328,6 +3410,23 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                } else { unreachable!(); }
        }
 
+       fn finalize_claims(&self, mut sources: Vec<HTLCSource>) {
+               for source in sources.drain(..) {
+                       if let HTLCSource::OutboundRoute { session_priv, payment_id, .. } = source {
+                               let mut session_priv_bytes = [0; 32];
+                               session_priv_bytes.copy_from_slice(&session_priv[..]);
+                               let mut outbounds = self.pending_outbound_payments.lock().unwrap();
+                               if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
+                                       assert!(payment.get().is_fulfilled());
+                                       payment.get_mut().remove(&session_priv_bytes, None);
+                                       if payment.get().remaining_parts() == 0 {
+                                               payment.remove();
+                                       }
+                               }
+                       }
+               }
+       }
+
        fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
                match source {
                        HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
@@ -3335,8 +3434,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                let mut session_priv_bytes = [0; 32];
                                session_priv_bytes.copy_from_slice(&session_priv[..]);
                                let mut outbounds = self.pending_outbound_payments.lock().unwrap();
-                               let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) {
-                                       sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
+                               let found_payment = if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
+                                       let found_payment = !payment.get().is_fulfilled();
+                                       payment.get_mut().mark_fulfilled();
+                                       if from_onchain {
+                                               // We currently immediately remove HTLCs which were fulfilled on-chain.
+                                               // This could potentially lead to removing a pending payment too early,
+                                               // with a reorg of one block causing us to re-add the fulfilled payment on
+                                               // restart.
+                                               // TODO: We should have a second monitor event that informs us of payments
+                                               // irrevocably fulfilled.
+                                               payment.get_mut().remove(&session_priv_bytes, Some(path.last().unwrap().fee_msat));
+                                               if payment.get().remaining_parts() == 0 {
+                                                       payment.remove();
+                                               }
+                                       }
+                                       found_payment
                                } else { false };
                                if found_payment {
                                        let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
@@ -3411,7 +3524,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
                let chan_restoration_res;
-               let mut pending_failures = {
+               let (mut pending_failures, finalized_claims) = {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_lock;
                        let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
@@ -3422,8 +3535,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                return;
                        }
 
-                       let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
-                       let channel_update = if funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
+                       let updates = channel.get_mut().monitor_updating_restored(&self.logger);
+                       let channel_update = if updates.funding_locked.is_some() && channel.get().is_usable() && !channel.get().should_announce() {
                                // We only send a channel_update in the case where we are just now sending a
                                // funding_locked and the channel is in a usable state. Further, we rely on the
                                // normal announcement_signatures process to send a channel_update for public
@@ -3433,13 +3546,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        msg: self.get_channel_update_for_unicast(channel.get()).unwrap(),
                                })
                        } else { None };
-                       chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked);
+                       chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, updates.raa, updates.commitment_update, updates.order, None, updates.accepted_htlcs, updates.funding_broadcastable, updates.funding_locked);
                        if let Some(upd) = channel_update {
                                channel_state.pending_msg_events.push(upd);
                        }
-                       pending_failures
+                       (updates.failed_htlcs, updates.finalized_claimed_htlcs)
                };
                post_handle_chan_restoration!(self, chan_restoration_res);
+               self.finalize_claims(finalized_claims);
                for failure in pending_failures.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
                }
@@ -3528,7 +3642,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
                                        // accepted payment from yet. We do, however, need to wait to send our funding_locked
                                        // until we have persisted our monitor.
-                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new());
+                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new(), Vec::new());
                                },
                        }
                }
@@ -3646,7 +3760,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -3933,37 +4047,51 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
                                        let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                       let (commitment_update, pending_forwards, pending_failures, monitor_update, htlcs_to_fail_in) =
-                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
-                                       htlcs_to_fail = htlcs_to_fail_in;
-                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
+                                       let raa_updates = break_chan_entry!(self,
+                                               chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
+                                       htlcs_to_fail = raa_updates.holding_cell_failed_htlcs;
+                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), raa_updates.monitor_update) {
                                                if was_frozen_for_monitor {
-                                                       assert!(commitment_update.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
+                                                       assert!(raa_updates.commitment_update.is_none());
+                                                       assert!(raa_updates.accepted_htlcs.is_empty());
+                                                       assert!(raa_updates.failed_htlcs.is_empty());
+                                                       assert!(raa_updates.finalized_claimed_htlcs.is_empty());
                                                        break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
                                                } else {
-                                                       if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) {
+                                                       if let Err(e) = handle_monitor_err!(self, e, channel_state, chan,
+                                                                       RAACommitmentOrder::CommitmentFirst, false,
+                                                                       raa_updates.commitment_update.is_some(),
+                                                                       raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+                                                                       raa_updates.finalized_claimed_htlcs) {
                                                                break Err(e);
                                                        } else { unreachable!(); }
                                                }
                                        }
-                                       if let Some(updates) = commitment_update {
+                                       if let Some(updates) = raa_updates.commitment_update {
                                                channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
                                                        node_id: counterparty_node_id.clone(),
                                                        updates,
                                                });
                                        }
-                                       break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap()))
+                                       break Ok((raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+                                                       raa_updates.finalized_claimed_htlcs,
+                                                       chan.get().get_short_channel_id()
+                                                               .expect("RAA should only work on a short-id-available channel"),
+                                                       chan.get().get_funding_txo().unwrap()))
                                },
                                hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
                };
                self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id);
                match res {
-                       Ok((pending_forwards, mut pending_failures, short_channel_id, channel_outpoint)) => {
+                       Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
+                               short_channel_id, channel_outpoint)) =>
+                       {
                                for failure in pending_failures.drain(..) {
                                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
                                }
                                self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
+                               self.finalize_claims(finalized_claim_htlcs);
                                Ok(())
                        },
                        Err(e) => Err(e)
@@ -4192,7 +4320,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                if let Some((commitment_update, monitor_update)) = commitment_opt {
                                                        if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
                                                                has_monitor_update = true;
-                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), channel_id);
                                                                handle_errors.push((chan.get_counterparty_node_id(), res));
                                                                if close_channel { return false; }
                                                        } else {
@@ -5249,10 +5377,12 @@ impl Readable for HTLCSource {
                                let mut first_hop_htlc_msat: u64 = 0;
                                let mut path = Some(Vec::new());
                                let mut payment_id = None;
+                               let mut payment_secret = None;
                                read_tlv_fields!(reader, {
                                        (0, session_priv, required),
                                        (1, payment_id, option),
                                        (2, first_hop_htlc_msat, required),
+                                       (3, payment_secret, option),
                                        (4, path, vec_type),
                                });
                                if payment_id.is_none() {
@@ -5265,6 +5395,7 @@ impl Readable for HTLCSource {
                                        first_hop_htlc_msat: first_hop_htlc_msat,
                                        path: path.unwrap(),
                                        payment_id: payment_id.unwrap(),
+                                       payment_secret,
                                })
                        }
                        1 => Ok(HTLCSource::PreviousHopData(Readable::read(reader)?)),
@@ -5276,13 +5407,14 @@ impl Readable for HTLCSource {
 impl Writeable for HTLCSource {
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::io::Error> {
                match self {
-                       HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id } => {
+                       HTLCSource::OutboundRoute { ref session_priv, ref first_hop_htlc_msat, ref path, payment_id, payment_secret } => {
                                0u8.write(writer)?;
                                let payment_id_opt = Some(payment_id);
                                write_tlv_fields!(writer, {
                                        (0, session_priv, required),
                                        (1, payment_id_opt, option),
                                        (2, first_hop_htlc_msat, required),
+                                       (3, payment_secret, option),
                                        (4, path, vec_type),
                                 });
                        }
@@ -5326,10 +5458,13 @@ impl_writeable_tlv_based!(PendingInboundPayment, {
        (8, min_value_msat, required),
 });
 
-impl_writeable_tlv_based_enum!(PendingOutboundPayment,
+impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment,
        (0, Legacy) => {
                (0, session_privs, required),
        },
+       (1, Fulfilled) => {
+               (0, session_privs, required),
+       },
        (2, Retryable) => {
                (0, session_privs, required),
                (2, payment_hash, required),
@@ -5338,7 +5473,7 @@ impl_writeable_tlv_based_enum!(PendingOutboundPayment,
                (8, pending_amt_msat, required),
                (10, starting_block_height, required),
        },
-;);
+);
 
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
        where M::Target: chain::Watch<Signer>,
@@ -5431,7 +5566,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                // For backwards compat, write the session privs and their total length.
                let mut num_pending_outbounds_compat: u64 = 0;
                for (_, outbound) in pending_outbound_payments.iter() {
-                       num_pending_outbounds_compat += outbound.remaining_parts() as u64;
+                       if !outbound.is_fulfilled() {
+                               num_pending_outbounds_compat += outbound.remaining_parts() as u64;
+                       }
                }
                num_pending_outbounds_compat.write(writer)?;
                for (_, outbound) in pending_outbound_payments.iter() {
@@ -5442,6 +5579,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                                                session_priv.write(writer)?;
                                        }
                                }
+                               PendingOutboundPayment::Fulfilled { .. } => {},
                        }
                }
 
@@ -5452,7 +5590,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
                                PendingOutboundPayment::Legacy { session_privs } |
                                PendingOutboundPayment::Retryable { session_privs, .. } => {
                                        pending_outbound_payments_no_retry.insert(*id, session_privs.clone());
-                               }
+                               },
+                               _ => {},
                        }
                }
                write_tlv_fields!(writer, {
@@ -5762,6 +5901,49 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
                        }
                        pending_outbound_payments = Some(outbounds);
+               } else {
+                       // If we're tracking pending payments, ensure we haven't lost any by looking at the
+                       // ChannelMonitor data for any channels for which we do not have authorative state
+                       // (i.e. those for which we just force-closed above or we otherwise don't have a
+                       // corresponding `Channel` at all).
+                       // This avoids several edge-cases where we would otherwise "forget" about pending
+                       // payments which are still in-flight via their on-chain state.
+                       // We only rebuild the pending payments map if we were most recently serialized by
+                       // 0.0.102+
+                       for (_, monitor) in args.channel_monitors {
+                               if by_id.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() {
+                                       for (htlc_source, htlc) in monitor.get_pending_outbound_htlcs() {
+                                               if let HTLCSource::OutboundRoute { payment_id, session_priv, path, payment_secret, .. } = htlc_source {
+                                                       if path.is_empty() {
+                                                               log_error!(args.logger, "Got an empty path for a pending payment");
+                                                               return Err(DecodeError::InvalidValue);
+                                                       }
+                                                       let path_amt = path.last().unwrap().fee_msat;
+                                                       let mut session_priv_bytes = [0; 32];
+                                                       session_priv_bytes[..].copy_from_slice(&session_priv[..]);
+                                                       match pending_outbound_payments.as_mut().unwrap().entry(payment_id) {
+                                                               hash_map::Entry::Occupied(mut entry) => {
+                                                                       let newly_added = entry.get_mut().insert(session_priv_bytes, path_amt);
+                                                                       log_info!(args.logger, "{} a pending payment path for {} msat for session priv {} on an existing pending payment with payment hash {}",
+                                                                               if newly_added { "Added" } else { "Had" }, path_amt, log_bytes!(session_priv_bytes), log_bytes!(htlc.payment_hash.0));
+                                                               },
+                                                               hash_map::Entry::Vacant(entry) => {
+                                                                       entry.insert(PendingOutboundPayment::Retryable {
+                                                                               session_privs: [session_priv_bytes].iter().map(|a| *a).collect(),
+                                                                               payment_hash: htlc.payment_hash,
+                                                                               payment_secret,
+                                                                               pending_amt_msat: path_amt,
+                                                                               total_msat: path_amt,
+                                                                               starting_block_height: best_block_height,
+                                                                       });
+                                                                       log_info!(args.logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}",
+                                                                               path_amt, log_bytes!(htlc.payment_hash.0),  log_bytes!(session_priv_bytes));
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
                }
 
                let mut secp_ctx = Secp256k1::new();