Holding cell: if we fail to free an HTLC, fail it backwards
authorValentine Wallace <vwallace@protonmail.com>
Wed, 6 May 2020 22:15:43 +0000 (18:15 -0400)
committerValentine Wallace <vwallace@protonmail.com>
Sat, 8 Aug 2020 20:32:15 +0000 (16:32 -0400)
Plus add a test.

lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_tests.rs

index 719d03c6e37dfa3816a5099057126b7e44560d32..c05ca2a8f16656dd03a3ca2fb531859175b71a39 100644 (file)
@@ -2118,7 +2118,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
 
        /// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
        /// fulfilling or failing the last pending HTLC)
-       fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
+       fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
                assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, 0);
                if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
                        log_trace!(logger, "Freeing holding cell with {} HTLC updates{}", self.holding_cell_htlc_updates.len(), if self.holding_cell_update_fee.is_some() { " and a fee update" } else { "" });
@@ -2133,110 +2133,94 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        let mut update_add_htlcs = Vec::with_capacity(htlc_updates.len());
                        let mut update_fulfill_htlcs = Vec::with_capacity(htlc_updates.len());
                        let mut update_fail_htlcs = Vec::with_capacity(htlc_updates.len());
-                       let mut err = None;
+                       let mut htlcs_to_fail = Vec::new();
                        for htlc_update in htlc_updates.drain(..) {
                                // Note that this *can* fail, though it should be due to rather-rare conditions on
                                // fee races with adding too many outputs which push our total payments just over
                                // the limit. In case it's less rare than I anticipate, we may want to revisit
                                // handling this case better and maybe fulfilling some of the HTLCs while attempting
                                // to rebalance channels.
-                               if err.is_some() { // We're back to AwaitingRemoteRevoke (or are about to fail the channel)
-                                       self.holding_cell_htlc_updates.push(htlc_update);
-                               } else {
-                                       match &htlc_update {
-                                               &HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
-                                                       match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone()) {
-                                                               Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
-                                                               Err(e) => {
-                                                                       match e {
-                                                                               ChannelError::Ignore(ref msg) => {
-                                                                                       log_info!(logger, "Failed to send HTLC with payment_hash {} due to {}", log_bytes!(payment_hash.0), msg);
-                                                                               },
-                                                                               _ => {
-                                                                                       log_info!(logger, "Failed to send HTLC with payment_hash {} resulting in a channel closure during holding_cell freeing", log_bytes!(payment_hash.0));
-                                                                               },
-                                                                       }
-                                                                       err = Some(e);
+                               match &htlc_update {
+                                       &HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
+                                               match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone()) {
+                                                       Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
+                                                       Err(e) => {
+                                                               match e {
+                                                                       ChannelError::Ignore(ref msg) => {
+                                                                               log_info!(logger, "Failed to send HTLC with payment_hash {} due to {}", log_bytes!(payment_hash.0), msg);
+                                                                               // If we fail to send here, then this HTLC should
+                                                                               // be failed backwards. Failing to send here
+                                                                               // indicates that this HTLC may keep being put back
+                                                                               // into the holding cell without ever being
+                                                                               // successfully forwarded/failed/fulfilled, causing
+                                                                               // our counterparty to eventually close on us.
+                                                                               htlcs_to_fail.push((source.clone(), *payment_hash));
+                                                                       },
+                                                                       _ => {
+                                                                               panic!("Got a non-IgnoreError action trying to send holding cell HTLC");
+                                                                       },
                                                                }
                                                        }
-                                               },
-                                               &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, htlc_id, .. } => {
-                                                       match self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) {
-                                                               Ok((update_fulfill_msg_option, additional_monitor_update_opt)) => {
-                                                                       update_fulfill_htlcs.push(update_fulfill_msg_option.unwrap());
-                                                                       if let Some(mut additional_monitor_update) = additional_monitor_update_opt {
-                                                                               monitor_update.updates.append(&mut additional_monitor_update.updates);
-                                                                       }
-                                                               },
-                                                               Err(e) => {
-                                                                       if let ChannelError::Ignore(_) = e {}
-                                                                       else {
-                                                                               panic!("Got a non-IgnoreError action trying to fulfill holding cell HTLC");
-                                                                       }
+                                               }
+                                       },
+                                       &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, htlc_id, .. } => {
+                                               match self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) {
+                                                       Ok((update_fulfill_msg_option, additional_monitor_update_opt)) => {
+                                                               update_fulfill_htlcs.push(update_fulfill_msg_option.unwrap());
+                                                               if let Some(mut additional_monitor_update) = additional_monitor_update_opt {
+                                                                       monitor_update.updates.append(&mut additional_monitor_update.updates);
+                                                               }
+                                                       },
+                                                       Err(e) => {
+                                                               if let ChannelError::Ignore(_) = e {}
+                                                               else {
+                                                                       panic!("Got a non-IgnoreError action trying to fulfill holding cell HTLC");
                                                                }
                                                        }
-                                               },
-                                               &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
-                                                       match self.get_update_fail_htlc(htlc_id, err_packet.clone()) {
-                                                               Ok(update_fail_msg_option) => update_fail_htlcs.push(update_fail_msg_option.unwrap()),
-                                                               Err(e) => {
-                                                                       if let ChannelError::Ignore(_) = e {}
-                                                                       else {
-                                                                               panic!("Got a non-IgnoreError action trying to fail holding cell HTLC");
-                                                                       }
+                                               }
+                                       },
+                                       &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
+                                               match self.get_update_fail_htlc(htlc_id, err_packet.clone()) {
+                                                       Ok(update_fail_msg_option) => update_fail_htlcs.push(update_fail_msg_option.unwrap()),
+                                                       Err(e) => {
+                                                               if let ChannelError::Ignore(_) = e {}
+                                                               else {
+                                                                       panic!("Got a non-IgnoreError action trying to fail holding cell HTLC");
                                                                }
                                                        }
-                                               },
-                                       }
-                                       if err.is_some() {
-                                               self.holding_cell_htlc_updates.push(htlc_update);
-                                               if let Some(ChannelError::Ignore(_)) = err {
-                                                       // If we failed to add the HTLC, but got an Ignore error, we should
-                                                       // still send the new commitment_signed, so reset the err to None.
-                                                       err = None;
                                                }
-                                       }
+                                       },
                                }
                        }
-                       //TODO: Need to examine the type of err - if it's a fee issue or similar we may want to
-                       //fail it back the route, if it's a temporary issue we can ignore it...
-                       match err {
-                               None => {
-                                       if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
-                                               // This should never actually happen and indicates we got some Errs back
-                                               // from update_fulfill_htlc/update_fail_htlc, but we handle it anyway in
-                                               // case there is some strange way to hit duplicate HTLC removes.
-                                               return Ok(None);
-                                       }
-                                       let update_fee = if let Some(feerate) = self.holding_cell_update_fee {
-                                                       self.pending_update_fee = self.holding_cell_update_fee.take();
-                                                       Some(msgs::UpdateFee {
-                                                               channel_id: self.channel_id,
-                                                               feerate_per_kw: feerate as u32,
-                                                       })
-                                               } else {
-                                                       None
-                                               };
+                       if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
+                               return Ok((None, htlcs_to_fail));
+                       }
+                       let update_fee = if let Some(feerate) = self.holding_cell_update_fee {
+                               self.pending_update_fee = self.holding_cell_update_fee.take();
+                               Some(msgs::UpdateFee {
+                                       channel_id: self.channel_id,
+                                       feerate_per_kw: feerate as u32,
+                               })
+                       } else {
+                               None
+                       };
 
-                                       let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-                                       // send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
-                                       // but we want them to be strictly increasing by one, so reset it here.
-                                       self.latest_monitor_update_id = monitor_update.update_id;
-                                       monitor_update.updates.append(&mut additional_update.updates);
+                       let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
+                       // send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+                       // but we want them to be strictly increasing by one, so reset it here.
+                       self.latest_monitor_update_id = monitor_update.update_id;
+                       monitor_update.updates.append(&mut additional_update.updates);
 
-                                       Ok(Some((msgs::CommitmentUpdate {
-                                               update_add_htlcs,
-                                               update_fulfill_htlcs,
-                                               update_fail_htlcs,
-                                               update_fail_malformed_htlcs: Vec::new(),
-                                               update_fee: update_fee,
-                                               commitment_signed,
-                                       }, monitor_update)))
-                               },
-                               Some(e) => Err(e)
-                       }
+                       Ok((Some((msgs::CommitmentUpdate {
+                               update_add_htlcs,
+                               update_fulfill_htlcs,
+                               update_fail_htlcs,
+                               update_fail_malformed_htlcs: Vec::new(),
+                               update_fee: update_fee,
+                               commitment_signed,
+                       }, monitor_update)), htlcs_to_fail))
                } else {
-                       Ok(None)
+                       Ok((None, Vec::new()))
                }
        }
 
@@ -2245,7 +2229,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
        /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
        /// generating an appropriate error *after* the channel state has been updated based on the
        /// revoke_and_ack message.
-       pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate), ChannelError>
+       pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError>
                where F::Target: FeeEstimator,
                                        L::Target: Logger,
        {
@@ -2420,11 +2404,11 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        }
                        self.monitor_pending_forwards.append(&mut to_forward_infos);
                        self.monitor_pending_failures.append(&mut revoked_htlcs);
-                       return Ok((None, Vec::new(), Vec::new(), None, monitor_update))
+                       return Ok((None, Vec::new(), Vec::new(), None, monitor_update, Vec::new()))
                }
 
                match self.free_holding_cell_htlcs(logger)? {
-                       Some((mut commitment_update, mut additional_update)) => {
+                       (Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
                                commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
                                for fail_msg in update_fail_htlcs.drain(..) {
                                        commitment_update.update_fail_htlcs.push(fail_msg);
@@ -2439,9 +2423,9 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                                self.latest_monitor_update_id = monitor_update.update_id;
                                monitor_update.updates.append(&mut additional_update.updates);
 
-                               Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update))
+                               Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
                        },
-                       None => {
+                       (None, htlcs_to_fail) => {
                                if require_commitment {
                                        let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
 
@@ -2457,9 +2441,9 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                                                update_fail_malformed_htlcs,
                                                update_fee: None,
                                                commitment_signed
-                                       }), to_forward_infos, revoked_htlcs, None, monitor_update))
+                                       }), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
                                } else {
-                                       Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update))
+                                       Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update, htlcs_to_fail))
                                }
                        }
                }
@@ -2561,6 +2545,11 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
 
                self.holding_cell_htlc_updates.retain(|htlc_update| {
                        match htlc_update {
+                               // Note that currently on channel reestablish we assert that there are
+                               // no holding cell HTLC update_adds, so if in the future we stop
+                               // dropping added HTLCs here and failing them backwards, then there will
+                               // need to be corresponding changes made in the Channel's re-establish
+                               // logic.
                                &HTLCUpdateAwaitingACK::AddHTLC { ref payment_hash, ref source, .. } => {
                                        outbound_drops.push((source.clone(), payment_hash.clone()));
                                        false
@@ -2828,6 +2817,14 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        }
 
                        if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateFailed as u32)) == 0 {
+                               // Note that if in the future we no longer drop holding cell update_adds on peer
+                               // disconnect, this logic will need to be updated.
+                               for htlc_update in self.holding_cell_htlc_updates.iter() {
+                                       if let &HTLCUpdateAwaitingACK::AddHTLC { .. } = htlc_update {
+                                               debug_assert!(false, "There shouldn't be any add-HTLCs in the holding cell now because they should have been dropped on peer disconnect. Panic here because said HTLCs won't be handled correctly.");
+                                       }
+                               }
+
                                // We're up-to-date and not waiting on a remote revoke (if we are our
                                // channel_reestablish should result in them sending a revoke_and_ack), but we may
                                // have received some updates while we were disconnected. Free the holding cell
@@ -2835,8 +2832,18 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                                match self.free_holding_cell_htlcs(logger) {
                                        Err(ChannelError::Close(msg)) => return Err(ChannelError::Close(msg)),
                                        Err(ChannelError::Ignore(_)) | Err(ChannelError::CloseDelayBroadcast(_)) => panic!("Got non-channel-failing result from free_holding_cell_htlcs"),
-                                       Ok(Some((commitment_update, monitor_update))) => return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), shutdown_msg)),
-                                       Ok(None) => return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg)),
+                                       Ok((Some((commitment_update, monitor_update)), htlcs_to_fail)) => {
+                                               // If in the future we no longer drop holding cell update_adds on peer
+                                               // disconnect, we may be handed some HTLCs to fail backwards here.
+                                               assert!(htlcs_to_fail.is_empty());
+                                               return Ok((resend_funding_locked, required_revoke, Some(commitment_update), Some(monitor_update), self.resend_order.clone(), shutdown_msg));
+                                       },
+                                       Ok((None, htlcs_to_fail)) => {
+                                               // If in the future we no longer drop holding cell update_adds on peer
+                                               // disconnect, we may be handed some HTLCs to fail backwards here.
+                                               assert!(htlcs_to_fail.is_empty());
+                                               return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
+                                       },
                                }
                        } else {
                                return Ok((resend_funding_locked, required_revoke, None, None, self.resend_order.clone(), shutdown_msg));
index 5466b3d23c8c96dafd651003ce85c268964d632a..0985044c77e85281e7d1d47db3d76670c3897109 100644 (file)
@@ -1822,6 +1822,44 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                } else { false }
        }
 
+       // Fail a list of HTLCs that were just freed from the holding cell. The HTLCs need to be
+       // failed backwards or, if they were one of our outgoing HTLCs, then their failure needs to
+       // be surfaced to the user.
+       fn fail_holding_cell_htlcs(&self, mut htlcs_to_fail: Vec<(HTLCSource, PaymentHash)>, channel_id: [u8; 32]) {
+               for (htlc_src, payment_hash) in htlcs_to_fail.drain(..) {
+                       match htlc_src {
+                               HTLCSource::PreviousHopData(HTLCPreviousHopData { .. }) => {
+                                       let (failure_code, onion_failure_data) =
+                                               match self.channel_state.lock().unwrap().by_id.entry(channel_id) {
+                                                       hash_map::Entry::Occupied(chan_entry) => {
+                                                               if let Ok(upd) = self.get_channel_update(&chan_entry.get()) {
+                                                                       (0x1000|7, upd.encode_with_len())
+                                                               } else {
+                                                                       (0x4000|10, Vec::new())
+                                                               }
+                                                       },
+                                                       hash_map::Entry::Vacant(_) => (0x4000|10, Vec::new())
+                                               };
+                                       let channel_state = self.channel_state.lock().unwrap();
+                                       self.fail_htlc_backwards_internal(channel_state,
+                                               htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
+                               },
+                               HTLCSource::OutboundRoute { .. } => {
+                                       self.pending_events.lock().unwrap().push(
+                                               events::Event::PaymentFailed {
+                                                       payment_hash,
+                                                       rejected_by_dest: false,
+#[cfg(test)]
+                                                       error_code: None,
+#[cfg(test)]
+                                                       error_data: None,
+                                               }
+                                       )
+                               },
+                       };
+               }
+       }
+
        /// Fails an HTLC backwards to the sender of it to us.
        /// Note that while we take a channel_state lock as input, we do *not* assume consistency here.
        /// There are several callsites that do stupid things like loop over a list of payment_hashes
@@ -2670,23 +2708,27 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
        }
 
        fn internal_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
-               let (pending_forwards, mut pending_failures, short_channel_id) = {
+               let mut htlcs_to_fail = Vec::new();
+               let res = loop {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_state_lock;
                        match channel_state.by_id.entry(msg.channel_id) {
                                hash_map::Entry::Occupied(mut chan) => {
                                        if chan.get().get_their_node_id() != *their_node_id {
-                                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
+                                               break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
                                        let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                       let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update) =
-                                               try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan);
+                                       let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update, htlcs_to_fail_in) =
+                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan);
+                                       htlcs_to_fail = htlcs_to_fail_in;
                                        if let Err(e) = self.monitor.update_monitor(chan.get().get_funding_txo().unwrap(), monitor_update) {
                                                if was_frozen_for_monitor {
                                                        assert!(commitment_update.is_none() && closing_signed.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
-                                                       return Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
+                                                       break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
                                                } else {
-                                                       return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures);
+                                                       if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) {
+                                                               break Err(e);
+                                                       } else { unreachable!(); }
                                                }
                                        }
                                        if let Some(updates) = commitment_update {
@@ -2701,17 +2743,22 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                                        msg,
                                                });
                                        }
-                                       (pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"))
+                                       break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel")))
                                },
-                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+                               hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
                        }
                };
-               for failure in pending_failures.drain(..) {
-                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
+               self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id);
+               match res {
+                       Ok((pending_forwards, mut pending_failures, short_channel_id)) => {
+                               for failure in pending_failures.drain(..) {
+                                       self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
+                               }
+                               self.forward_htlcs(&mut [(short_channel_id, pending_forwards)]);
+                               Ok(())
+                       },
+                       Err(e) => Err(e)
                }
-               self.forward_htlcs(&mut [(short_channel_id, pending_forwards)]);
-
-               Ok(())
        }
 
        fn internal_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), MsgHandleErrInternal> {
@@ -2792,6 +2839,10 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                                if chan.get().get_their_node_id() != *their_node_id {
                                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                }
+                               // Currently, we expect all holding cell update_adds to be dropped on peer
+                               // disconnect, so Channel's reestablish will never hand us any holding cell
+                               // freed HTLCs to fail backwards. If in the future we no longer drop pending
+                               // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
                                let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) =
                                        try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
                                if let Some(monitor_update) = monitor_update_opt {
@@ -3258,6 +3309,10 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates", log_pubkey!(their_node_id));
                                channel_state.by_id.retain(|_, chan| {
                                        if chan.get_their_node_id() == *their_node_id {
+                                               // Note that currently on channel reestablish we assert that there are no
+                                               // holding cell add-HTLCs, so if in the future we stop removing uncommitted HTLCs
+                                               // on peer disconnect here, there will need to be corresponding changes in
+                                               // reestablish logic.
                                                let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
                                                chan.to_disabled_marked();
                                                if !failed_adds.is_empty() {
index 240a5741b91c72fcc017254eac17038ee035909d..d1ccdcc848fe9b14dec9f5447f38f07ad3036ac5 100644 (file)
@@ -6387,6 +6387,342 @@ fn bolt2_open_channel_sending_node_checks_part2() {
        assert!(PublicKey::from_slice(&node0_to_1_send_open_channel.delayed_payment_basepoint.serialize()).is_ok());
 }
 
+// Test that if we fail to send an HTLC that is being freed from the holding cell, and the HTLC
+// originated from our node, its failure is surfaced to the user. We trigger this failure to
+// free the HTLC by increasing our fee while the HTLC is in the holding cell such that the HTLC
+// is no longer affordable once it's freed.
+#[test]
+fn test_fail_holding_cell_htlc_upon_free() {
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+       let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 95000000, InitFeatures::known(), InitFeatures::known());
+       let logger = test_utils::TestLogger::new();
+
+       // First nodes[0] generates an update_fee, setting the channel's
+       // pending_update_fee.
+       nodes[0].node.update_fee(chan.2, get_feerate!(nodes[0], chan.2) + 20).unwrap();
+       check_added_monitors!(nodes[0], 1);
+
+       let events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let (update_msg, commitment_signed) = match events[0] {
+               MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
+                       (update_fee.as_ref(), commitment_signed)
+               },
+               _ => panic!("Unexpected event"),
+       };
+
+       nodes[1].node.handle_update_fee(&nodes[0].node.get_our_node_id(), update_msg.unwrap());
+
+       let mut chan_stat = get_channel_value_stat!(nodes[0], chan.2);
+       let channel_reserve = chan_stat.channel_reserve_msat;
+       let feerate = get_feerate!(nodes[0], chan.2);
+
+       // 2* and +1 HTLCs on the commit tx fee calculation for the fee spike reserve.
+       let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
+       let max_can_send = 5000000 - channel_reserve - 2*commit_tx_fee_msat(feerate, 1 + 1);
+       let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
+       let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, &[], max_can_send, TEST_FINAL_CLTV, &logger).unwrap();
+
+       // Send a payment which passes reserve checks but gets stuck in the holding cell.
+       nodes[0].node.send_payment(&route, our_payment_hash, &None).unwrap();
+       chan_stat = get_channel_value_stat!(nodes[0], chan.2);
+       assert_eq!(chan_stat.holding_cell_outbound_amount_msat, max_can_send);
+
+       // Flush the pending fee update.
+       nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), commitment_signed);
+       let (as_revoke_and_ack, _) = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
+       check_added_monitors!(nodes[1], 1);
+       nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &as_revoke_and_ack);
+       check_added_monitors!(nodes[0], 1);
+
+       // Upon receipt of the RAA, there will be an attempt to resend the holding cell
+       // HTLC, but now that the fee has been raised the payment will now fail, causing
+       // us to surface its failure to the user.
+       chan_stat = get_channel_value_stat!(nodes[0], chan.2);
+       assert_eq!(chan_stat.holding_cell_outbound_amount_msat, 0);
+       nodes[0].logger.assert_log("lightning::ln::channel".to_string(), "Freeing holding cell with 1 HTLC updates".to_string(), 1);
+       let failure_log = format!("Failed to send HTLC with payment_hash {} due to Cannot send value that would put us under local channel reserve value ({})", log_bytes!(our_payment_hash.0), chan_stat.channel_reserve_msat);
+       nodes[0].logger.assert_log("lightning::ln::channel".to_string(), failure_log.to_string(), 1);
+
+       // Check that the payment failed to be sent out.
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match &events[0] {
+               &Event::PaymentFailed { ref payment_hash, ref rejected_by_dest, ref error_code, ref error_data } => {
+                       assert_eq!(our_payment_hash.clone(), *payment_hash);
+                       assert_eq!(*rejected_by_dest, false);
+                       assert_eq!(*error_code, None);
+                       assert_eq!(*error_data, None);
+               },
+               _ => panic!("Unexpected event"),
+       }
+}
+
+// Test that if multiple HTLCs are released from the holding cell and one is
+// valid but the other is no longer valid upon release, the valid HTLC can be
+// successfully completed while the other one fails as expected.
+#[test]
+fn test_free_and_fail_holding_cell_htlcs() {
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+       let chan = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 95000000, InitFeatures::known(), InitFeatures::known());
+       let logger = test_utils::TestLogger::new();
+
+       // First nodes[0] generates an update_fee, setting the channel's
+       // pending_update_fee.
+       nodes[0].node.update_fee(chan.2, get_feerate!(nodes[0], chan.2) + 200).unwrap();
+       check_added_monitors!(nodes[0], 1);
+
+       let events = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let (update_msg, commitment_signed) = match events[0] {
+               MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
+                       (update_fee.as_ref(), commitment_signed)
+               },
+               _ => panic!("Unexpected event"),
+       };
+
+       nodes[1].node.handle_update_fee(&nodes[0].node.get_our_node_id(), update_msg.unwrap());
+
+       let mut chan_stat = get_channel_value_stat!(nodes[0], chan.2);
+       let channel_reserve = chan_stat.channel_reserve_msat;
+       let feerate = get_feerate!(nodes[0], chan.2);
+
+       // 2* and +1 HTLCs on the commit tx fee calculation for the fee spike reserve.
+       let (payment_preimage_1, payment_hash_1) = get_payment_preimage_hash!(nodes[0]);
+       let amt_1 = 20000;
+       let (_, payment_hash_2) = get_payment_preimage_hash!(nodes[0]);
+       let amt_2 = 5000000 - channel_reserve - 2*commit_tx_fee_msat(feerate, 2 + 1) - amt_1;
+       let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
+       let route_1 = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, &[], amt_1, TEST_FINAL_CLTV, &logger).unwrap();
+       let route_2 = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[1].node.get_our_node_id(), None, &[], amt_2, TEST_FINAL_CLTV, &logger).unwrap();
+
+       // Send 2 payments which pass reserve checks but get stuck in the holding cell.
+       nodes[0].node.send_payment(&route_1, payment_hash_1, &None).unwrap();
+       chan_stat = get_channel_value_stat!(nodes[0], chan.2);
+       assert_eq!(chan_stat.holding_cell_outbound_amount_msat, amt_1);
+       nodes[0].node.send_payment(&route_2, payment_hash_2, &None).unwrap();
+       chan_stat = get_channel_value_stat!(nodes[0], chan.2);
+       assert_eq!(chan_stat.holding_cell_outbound_amount_msat, amt_1 + amt_2);
+
+       // Flush the pending fee update.
+       nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), commitment_signed);
+       let (revoke_and_ack, commitment_signed) = get_revoke_commit_msgs!(nodes[1], nodes[0].node.get_our_node_id());
+       check_added_monitors!(nodes[1], 1);
+       nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &revoke_and_ack);
+       nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &commitment_signed);
+       check_added_monitors!(nodes[0], 2);
+
+       // Upon receipt of the RAA, there will be an attempt to resend the holding cell HTLCs,
+       // but now that the fee has been raised the second payment will now fail, causing us
+       // to surface its failure to the user. The first payment should succeed.
+       chan_stat = get_channel_value_stat!(nodes[0], chan.2);
+       assert_eq!(chan_stat.holding_cell_outbound_amount_msat, 0);
+       nodes[0].logger.assert_log("lightning::ln::channel".to_string(), "Freeing holding cell with 2 HTLC updates".to_string(), 1);
+       let failure_log = format!("Failed to send HTLC with payment_hash {} due to Cannot send value that would put us under local channel reserve value ({})", log_bytes!(payment_hash_2.0), chan_stat.channel_reserve_msat);
+       nodes[0].logger.assert_log("lightning::ln::channel".to_string(), failure_log.to_string(), 1);
+
+       // Check that the second payment failed to be sent out.
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match &events[0] {
+               &Event::PaymentFailed { ref payment_hash, ref rejected_by_dest, ref error_code, ref error_data } => {
+                       assert_eq!(payment_hash_2.clone(), *payment_hash);
+                       assert_eq!(*rejected_by_dest, false);
+                       assert_eq!(*error_code, None);
+                       assert_eq!(*error_data, None);
+               },
+               _ => panic!("Unexpected event"),
+       }
+
+       // Complete the first payment and the RAA from the fee update.
+       let (payment_event, send_raa_event) = {
+               let mut msgs = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(msgs.len(), 2);
+               (SendEvent::from_event(msgs.remove(0)), msgs.remove(0))
+       };
+       let raa = match send_raa_event {
+               MessageSendEvent::SendRevokeAndACK { msg, .. } => msg,
+               _ => panic!("Unexpected event"),
+       };
+       nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &raa);
+       check_added_monitors!(nodes[1], 1);
+       nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
+       commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
+       let events = nodes[1].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::PendingHTLCsForwardable { .. } => {},
+               _ => panic!("Unexpected event"),
+       }
+       nodes[1].node.process_pending_htlc_forwards();
+       let events = nodes[1].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::PaymentReceived { .. } => {},
+               _ => panic!("Unexpected event"),
+       }
+       nodes[1].node.claim_funds(payment_preimage_1, &None, amt_1);
+       check_added_monitors!(nodes[1], 1);
+       let update_msgs = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &update_msgs.update_fulfill_htlcs[0]);
+       commitment_signed_dance!(nodes[0], nodes[1], update_msgs.commitment_signed, false, true);
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::PaymentSent { ref payment_preimage } => {
+                       assert_eq!(*payment_preimage, payment_preimage_1);
+               }
+               _ => panic!("Unexpected event"),
+       }
+}
+
+// Test that if we fail to forward an HTLC that is being freed from the holding cell that the
+// HTLC is failed backwards. We trigger this failure to forward the freed HTLC by increasing
+// our fee while the HTLC is in the holding cell such that the HTLC is no longer affordable
+// once it's freed.
+#[test]
+fn test_fail_holding_cell_htlc_upon_free_multihop() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+       let chan_0_1 = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 95000000, InitFeatures::known(), InitFeatures::known());
+       let chan_1_2 = create_announced_chan_between_nodes_with_value(&nodes, 1, 2, 100000, 95000000, InitFeatures::known(), InitFeatures::known());
+       let logger = test_utils::TestLogger::new();
+
+       // First nodes[1] generates an update_fee, setting the channel's
+       // pending_update_fee.
+       nodes[1].node.update_fee(chan_1_2.2, get_feerate!(nodes[1], chan_1_2.2) + 20).unwrap();
+       check_added_monitors!(nodes[1], 1);
+
+       let events = nodes[1].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+       let (update_msg, commitment_signed) = match events[0] {
+               MessageSendEvent::UpdateHTLCs { updates: msgs::CommitmentUpdate { ref update_fee, ref commitment_signed, .. }, .. } => {
+                       (update_fee.as_ref(), commitment_signed)
+               },
+               _ => panic!("Unexpected event"),
+       };
+
+       nodes[2].node.handle_update_fee(&nodes[1].node.get_our_node_id(), update_msg.unwrap());
+
+       let mut chan_stat = get_channel_value_stat!(nodes[0], chan_0_1.2);
+       let channel_reserve = chan_stat.channel_reserve_msat;
+       let feerate = get_feerate!(nodes[0], chan_0_1.2);
+
+       // Send a payment which passes reserve checks but gets stuck in the holding cell.
+       let feemsat = 239;
+       let total_routing_fee_msat = (nodes.len() - 2) as u64 * feemsat;
+       let (_, our_payment_hash) = get_payment_preimage_hash!(nodes[0]);
+       let max_can_send = 5000000 - channel_reserve - 2*commit_tx_fee_msat(feerate, 1 + 1) - total_routing_fee_msat;
+       let payment_event = {
+               let net_graph_msg_handler = &nodes[0].net_graph_msg_handler;
+               let route = get_route(&nodes[0].node.get_our_node_id(), &net_graph_msg_handler.network_graph.read().unwrap(), &nodes[2].node.get_our_node_id(), None, &[], max_can_send, TEST_FINAL_CLTV, &logger).unwrap();
+               nodes[0].node.send_payment(&route, our_payment_hash, &None).unwrap();
+               check_added_monitors!(nodes[0], 1);
+
+               let mut events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(events.len(), 1);
+
+               SendEvent::from_event(events.remove(0))
+       };
+       nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &payment_event.msgs[0]);
+       check_added_monitors!(nodes[1], 0);
+       commitment_signed_dance!(nodes[1], nodes[0], payment_event.commitment_msg, false);
+       expect_pending_htlcs_forwardable!(nodes[1]);
+
+       chan_stat = get_channel_value_stat!(nodes[1], chan_1_2.2);
+       assert_eq!(chan_stat.holding_cell_outbound_amount_msat, max_can_send);
+
+       // Flush the pending fee update.
+       nodes[2].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), commitment_signed);
+       let (raa, commitment_signed) = get_revoke_commit_msgs!(nodes[2], nodes[1].node.get_our_node_id());
+       check_added_monitors!(nodes[2], 1);
+       nodes[1].node.handle_revoke_and_ack(&nodes[2].node.get_our_node_id(), &raa);
+       nodes[1].node.handle_commitment_signed(&nodes[2].node.get_our_node_id(), &commitment_signed);
+       check_added_monitors!(nodes[1], 2);
+
+       // A final RAA message is generated to finalize the fee update.
+       let events = nodes[1].node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), 1);
+
+       let raa_msg = match &events[0] {
+               &MessageSendEvent::SendRevokeAndACK { ref msg, .. } => {
+                       msg.clone()
+               },
+               _ => panic!("Unexpected event"),
+       };
+
+       nodes[2].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &raa_msg);
+       check_added_monitors!(nodes[2], 1);
+       assert!(nodes[2].node.get_and_clear_pending_msg_events().is_empty());
+
+       // nodes[1]'s ChannelManager will now signal that we have HTLC forwards to process.
+       let process_htlc_forwards_event = nodes[1].node.get_and_clear_pending_events();
+       assert_eq!(process_htlc_forwards_event.len(), 1);
+       match &process_htlc_forwards_event[0] {
+               &Event::PendingHTLCsForwardable { .. } => {},
+               _ => panic!("Unexpected event"),
+       }
+
+       // In response, we call ChannelManager's process_pending_htlc_forwards
+       nodes[1].node.process_pending_htlc_forwards();
+       check_added_monitors!(nodes[1], 1);
+
+       // This causes the HTLC to be failed backwards.
+       let fail_event = nodes[1].node.get_and_clear_pending_msg_events();
+       assert_eq!(fail_event.len(), 1);
+       let (fail_msg, commitment_signed) = match &fail_event[0] {
+               &MessageSendEvent::UpdateHTLCs { ref updates, .. } => {
+                       assert_eq!(updates.update_add_htlcs.len(), 0);
+                       assert_eq!(updates.update_fulfill_htlcs.len(), 0);
+                       assert_eq!(updates.update_fail_malformed_htlcs.len(), 0);
+                       assert_eq!(updates.update_fail_htlcs.len(), 1);
+                       (updates.update_fail_htlcs[0].clone(), updates.commitment_signed.clone())
+               },
+               _ => panic!("Unexpected event"),
+       };
+
+       // Pass the failure messages back to nodes[0].
+       nodes[0].node.handle_update_fail_htlc(&nodes[1].node.get_our_node_id(), &fail_msg);
+       nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &commitment_signed);
+
+       // Complete the HTLC failure+removal process.
+       let (raa, commitment_signed) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
+       check_added_monitors!(nodes[0], 1);
+       nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(), &raa);
+       nodes[1].node.handle_commitment_signed(&nodes[0].node.get_our_node_id(), &commitment_signed);
+       check_added_monitors!(nodes[1], 2);
+       let final_raa_event = nodes[1].node.get_and_clear_pending_msg_events();
+       assert_eq!(final_raa_event.len(), 1);
+       let raa = match &final_raa_event[0] {
+               &MessageSendEvent::SendRevokeAndACK { ref msg, .. } => msg.clone(),
+               _ => panic!("Unexpected event"),
+       };
+       nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &raa);
+       let fail_msg_event = nodes[0].node.get_and_clear_pending_msg_events();
+       assert_eq!(fail_msg_event.len(), 1);
+       match &fail_msg_event[0] {
+               &MessageSendEvent::PaymentFailureNetworkUpdate { .. } => {},
+               _ => panic!("Unexpected event"),
+       }
+       let failure_event = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(failure_event.len(), 1);
+       match &failure_event[0] {
+               &Event::PaymentFailed { rejected_by_dest, .. } => {
+                       assert!(!rejected_by_dest);
+               },
+               _ => panic!("Unexpected event"),
+       }
+       check_added_monitors!(nodes[0], 1);
+}
+
 // BOLT 2 Requirements for the Sender when constructing and sending an update_add_htlc message.
 // BOLT 2 Requirement: MUST NOT offer amount_msat it cannot pay for in the remote commitment transaction at the current feerate_per_kw (see "Updating Fees") while maintaining its channel reserve.
 //TODO: I don't believe this is explicitly enforced when sending an HTLC but as the Fee aspect of the BOLT specs is in flux leaving this as a TODO.