Lean on the holding cell when batch-forwarding/failing HTLCs
authorMatt Corallo <git@bluematt.me>
Sat, 19 Nov 2022 00:00:28 +0000 (00:00 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 6 Dec 2022 18:18:26 +0000 (18:18 +0000)
When we batch HTLC updates, we currently do the explicit queueing
plus the commitment generation in the `ChannelManager`. This is a
bit strange as its ultimately really a `Channel` responsibility to
generate commitments at the correct time, with the abstraction
leaking into `ChannelManager` with the `send_htlc` and
`get_update_fail_htlc` method docs having clear comments about
how `send_commitment` MUST be called prior to calling other
`Channel` methods.

Luckily `Channel` already has an update queue - the holding cell.
Thus, we can trivially rewrite the batch update logic as inserting
the desired updates into the holding cell and then asking all
channels to clear their holding cells.

lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs

index 5cc4f4a364bf3fe80ab15b896b51557a58acc5d1..e8bae2e917de3eb506088aafcb52b1a386490878 100644 (file)
@@ -1942,13 +1942,27 @@ impl<Signer: Sign> Channel<Signer> {
                }
        }
 
+       /// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
+       /// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
+       /// however, fail more than once as we wait for an upstream failure to be irrevocably committed
+       /// before we fail backwards.
+       ///
+       /// If we do fail twice, we debug_assert!(false) and return Ok(()). Thus, will always return
+       /// Ok(()) if debug assertions are turned on or preconditions are met.
+       pub fn queue_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L)
+       -> Result<(), ChannelError> where L::Target: Logger {
+               self.fail_htlc(htlc_id_arg, err_packet, true, logger)
+                       .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
+       }
+
        /// We can only have one resolution per HTLC. In some cases around reconnect, we may fulfill
        /// an HTLC more than once or fulfill once and then attempt to fail after reconnect. We cannot,
        /// however, fail more than once as we wait for an upstream failure to be irrevocably committed
        /// before we fail backwards.
        /// If we do fail twice, we debug_assert!(false) and return Ok(None). Thus, will always return
        /// Ok(_) if debug assertions are turned on or preconditions are met.
-       pub fn get_update_fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, logger: &L) -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
+       fn fail_htlc<L: Deref>(&mut self, htlc_id_arg: u64, err_packet: msgs::OnionErrorPacket, mut force_holding_cell: bool, logger: &L)
+       -> Result<Option<msgs::UpdateFailHTLC>, ChannelError> where L::Target: Logger {
                if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
                        panic!("Was asked to fail an HTLC when channel was not in an operational state");
                }
@@ -1986,8 +2000,13 @@ impl<Signer: Sign> Channel<Signer> {
                        return Ok(None);
                }
 
-               // Now update local state:
                if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
+                       debug_assert!(force_holding_cell, "We don't expect to need to use the holding cell if we weren't trying to");
+                       force_holding_cell = true;
+               }
+
+               // Now update local state:
+               if force_holding_cell {
                        for pending_update in self.holding_cell_htlc_updates.iter() {
                                match pending_update {
                                        &HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => {
@@ -3146,8 +3165,8 @@ impl<Signer: Sign> Channel<Signer> {
                } else { Ok((None, Vec::new())) }
        }
 
-       /// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them
-       /// fulfilling or failing the last pending HTLC)
+       /// Frees any pending commitment updates in the holding cell, generating the relevant messages
+       /// for our counterparty.
        fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
                assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
                if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
@@ -3173,7 +3192,7 @@ impl<Signer: Sign> Channel<Signer> {
                                // to rebalance channels.
                                match &htlc_update {
                                        &HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
-                                               match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), logger) {
+                                               match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, logger) {
                                                        Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
                                                        Err(e) => {
                                                                match e {
@@ -3209,13 +3228,13 @@ impl<Signer: Sign> Channel<Signer> {
                                                monitor_update.updates.append(&mut additional_monitor_update.updates);
                                        },
                                        &HTLCUpdateAwaitingACK::FailHTLC { htlc_id, ref err_packet } => {
-                                               match self.get_update_fail_htlc(htlc_id, err_packet.clone(), logger) {
+                                               match self.fail_htlc(htlc_id, err_packet.clone(), false, logger) {
                                                        Ok(update_fail_msg_option) => {
                                                                // If an HTLC failure was previously added to the holding cell (via
-                                                               // `get_update_fail_htlc`) then generating the fail message itself
-                                                               // must not fail - we should never end up in a state where we
-                                                               // double-fail an HTLC or fail-then-claim an HTLC as it indicates
-                                                               // we didn't wait for a full revocation before failing.
+                                                               // `queue_fail_htlc`) then generating the fail message itself must
+                                                               // not fail - we should never end up in a state where we double-fail
+                                                               // an HTLC or fail-then-claim an HTLC as it indicates we didn't wait
+                                                               // for a full revocation before failing.
                                                                update_fail_htlcs.push(update_fail_msg_option.unwrap())
                                                        },
                                                        Err(e) => {
@@ -5470,6 +5489,18 @@ impl<Signer: Sign> Channel<Signer> {
 
        // Send stuff to our remote peers:
 
+       /// Queues up an outbound HTLC to send by placing it in the holding cell. You should call
+       /// [`Self::maybe_free_holding_cell_htlcs`] in order to actually generate and send the
+       /// commitment update.
+       ///
+       /// `Err`s will only be [`ChannelError::Ignore`].
+       pub fn queue_add_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
+               onion_routing_packet: msgs::OnionPacket, logger: &L)
+       -> Result<(), ChannelError> where L::Target: Logger {
+               self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true, logger)
+                       .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
+       }
+
        /// Adds a pending outbound HTLC to this channel, note that you probably want
        /// send_htlc_and_commit instead cause you'll want both messages at once.
        ///
@@ -5482,10 +5513,13 @@ impl<Signer: Sign> Channel<Signer> {
        ///   we may not yet have sent the previous commitment update messages and will need to
        ///   regenerate them.
        ///
-       /// You MUST call send_commitment prior to calling any other methods on this Channel!
+       /// You MUST call send_commitment prior to calling any other methods on this Channel if
+       /// `force_holding_cell` is false.
        ///
        /// If an Err is returned, it's a ChannelError::Ignore!
-       pub fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
+       fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
+               onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, logger: &L)
+       -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
                if (self.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
                        return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
                }
@@ -5580,8 +5614,12 @@ impl<Signer: Sign> Channel<Signer> {
                        return Err(ChannelError::Ignore(format!("Cannot send value that would put our balance under counterparty-announced channel reserve value ({})", chan_reserve_msat)));
                }
 
-               // Now update local state:
                if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::MonitorUpdateInProgress as u32)) != 0 {
+                       force_holding_cell = true;
+               }
+
+               // Now update local state:
+               if force_holding_cell {
                        self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::AddHTLC {
                                amount_msat,
                                payment_hash,
@@ -5774,7 +5812,7 @@ impl<Signer: Sign> Channel<Signer> {
        /// Shorthand for calling send_htlc() followed by send_commitment(), see docs on those for
        /// more info.
        pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<(msgs::UpdateAddHTLC, msgs::CommitmentSigned, ChannelMonitorUpdate)>, ChannelError> where L::Target: Logger {
-               match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, logger)? {
+               match self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger)? {
                        Some(update_add_htlc) => {
                                let (commitment_signed, monitor_update) = self.send_commitment_no_status_check(logger)?;
                                Ok(Some((update_add_htlc, commitment_signed, monitor_update)))
index 306739ad6e0d0d7fd9a7e5e2df3c053201f02f7d..fdd0bb202417ba5f62d1a339bb001633db9ade68 100644 (file)
@@ -3170,7 +3170,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                let mut new_events = Vec::new();
                let mut failed_forwards = Vec::new();
                let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
-               let mut handle_errors = Vec::new();
                {
                        let mut forward_htlcs = HashMap::new();
                        mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -3286,8 +3285,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                        continue;
                                                },
                                                hash_map::Entry::Occupied(mut chan) => {
-                                                       let mut add_htlc_msgs = Vec::new();
-                                                       let mut fail_htlc_msgs = Vec::new();
                                                        for forward_info in pending_forwards.drain(..) {
                                                                match forward_info {
                                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
@@ -3306,34 +3303,21 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                                        // Phantom payments are only PendingHTLCRouting::Receive.
                                                                                        phantom_shared_secret: None,
                                                                                });
-                                                                               match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
-                                                                                       Err(e) => {
-                                                                                               if let ChannelError::Ignore(msg) = e {
-                                                                                                       log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
-                                                                                               } else {
-                                                                                                       panic!("Stated return value requirements in send_htlc() were not met");
-                                                                                               }
-                                                                                               let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
-                                                                                               failed_forwards.push((htlc_source, payment_hash,
-                                                                                                       HTLCFailReason::reason(failure_code, data),
-                                                                                                       HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
-                                                                                               ));
-                                                                                               continue;
-                                                                                       },
-                                                                                       Ok(update_add) => {
-                                                                                               match update_add {
-                                                                                                       Some(msg) => { add_htlc_msgs.push(msg); },
-                                                                                                       None => {
-                                                                                                               // Nothing to do here...we're waiting on a remote
-                                                                                                               // revoke_and_ack before we can add anymore HTLCs. The Channel
-                                                                                                               // will automatically handle building the update_add_htlc and
-                                                                                                               // commitment_signed messages when we can.
-                                                                                                               // TODO: Do some kind of timer to set the channel as !is_live()
-                                                                                                               // as we don't really want others relying on us relaying through
-                                                                                                               // this channel currently :/.
-                                                                                                       }
-                                                                                               }
+                                                                               if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat,
+                                                                                       payment_hash, outgoing_cltv_value, htlc_source.clone(),
+                                                                                       onion_packet, &self.logger)
+                                                                               {
+                                                                                       if let ChannelError::Ignore(msg) = e {
+                                                                                               log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
+                                                                                       } else {
+                                                                                               panic!("Stated return value requirements in send_htlc() were not met");
                                                                                        }
+                                                                                       let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
+                                                                                       failed_forwards.push((htlc_source, payment_hash,
+                                                                                               HTLCFailReason::reason(failure_code, data),
+                                                                                               HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
+                                                                                       ));
+                                                                                       continue;
                                                                                }
                                                                        },
                                                                        HTLCForwardInfo::AddHTLC { .. } => {
@@ -3341,77 +3325,22 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                        },
                                                                        HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
                                                                                log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
-                                                                               match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
-                                                                                       Err(e) => {
-                                                                                               if let ChannelError::Ignore(msg) = e {
-                                                                                                       log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
-                                                                                               } else {
-                                                                                                       panic!("Stated return value requirements in get_update_fail_htlc() were not met");
-                                                                                               }
-                                                                                               // fail-backs are best-effort, we probably already have one
-                                                                                               // pending, and if not that's OK, if not, the channel is on
-                                                                                               // the chain and sending the HTLC-Timeout is their problem.
-                                                                                               continue;
-                                                                                       },
-                                                                                       Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
-                                                                                       Ok(None) => {
-                                                                                               // Nothing to do here...we're waiting on a remote
-                                                                                               // revoke_and_ack before we can update the commitment
-                                                                                               // transaction. The Channel will automatically handle
-                                                                                               // building the update_fail_htlc and commitment_signed
-                                                                                               // messages when we can.
-                                                                                               // We don't need any kind of timer here as they should fail
-                                                                                               // the channel onto the chain if they can't get our
-                                                                                               // update_fail_htlc in time, it's not our problem.
+                                                                               if let Err(e) = chan.get_mut().queue_fail_htlc(
+                                                                                       htlc_id, err_packet, &self.logger
+                                                                               ) {
+                                                                                       if let ChannelError::Ignore(msg) = e {
+                                                                                               log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
+                                                                                       } else {
+                                                                                               panic!("Stated return value requirements in queue_fail_htlc() were not met");
                                                                                        }
+                                                                                       // fail-backs are best-effort, we probably already have one
+                                                                                       // pending, and if not that's OK, if not, the channel is on
+                                                                                       // the chain and sending the HTLC-Timeout is their problem.
+                                                                                       continue;
                                                                                }
                                                                        },
                                                                }
                                                        }
-
-                                                       if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
-                                                               let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
-                                                                       Ok(res) => res,
-                                                                       Err(e) => {
-                                                                               // We surely failed send_commitment due to bad keys, in that case
-                                                                               // close channel and then send error message to peer.
-                                                                               let counterparty_node_id = chan.get().get_counterparty_node_id();
-                                                                               let err: Result<(), _>  = match e {
-                                                                                       ChannelError::Ignore(_) | ChannelError::Warn(_) => {
-                                                                                               panic!("Stated return value requirements in send_commitment() were not met");
-                                                                                       }
-                                                                                       ChannelError::Close(msg) => {
-                                                                                               log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
-                                                                                               let mut channel = remove_channel!(self, chan);
-                                                                                               // ChannelClosed event is generated by handle_error for us.
-                                                                                               Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
-                                                                                       },
-                                                                               };
-                                                                               handle_errors.push((counterparty_node_id, err));
-                                                                               continue;
-                                                                       }
-                                                               };
-                                                               match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                                                       ChannelMonitorUpdateStatus::Completed => {},
-                                                                       e => {
-                                                                               handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
-                                                                               continue;
-                                                                       }
-                                                               }
-                                                               log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
-                                                                       add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
-                                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                       node_id: chan.get().get_counterparty_node_id(),
-                                                                       updates: msgs::CommitmentUpdate {
-                                                                               update_add_htlcs: add_htlc_msgs,
-                                                                               update_fulfill_htlcs: Vec::new(),
-                                                                               update_fail_htlcs: fail_htlc_msgs,
-                                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                                               update_fee: None,
-                                                                               commitment_signed: commitment_msg,
-                                                                       },
-                                                               });
-                                                       }
                                                }
                                        }
                                } else {
@@ -3615,9 +3544,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                }
                self.forward_htlcs(&mut phantom_receives);
 
-               for (counterparty_node_id, err) in handle_errors.drain(..) {
-                       let _ = handle_error!(self, err, counterparty_node_id);
-               }
+               // Freeing the holding cell here is relatively redundant - in practice we'll do it when we
+               // next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
+               // nice to do the work now if we can rather than while we're trying to get messages in the
+               // network stack.
+               self.check_free_holding_cells();
 
                if new_events.is_empty() { return }
                let mut events = self.pending_events.lock().unwrap();
@@ -5568,11 +5499,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
        /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
        /// update was applied.
-       ///
-       /// This should only apply to HTLCs which were added to the holding cell because we were
-       /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
-       /// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
-       /// code to inform them of a channel monitor update.
        fn check_free_holding_cells(&self) -> bool {
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();