Merge pull request #1863 from TheBlueMatt/2022-11-holding-cell-batch-update
[rust-lightning] / lightning / src / ln / channelmanager.rs
index d4209ce9539695138e80e1eeaec5b93671c57c2a..bb99b02d9f9a90bdafee770671100a22a95e2e70 100644 (file)
@@ -3163,7 +3163,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                let mut new_events = Vec::new();
                let mut failed_forwards = Vec::new();
                let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
-               let mut handle_errors = Vec::new();
                {
                        let mut forward_htlcs = HashMap::new();
                        mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
@@ -3279,8 +3278,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                        continue;
                                                },
                                                hash_map::Entry::Occupied(mut chan) => {
-                                                       let mut add_htlc_msgs = Vec::new();
-                                                       let mut fail_htlc_msgs = Vec::new();
                                                        for forward_info in pending_forwards.drain(..) {
                                                                match forward_info {
                                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
@@ -3299,34 +3296,21 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                                        // Phantom payments are only PendingHTLCRouting::Receive.
                                                                                        phantom_shared_secret: None,
                                                                                });
-                                                                               match chan.get_mut().send_htlc(outgoing_amt_msat, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
-                                                                                       Err(e) => {
-                                                                                               if let ChannelError::Ignore(msg) = e {
-                                                                                                       log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
-                                                                                               } else {
-                                                                                                       panic!("Stated return value requirements in send_htlc() were not met");
-                                                                                               }
-                                                                                               let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
-                                                                                               failed_forwards.push((htlc_source, payment_hash,
-                                                                                                       HTLCFailReason::reason(failure_code, data),
-                                                                                                       HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
-                                                                                               ));
-                                                                                               continue;
-                                                                                       },
-                                                                                       Ok(update_add) => {
-                                                                                               match update_add {
-                                                                                                       Some(msg) => { add_htlc_msgs.push(msg); },
-                                                                                                       None => {
-                                                                                                               // Nothing to do here...we're waiting on a remote
-                                                                                                               // revoke_and_ack before we can add anymore HTLCs. The Channel
-                                                                                                               // will automatically handle building the update_add_htlc and
-                                                                                                               // commitment_signed messages when we can.
-                                                                                                               // TODO: Do some kind of timer to set the channel as !is_live()
-                                                                                                               // as we don't really want others relying on us relaying through
-                                                                                                               // this channel currently :/.
-                                                                                                       }
-                                                                                               }
+                                                                               if let Err(e) = chan.get_mut().queue_add_htlc(outgoing_amt_msat,
+                                                                                       payment_hash, outgoing_cltv_value, htlc_source.clone(),
+                                                                                       onion_packet, &self.logger)
+                                                                               {
+                                                                                       if let ChannelError::Ignore(msg) = e {
+                                                                                               log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
+                                                                                       } else {
+                                                                                               panic!("Stated return value requirements in send_htlc() were not met");
                                                                                        }
+                                                                                       let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
+                                                                                       failed_forwards.push((htlc_source, payment_hash,
+                                                                                               HTLCFailReason::reason(failure_code, data),
+                                                                                               HTLCDestination::NextHopChannel { node_id: Some(chan.get().get_counterparty_node_id()), channel_id: forward_chan_id }
+                                                                                       ));
+                                                                                       continue;
                                                                                }
                                                                        },
                                                                        HTLCForwardInfo::AddHTLC { .. } => {
@@ -3334,77 +3318,22 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                                                                        },
                                                                        HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
                                                                                log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
-                                                                               match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
-                                                                                       Err(e) => {
-                                                                                               if let ChannelError::Ignore(msg) = e {
-                                                                                                       log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
-                                                                                               } else {
-                                                                                                       panic!("Stated return value requirements in get_update_fail_htlc() were not met");
-                                                                                               }
-                                                                                               // fail-backs are best-effort, we probably already have one
-                                                                                               // pending, and if not that's OK, if not, the channel is on
-                                                                                               // the chain and sending the HTLC-Timeout is their problem.
-                                                                                               continue;
-                                                                                       },
-                                                                                       Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
-                                                                                       Ok(None) => {
-                                                                                               // Nothing to do here...we're waiting on a remote
-                                                                                               // revoke_and_ack before we can update the commitment
-                                                                                               // transaction. The Channel will automatically handle
-                                                                                               // building the update_fail_htlc and commitment_signed
-                                                                                               // messages when we can.
-                                                                                               // We don't need any kind of timer here as they should fail
-                                                                                               // the channel onto the chain if they can't get our
-                                                                                               // update_fail_htlc in time, it's not our problem.
+                                                                               if let Err(e) = chan.get_mut().queue_fail_htlc(
+                                                                                       htlc_id, err_packet, &self.logger
+                                                                               ) {
+                                                                                       if let ChannelError::Ignore(msg) = e {
+                                                                                               log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
+                                                                                       } else {
+                                                                                               panic!("Stated return value requirements in queue_fail_htlc() were not met");
                                                                                        }
+                                                                                       // fail-backs are best-effort, we probably already have one
+                                                                                       // pending, and if not that's OK, if not, the channel is on
+                                                                                       // the chain and sending the HTLC-Timeout is their problem.
+                                                                                       continue;
                                                                                }
                                                                        },
                                                                }
                                                        }
-
-                                                       if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
-                                                               let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
-                                                                       Ok(res) => res,
-                                                                       Err(e) => {
-                                                                               // We surely failed send_commitment due to bad keys, in that case
-                                                                               // close channel and then send error message to peer.
-                                                                               let counterparty_node_id = chan.get().get_counterparty_node_id();
-                                                                               let err: Result<(), _>  = match e {
-                                                                                       ChannelError::Ignore(_) | ChannelError::Warn(_) => {
-                                                                                               panic!("Stated return value requirements in send_commitment() were not met");
-                                                                                       }
-                                                                                       ChannelError::Close(msg) => {
-                                                                                               log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
-                                                                                               let mut channel = remove_channel!(self, chan);
-                                                                                               // ChannelClosed event is generated by handle_error for us.
-                                                                                               Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
-                                                                                       },
-                                                                               };
-                                                                               handle_errors.push((counterparty_node_id, err));
-                                                                               continue;
-                                                                       }
-                                                               };
-                                                               match self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                                                       ChannelMonitorUpdateStatus::Completed => {},
-                                                                       e => {
-                                                                               handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
-                                                                               continue;
-                                                                       }
-                                                               }
-                                                               log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
-                                                                       add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
-                                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                                       node_id: chan.get().get_counterparty_node_id(),
-                                                                       updates: msgs::CommitmentUpdate {
-                                                                               update_add_htlcs: add_htlc_msgs,
-                                                                               update_fulfill_htlcs: Vec::new(),
-                                                                               update_fail_htlcs: fail_htlc_msgs,
-                                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                                               update_fee: None,
-                                                                               commitment_signed: commitment_msg,
-                                                                       },
-                                                               });
-                                                       }
                                                }
                                        }
                                } else {
@@ -3608,9 +3537,11 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                }
                self.forward_htlcs(&mut phantom_receives);
 
-               for (counterparty_node_id, err) in handle_errors.drain(..) {
-                       let _ = handle_error!(self, err, counterparty_node_id);
-               }
+               // Freeing the holding cell here is relatively redundant - in practice we'll do it when we
+               // next get a `get_and_clear_pending_msg_events` call, but some tests rely on it, and it's
+               // nice to do the work now if we can rather than while we're trying to get messages in the
+               // network stack.
+               self.check_free_holding_cells();
 
                if new_events.is_empty() { return }
                let mut events = self.pending_events.lock().unwrap();
@@ -3648,59 +3579,24 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
                self.process_background_events();
        }
 
-       fn update_channel_fee(&self, pending_msg_events: &mut Vec<events::MessageSendEvent>, chan_id: &[u8; 32], chan: &mut Channel<<K::Target as KeysInterface>::Signer>, new_feerate: u32) -> (bool, NotifyOption, Result<(), MsgHandleErrInternal>) {
-               if !chan.is_outbound() { return (true, NotifyOption::SkipPersist, Ok(())); }
+       fn update_channel_fee(&self, chan_id: &[u8; 32], chan: &mut Channel<<K::Target as KeysInterface>::Signer>, new_feerate: u32) -> NotifyOption {
+               if !chan.is_outbound() { return NotifyOption::SkipPersist; }
                // If the feerate has decreased by less than half, don't bother
                if new_feerate <= chan.get_feerate() && new_feerate * 2 > chan.get_feerate() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {}.",
                                log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
-                       return (true, NotifyOption::SkipPersist, Ok(()));
+                       return NotifyOption::SkipPersist;
                }
                if !chan.is_live() {
                        log_trace!(self.logger, "Channel {} does not qualify for a feerate change from {} to {} as it cannot currently be updated (probably the peer is disconnected).",
                                log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
-                       return (true, NotifyOption::SkipPersist, Ok(()));
+                       return NotifyOption::SkipPersist;
                }
                log_trace!(self.logger, "Channel {} qualifies for a feerate change from {} to {}.",
                        log_bytes!(chan_id[..]), chan.get_feerate(), new_feerate);
 
-               let mut retain_channel = true;
-               let res = match chan.send_update_fee_and_commit(new_feerate, &self.logger) {
-                       Ok(res) => Ok(res),
-                       Err(e) => {
-                               let (drop, res) = convert_chan_err!(self, e, chan, chan_id);
-                               if drop { retain_channel = false; }
-                               Err(res)
-                       }
-               };
-               let ret_err = match res {
-                       Ok(Some((update_fee, commitment_signed, monitor_update))) => {
-                               match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
-                                       ChannelMonitorUpdateStatus::Completed => {
-                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                                       node_id: chan.get_counterparty_node_id(),
-                                                       updates: msgs::CommitmentUpdate {
-                                                               update_add_htlcs: Vec::new(),
-                                                               update_fulfill_htlcs: Vec::new(),
-                                                               update_fail_htlcs: Vec::new(),
-                                                               update_fail_malformed_htlcs: Vec::new(),
-                                                               update_fee: Some(update_fee),
-                                                               commitment_signed,
-                                                       },
-                                               });
-                                               Ok(())
-                                       },
-                                       e => {
-                                               let (res, drop) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, chan_id, COMMITMENT_UPDATE_ONLY);
-                                               if drop { retain_channel = false; }
-                                               res
-                                       }
-                               }
-                       },
-                       Ok(None) => Ok(()),
-                       Err(e) => Err(e),
-               };
-               (retain_channel, NotifyOption::DoPersist, ret_err)
+               chan.queue_update_fee(new_feerate, &self.logger);
+               NotifyOption::DoPersist
        }
 
        #[cfg(fuzzing)]
@@ -3714,19 +3610,10 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
 
                        let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
 
-                       let mut handle_errors = Vec::new();
-                       {
-                               let mut channel_state_lock = self.channel_state.lock().unwrap();
-                               let channel_state = &mut *channel_state_lock;
-                               let pending_msg_events = &mut channel_state.pending_msg_events;
-                               channel_state.by_id.retain(|chan_id, chan| {
-                                       let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(pending_msg_events, chan_id, chan, new_feerate);
-                                       if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
-                                       if err.is_err() {
-                                               handle_errors.push(err);
-                                       }
-                                       retain_channel
-                               });
+                       let mut channel_state = self.channel_state.lock().unwrap();
+                       for (chan_id, chan) in channel_state.by_id.iter_mut() {
+                               let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
+                               if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
                        }
 
                        should_persist
@@ -3791,20 +3678,15 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
 
                        let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal);
 
-                       let mut handle_errors = Vec::new();
+                       let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
                        let mut timed_out_mpp_htlcs = Vec::new();
                        {
                                let mut channel_state_lock = self.channel_state.lock().unwrap();
                                let channel_state = &mut *channel_state_lock;
                                let pending_msg_events = &mut channel_state.pending_msg_events;
                                channel_state.by_id.retain(|chan_id, chan| {
-                                       let counterparty_node_id = chan.get_counterparty_node_id();
-                                       let (retain_channel, chan_needs_persist, err) = self.update_channel_fee(pending_msg_events, chan_id, chan, new_feerate);
+                                       let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
                                        if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
-                                       if err.is_err() {
-                                               handle_errors.push((err, counterparty_node_id));
-                                       }
-                                       if !retain_channel { return false; }
 
                                        if let Err(e) = chan.timer_check_closing_negotiation_progress() {
                                                let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id);
@@ -3879,6 +3761,13 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
 
                        self.remove_stale_resolved_payments();
 
+                       // Technically we don't need to do this here, but if we have holding cell entries in a
+                       // channel that need freeing, it's better to do that here and block a background task
+                       // than block the message queueing pipeline.
+                       if self.check_free_holding_cells() {
+                               should_persist = NotifyOption::DoPersist;
+                       }
+
                        should_persist
                });
        }
@@ -5502,11 +5391,6 @@ impl<M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<M, T, K, F
        /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
        /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor
        /// update was applied.
-       ///
-       /// This should only apply to HTLCs which were added to the holding cell because we were
-       /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
-       /// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
-       /// code to inform them of a channel monitor update.
        fn check_free_holding_cells(&self) -> bool {
                let mut has_monitor_update = false;
                let mut failed_htlcs = Vec::new();