Inform ChannelManager when fulfilled HTLCs are finalized
authorMatt Corallo <git@bluematt.me>
Sat, 2 Oct 2021 22:35:07 +0000 (22:35 +0000)
committerMatt Corallo <git@bluematt.me>
Fri, 22 Oct 2021 18:41:42 +0000 (18:41 +0000)
When an HTLC has been failed, we track it up until the point there
exists no broadcastable commitment transaction which has the HTLC
present, at which point Channel returns the HTLCSource back to the
ChannelManager, which fails the HTLC backwards appropriately.

When an HTLC is fulfilled, however, we fulfill on the backwards path
immediately. This is great for claiming upstream HTLCs, but when we
want to track pending payments, we need to ensure we can check with
ChannelMonitor data to rebuild pending payments. In order to do so,
we need an event similar to the HTLC failure event, but for
fulfills instead.

Specifically, if we force-close a channel, we remove its off-chain
`Channel` object entirely, at which point, on reload, we may notice
HTLC(s) which are not present in our pending payments map (as they
may have received a payment preimage, but not fully committed to
it). Thus, we'd conclude we still have a retryable payment, which
is untrue.

This commit does so, informing the ChannelManager via a new return
element where appropriate of the HTLCSource corresponding to the
failed HTLC.

lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs

index 4cd19c6ac06e5b5833dca292d7316ef06d053538..16e6576b87e5944508674b615dcdc935bb1c9367 100644 (file)
@@ -345,6 +345,7 @@ pub(super) struct RAAUpdates {
        pub commitment_update: Option<msgs::CommitmentUpdate>,
        pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
        pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
+       pub finalized_claimed_htlcs: Vec<HTLCSource>,
        pub monitor_update: ChannelMonitorUpdate,
        pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
 }
@@ -356,6 +357,7 @@ pub(super) struct MonitorRestoreUpdates {
        pub order: RAACommitmentOrder,
        pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
        pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
+       pub finalized_claimed_htlcs: Vec<HTLCSource>,
        pub funding_broadcastable: Option<Transaction>,
        pub funding_locked: Option<msgs::FundingLocked>,
 }
@@ -427,6 +429,7 @@ pub(super) struct Channel<Signer: Sign> {
        monitor_pending_commitment_signed: bool,
        monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
        monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
+       monitor_pending_finalized_fulfills: Vec<HTLCSource>,
 
        // pending_update_fee is filled when sending and receiving update_fee.
        //
@@ -713,6 +716,7 @@ impl<Signer: Sign> Channel<Signer> {
                        monitor_pending_commitment_signed: false,
                        monitor_pending_forwards: Vec::new(),
                        monitor_pending_failures: Vec::new(),
+                       monitor_pending_finalized_fulfills: Vec::new(),
 
                        #[cfg(debug_assertions)]
                        holder_max_commitment_tx_output: Mutex::new((channel_value_satoshis * 1000 - push_msat, push_msat)),
@@ -976,6 +980,7 @@ impl<Signer: Sign> Channel<Signer> {
                        monitor_pending_commitment_signed: false,
                        monitor_pending_forwards: Vec::new(),
                        monitor_pending_failures: Vec::new(),
+                       monitor_pending_finalized_fulfills: Vec::new(),
 
                        #[cfg(debug_assertions)]
                        holder_max_commitment_tx_output: Mutex::new((msg.push_msat, msg.funding_satoshis * 1000 - msg.push_msat)),
@@ -2798,6 +2803,7 @@ impl<Signer: Sign> Channel<Signer> {
                log_trace!(logger, "Updating HTLCs on receipt of RAA in channel {}...", log_bytes!(self.channel_id()));
                let mut to_forward_infos = Vec::new();
                let mut revoked_htlcs = Vec::new();
+               let mut finalized_claimed_htlcs = Vec::new();
                let mut update_fail_htlcs = Vec::new();
                let mut update_fail_malformed_htlcs = Vec::new();
                let mut require_commitment = false;
@@ -2824,6 +2830,7 @@ impl<Signer: Sign> Channel<Signer> {
                                        if let Some(reason) = fail_reason.clone() { // We really want take() here, but, again, non-mut ref :(
                                                revoked_htlcs.push((htlc.source.clone(), htlc.payment_hash, reason));
                                        } else {
+                                               finalized_claimed_htlcs.push(htlc.source.clone());
                                                // They fulfilled, so we sent them money
                                                value_to_self_msat_diff -= htlc.amount_msat as i64;
                                        }
@@ -2920,9 +2927,10 @@ impl<Signer: Sign> Channel<Signer> {
                        }
                        self.monitor_pending_forwards.append(&mut to_forward_infos);
                        self.monitor_pending_failures.append(&mut revoked_htlcs);
+                       self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
                        log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
                        return Ok(RAAUpdates {
-                               commitment_update: None,
+                               commitment_update: None, finalized_claimed_htlcs: Vec::new(),
                                accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
                                monitor_update,
                                holding_cell_failed_htlcs: Vec::new()
@@ -2947,6 +2955,7 @@ impl<Signer: Sign> Channel<Signer> {
 
                                Ok(RAAUpdates {
                                        commitment_update: Some(commitment_update),
+                                       finalized_claimed_htlcs,
                                        accepted_htlcs: to_forward_infos,
                                        failed_htlcs: revoked_htlcs,
                                        monitor_update,
@@ -2973,6 +2982,7 @@ impl<Signer: Sign> Channel<Signer> {
                                                        update_fee: None,
                                                        commitment_signed
                                                }),
+                                               finalized_claimed_htlcs,
                                                accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
                                                monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
                                        })
@@ -2980,6 +2990,7 @@ impl<Signer: Sign> Channel<Signer> {
                                        log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
                                        Ok(RAAUpdates {
                                                commitment_update: None,
+                                               finalized_claimed_htlcs,
                                                accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
                                                monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
                                        })
@@ -3097,11 +3108,16 @@ impl<Signer: Sign> Channel<Signer> {
        /// which failed. The messages which were generated from that call which generated the
        /// monitor update failure must *not* have been sent to the remote end, and must instead
        /// have been dropped. They will be regenerated when monitor_updating_restored is called.
-       pub fn monitor_update_failed(&mut self, resend_raa: bool, resend_commitment: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>, mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>) {
+       pub fn monitor_update_failed(&mut self, resend_raa: bool, resend_commitment: bool,
+               mut pending_forwards: Vec<(PendingHTLCInfo, u64)>,
+               mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
+               mut pending_finalized_claimed_htlcs: Vec<HTLCSource>
+       ) {
                self.monitor_pending_revoke_and_ack |= resend_raa;
                self.monitor_pending_commitment_signed |= resend_commitment;
                self.monitor_pending_forwards.append(&mut pending_forwards);
                self.monitor_pending_failures.append(&mut pending_fails);
+               self.monitor_pending_finalized_fulfills.append(&mut pending_finalized_claimed_htlcs);
                self.channel_state |= ChannelState::MonitorUpdateFailed as u32;
        }
 
@@ -3135,13 +3151,15 @@ impl<Signer: Sign> Channel<Signer> {
                mem::swap(&mut accepted_htlcs, &mut self.monitor_pending_forwards);
                let mut failed_htlcs = Vec::new();
                mem::swap(&mut failed_htlcs, &mut self.monitor_pending_failures);
+               let mut finalized_claimed_htlcs = Vec::new();
+               mem::swap(&mut finalized_claimed_htlcs, &mut self.monitor_pending_finalized_fulfills);
 
                if self.channel_state & (ChannelState::PeerDisconnected as u32) != 0 {
                        self.monitor_pending_revoke_and_ack = false;
                        self.monitor_pending_commitment_signed = false;
                        return MonitorRestoreUpdates {
                                raa: None, commitment_update: None, order: RAACommitmentOrder::RevokeAndACKFirst,
-                               accepted_htlcs, failed_htlcs, funding_broadcastable, funding_locked
+                               accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, funding_locked
                        };
                }
 
@@ -3160,7 +3178,7 @@ impl<Signer: Sign> Channel<Signer> {
                        if commitment_update.is_some() { "a" } else { "no" }, if raa.is_some() { "an" } else { "no" },
                        match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"});
                MonitorRestoreUpdates {
-                       raa, commitment_update, order, accepted_htlcs, failed_htlcs, funding_broadcastable, funding_locked
+                       raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, funding_locked
                }
        }
 
@@ -5221,6 +5239,7 @@ impl<Signer: Sign> Writeable for Channel<Signer> {
                        (5, self.config, required),
                        (7, self.shutdown_scriptpubkey, option),
                        (9, self.target_closing_feerate_sats_per_kw, option),
+                       (11, self.monitor_pending_finalized_fulfills, vec_type),
                });
 
                Ok(())
@@ -5454,6 +5473,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>
 
                let mut announcement_sigs = None;
                let mut target_closing_feerate_sats_per_kw = None;
+               let mut monitor_pending_finalized_fulfills = Some(Vec::new());
                read_tlv_fields!(reader, {
                        (0, announcement_sigs, option),
                        (1, minimum_depth, option),
@@ -5461,6 +5481,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>
                        (5, config, option), // Note that if none is provided we will *not* overwrite the existing one.
                        (7, shutdown_scriptpubkey, option),
                        (9, target_closing_feerate_sats_per_kw, option),
+                       (11, monitor_pending_finalized_fulfills, vec_type),
                });
 
                let mut secp_ctx = Secp256k1::new();
@@ -5496,6 +5517,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>
                        monitor_pending_commitment_signed,
                        monitor_pending_forwards,
                        monitor_pending_failures,
+                       monitor_pending_finalized_fulfills: monitor_pending_finalized_fulfills.unwrap(),
 
                        pending_update_fee,
                        holding_cell_update_fee,
index 9cbbf32387caf73b1547b03d876bd394ee8ddb20..019d61c57c09a2d3e8f0606a3353a49198cc5425 100644 (file)
@@ -1002,7 +1002,7 @@ macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
        };
-       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
+       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr, $chan_id: expr) => {
                match $err {
                        ChannelMonitorUpdateErr::PermanentFailure => {
                                log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
@@ -1023,7 +1023,7 @@ macro_rules! handle_monitor_err {
                                (res, true)
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
-                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
+                               log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards, {} fails, and {} fulfill finalizations",
                                                log_bytes!($chan_id[..]),
                                                if $resend_commitment && $resend_raa {
                                                                match $action_type {
@@ -1034,25 +1034,29 @@ macro_rules! handle_monitor_err {
                                                        else if $resend_raa { "RAA" }
                                                        else { "nothing" },
                                                (&$failed_forwards as &Vec<(PendingHTLCInfo, u64)>).len(),
-                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len());
+                                               (&$failed_fails as &Vec<(HTLCSource, PaymentHash, HTLCFailReason)>).len(),
+                                               (&$failed_finalized_fulfills as &Vec<HTLCSource>).len());
                                if !$resend_commitment {
                                        debug_assert!($action_type == RAACommitmentOrder::RevokeAndACKFirst || !$resend_raa);
                                }
                                if !$resend_raa {
                                        debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
                                }
-                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills);
                                (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
                        },
                }
        };
-       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
-               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $failed_finalized_fulfills: expr) => { {
+               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $failed_finalized_fulfills, $entry.key());
                if drop {
                        $entry.remove_entry();
                }
                res
        } };
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+               handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, Vec::new());
+       }
 }
 
 macro_rules! return_monitor_err {
@@ -1441,7 +1445,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -2846,7 +2850,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                let ret_err = match res {
                        Ok(Some((update_fee, commitment_signed, monitor_update))) => {
                                if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
-                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), chan_id);
+                                       let (res, drop) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), chan_id);
                                        if drop { retain_channel = false; }
                                        res
                                } else {
@@ -3430,6 +3434,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        msg: self.get_channel_update_for_unicast(channel.get()).unwrap(),
                                })
                        } else { None };
+                       // TODO: Handle updates.finalized_claimed_htlcs!
                        chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, updates.raa, updates.commitment_update, updates.order, None, updates.accepted_htlcs, updates.funding_broadcastable, updates.funding_locked);
                        if let Some(upd) = channel_update {
                                channel_state.pending_msg_events.push(upd);
@@ -3525,7 +3530,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
                                        // accepted payment from yet. We do, however, need to wait to send our funding_locked
                                        // until we have persisted our monitor.
-                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new());
+                                       chan.monitor_update_failed(false, false, Vec::new(), Vec::new(), Vec::new());
                                },
                        }
                }
@@ -3643,7 +3648,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        if let Some(monitor_update) = monitor_update {
                                                if let Err(e) = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), monitor_update) {
                                                        let (result, is_permanent) =
-                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), chan_entry.key());
+                                                               handle_monitor_err!(self, e, channel_state.short_to_id, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, false, false, Vec::new(), Vec::new(), Vec::new(), chan_entry.key());
                                                        if is_permanent {
                                                                remove_channel!(channel_state, chan_entry);
                                                                break result;
@@ -3938,12 +3943,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        assert!(raa_updates.commitment_update.is_none());
                                                        assert!(raa_updates.accepted_htlcs.is_empty());
                                                        assert!(raa_updates.failed_htlcs.is_empty());
+                                                       assert!(raa_updates.finalized_claimed_htlcs.is_empty());
                                                        break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
                                                } else {
                                                        if let Err(e) = handle_monitor_err!(self, e, channel_state, chan,
                                                                        RAACommitmentOrder::CommitmentFirst, false,
                                                                        raa_updates.commitment_update.is_some(),
-                                                                       raa_updates.accepted_htlcs, raa_updates.failed_htlcs) {
+                                                                       raa_updates.accepted_htlcs, raa_updates.failed_htlcs,
+                                                                       raa_updates.finalized_claimed_htlcs) {
                                                                break Err(e);
                                                        } else { unreachable!(); }
                                                }
@@ -4197,7 +4204,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                if let Some((commitment_update, monitor_update)) = commitment_opt {
                                                        if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
                                                                has_monitor_update = true;
-                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), Vec::new(), channel_id);
                                                                handle_errors.push((chan.get_counterparty_node_id(), res));
                                                                if close_channel { return false; }
                                                        } else {