Merge pull request #851 from TheBlueMatt/2021-03-holding-cell-clear-msg-get
[rust-lightning] / lightning / src / ln / channelmanager.rs
index fe3be2aeca56d9dc089d3385afadd5921b24ebd4..190fb2bc041181179b7fe33a150ca4f4aeefb6c9 100644 (file)
@@ -766,22 +766,44 @@ macro_rules! handle_error {
        }
 }
 
+/// Returns (boolean indicating if we should remove the Channel object from memory, a mapped error)
+macro_rules! convert_chan_err {
+       ($self: ident, $err: expr, $short_to_id: expr, $channel: expr, $channel_id: expr) => {
+               match $err {
+                       ChannelError::Ignore(msg) => {
+                               (false, MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $channel_id.clone()))
+                       },
+                       ChannelError::Close(msg) => {
+                               log_trace!($self.logger, "Closing channel {} due to close-required error: {}", log_bytes!($channel_id[..]), msg);
+                               if let Some(short_id) = $channel.get_short_channel_id() {
+                                       $short_to_id.remove(&short_id);
+                               }
+                               let shutdown_res = $channel.force_shutdown(true);
+                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+                       },
+                       ChannelError::CloseDelayBroadcast(msg) => {
+                               log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($channel_id[..]), msg);
+                               if let Some(short_id) = $channel.get_short_channel_id() {
+                                       $short_to_id.remove(&short_id);
+                               }
+                               let shutdown_res = $channel.force_shutdown(false);
+                               (true, MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $self.get_channel_update(&$channel).ok()))
+                       }
+               }
+       }
+}
+
 macro_rules! break_chan_entry {
        ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
                match $res {
                        Ok(res) => res,
-                       Err(ChannelError::Ignore(msg)) => {
-                               break Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
-                       },
-                       Err(ChannelError::Close(msg)) => {
-                               log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
+                       Err(e) => {
+                               let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+                               if drop {
+                                       $entry.remove_entry();
                                }
-                               break Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
-                       },
-                       Err(ChannelError::CloseDelayBroadcast(_)) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
+                               break Err(res);
+                       }
                }
        }
 }
@@ -790,25 +812,12 @@ macro_rules! try_chan_entry {
        ($self: ident, $res: expr, $channel_state: expr, $entry: expr) => {
                match $res {
                        Ok(res) => res,
-                       Err(ChannelError::Ignore(msg)) => {
-                               return Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore(msg), $entry.key().clone()))
-                       },
-                       Err(ChannelError::Close(msg)) => {
-                               log_trace!($self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!($entry.key()[..]), msg);
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
-                               }
-                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()))
-                       },
-                       Err(ChannelError::CloseDelayBroadcast(msg)) => {
-                               log_error!($self.logger, "Channel {} need to be shutdown but closing transactions not broadcast due to {}", log_bytes!($entry.key()[..]), msg);
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
+                       Err(e) => {
+                               let (drop, res) = convert_chan_err!($self, e, $channel_state.short_to_id, $entry.get_mut(), $entry.key());
+                               if drop {
+                                       $entry.remove_entry();
                                }
-                               let shutdown_res = chan.force_shutdown(false);
-                               return Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel_id, shutdown_res, $self.get_channel_update(&chan).ok()))
+                               return Err(res);
                        }
                }
        }
@@ -818,13 +827,12 @@ macro_rules! handle_monitor_err {
        ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr) => {
                handle_monitor_err!($self, $err, $channel_state, $entry, $action_type, $resend_raa, $resend_commitment, Vec::new(), Vec::new())
        };
-       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => {
+       ($self: ident, $err: expr, $short_to_id: expr, $chan: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr, $chan_id: expr) => {
                match $err {
                        ChannelMonitorUpdateErr::PermanentFailure => {
-                               log_error!($self.logger, "Closing channel {} due to monitor update PermanentFailure", log_bytes!($entry.key()[..]));
-                               let (channel_id, mut chan) = $entry.remove_entry();
-                               if let Some(short_id) = chan.get_short_channel_id() {
-                                       $channel_state.short_to_id.remove(&short_id);
+                               log_error!($self.logger, "Closing channel {} due to monitor update ChannelMonitorUpdateErr::PermanentFailure", log_bytes!($chan_id[..]));
+                               if let Some(short_id) = $chan.get_short_channel_id() {
+                                       $short_to_id.remove(&short_id);
                                }
                                // TODO: $failed_fails is dropped here, which will cause other channels to hit the
                                // chain in a confused state! We need to move them into the ChannelMonitor which
@@ -835,12 +843,12 @@ macro_rules! handle_monitor_err {
                                // splitting hairs we'd prefer to claim payments that were to us, but we haven't
                                // given up the preimage yet, so might as well just wait until the payment is
                                // retried, avoiding the on-chain fees.
-                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), channel_id, chan.force_shutdown(true), $self.get_channel_update(&chan).ok()));
-                               res
+                               let res: Result<(), _> = Err(MsgHandleErrInternal::from_finish_shutdown("ChannelMonitor storage failure".to_owned(), *$chan_id, $chan.force_shutdown(true), $self.get_channel_update(&$chan).ok()));
+                               (res, true)
                        },
                        ChannelMonitorUpdateErr::TemporaryFailure => {
                                log_info!($self.logger, "Disabling channel {} due to monitor update TemporaryFailure. On restore will send {} and process {} forwards and {} fails",
-                                               log_bytes!($entry.key()[..]),
+                                               log_bytes!($chan_id[..]),
                                                if $resend_commitment && $resend_raa {
                                                                match $action_type {
                                                                        RAACommitmentOrder::CommitmentFirst => { "commitment then RAA" },
@@ -857,11 +865,18 @@ macro_rules! handle_monitor_err {
                                if !$resend_raa {
                                        debug_assert!($action_type == RAACommitmentOrder::CommitmentFirst || !$resend_commitment);
                                }
-                               $entry.get_mut().monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
-                               Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$entry.key()))
+                               $chan.monitor_update_failed($resend_raa, $resend_commitment, $failed_forwards, $failed_fails);
+                               (Err(MsgHandleErrInternal::from_chan_no_close(ChannelError::Ignore("Failed to update ChannelMonitor".to_owned()), *$chan_id)), false)
                        },
                }
-       }
+       };
+       ($self: ident, $err: expr, $channel_state: expr, $entry: expr, $action_type: path, $resend_raa: expr, $resend_commitment: expr, $failed_forwards: expr, $failed_fails: expr) => { {
+               let (res, drop) = handle_monitor_err!($self, $err, $channel_state.short_to_id, $entry.get_mut(), $action_type, $resend_raa, $resend_commitment, $failed_forwards, $failed_fails, $entry.key());
+               if drop {
+                       $entry.remove_entry();
+               }
+               res
+       } };
 }
 
 macro_rules! return_monitor_err {
@@ -885,6 +900,133 @@ macro_rules! maybe_break_monitor_err {
        }
 }
 
+macro_rules! handle_chan_restoration_locked {
+       ($self: ident, $channel_lock: expr, $channel_state: expr, $channel_entry: expr,
+        $raa: expr, $commitment_update: expr, $order: expr, $chanmon_update: expr,
+        $pending_forwards: expr, $funding_broadcastable: expr, $funding_locked: expr) => { {
+               let mut htlc_forwards = None;
+               let counterparty_node_id = $channel_entry.get().get_counterparty_node_id();
+
+               let chanmon_update: Option<ChannelMonitorUpdate> = $chanmon_update; // Force type-checking to resolve
+               let chanmon_update_is_none = chanmon_update.is_none();
+               let res = loop {
+                       let forwards: Vec<(PendingHTLCInfo, u64)> = $pending_forwards; // Force type-checking to resolve
+                       if !forwards.is_empty() {
+                               htlc_forwards = Some(($channel_entry.get().get_short_channel_id().expect("We can't have pending forwards before funding confirmation"),
+                                       $channel_entry.get().get_funding_txo().unwrap(), forwards));
+                       }
+
+                       if chanmon_update.is_some() {
+                               // On reconnect, we, by definition, only resend a funding_locked if there have been
+                               // no commitment updates, so the only channel monitor update which could also be
+                               // associated with a funding_locked would be the funding_created/funding_signed
+                               // monitor update. That monitor update failing implies that we won't send
+                               // funding_locked until it's been updated, so we can't have a funding_locked and a
+                               // monitor update here (so we don't bother to handle it correctly below).
+                               assert!($funding_locked.is_none());
+                               // A channel monitor update makes no sense without either a funding_locked or a
+                               // commitment update to process after it. Since we can't have a funding_locked, we
+                               // only bother to handle the monitor-update + commitment_update case below.
+                               assert!($commitment_update.is_some());
+                       }
+
+                       if let Some(msg) = $funding_locked {
+                               // Similar to the above, this implies that we're letting the funding_locked fly
+                               // before it should be allowed to.
+                               assert!(chanmon_update.is_none());
+                               $channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
+                                       node_id: counterparty_node_id,
+                                       msg,
+                               });
+                               if let Some(announcement_sigs) = $self.get_announcement_sigs($channel_entry.get()) {
+                                       $channel_state.pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
+                                               node_id: counterparty_node_id,
+                                               msg: announcement_sigs,
+                                       });
+                               }
+                               $channel_state.short_to_id.insert($channel_entry.get().get_short_channel_id().unwrap(), $channel_entry.get().channel_id());
+                       }
+
+                       let funding_broadcastable: Option<Transaction> = $funding_broadcastable; // Force type-checking to resolve
+                       if let Some(monitor_update) = chanmon_update {
+                               // We only ever broadcast a funding transaction in response to a funding_signed
+                               // message and the resulting monitor update. Thus, on channel_reestablish
+                               // message handling we can't have a funding transaction to broadcast. When
+                               // processing a monitor update finishing resulting in a funding broadcast, we
+                               // cannot have a second monitor update, thus this case would indicate a bug.
+                               assert!(funding_broadcastable.is_none());
+                               // Given we were just reconnected or finished updating a channel monitor, the
+                               // only case where we can get a new ChannelMonitorUpdate would be if we also
+                               // have some commitment updates to send as well.
+                               assert!($commitment_update.is_some());
+                               if let Err(e) = $self.chain_monitor.update_channel($channel_entry.get().get_funding_txo().unwrap(), monitor_update) {
+                                       // channel_reestablish doesn't guarantee the order it returns is sensical
+                                       // for the messages it returns, but if we're setting what messages to
+                                       // re-transmit on monitor update success, we need to make sure it is sane.
+                                       let mut order = $order;
+                                       if $raa.is_none() {
+                                               order = RAACommitmentOrder::CommitmentFirst;
+                                       }
+                                       break handle_monitor_err!($self, e, $channel_state, $channel_entry, order, $raa.is_some(), true);
+                               }
+                       }
+
+                       macro_rules! handle_cs { () => {
+                               if let Some(update) = $commitment_update {
+                                       $channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                               node_id: counterparty_node_id,
+                                               updates: update,
+                                       });
+                               }
+                       } }
+                       macro_rules! handle_raa { () => {
+                               if let Some(revoke_and_ack) = $raa {
+                                       $channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
+                                               node_id: counterparty_node_id,
+                                               msg: revoke_and_ack,
+                                       });
+                               }
+                       } }
+                       match $order {
+                               RAACommitmentOrder::CommitmentFirst => {
+                                       handle_cs!();
+                                       handle_raa!();
+                               },
+                               RAACommitmentOrder::RevokeAndACKFirst => {
+                                       handle_raa!();
+                                       handle_cs!();
+                               },
+                       }
+                       if let Some(tx) = funding_broadcastable {
+                               log_info!($self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
+                               $self.tx_broadcaster.broadcast_transaction(&tx);
+                       }
+                       break Ok(());
+               };
+
+               if chanmon_update_is_none {
+                       // If there was no ChannelMonitorUpdate, we should never generate an Err in the res loop
+                       // above. Doing so would imply calling handle_err!() from channel_monitor_updated() which
+                       // should *never* end up calling back to `chain_monitor.update_channel()`.
+                       assert!(res.is_ok());
+               }
+
+               (htlc_forwards, res, counterparty_node_id)
+       } }
+}
+
+macro_rules! post_handle_chan_restoration {
+       ($self: ident, $locked_res: expr) => { {
+               let (htlc_forwards, res, counterparty_node_id) = $locked_res;
+
+               let _ = handle_error!($self, res, counterparty_node_id);
+
+               if let Some(forwards) = htlc_forwards {
+                       $self.forward_htlcs(&mut [forwards][..]);
+               }
+       } }
+}
+
 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager<Signer, M, T, K, F, L>
        where M::Target: chain::Watch<Signer>,
         T::Target: BroadcasterInterface,
@@ -1927,7 +2069,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                                },
                                                                HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
                                                                        log_trace!(self.logger, "Failing HTLC back to channel with short id {} after delay", short_chan_id);
-                                                                       match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet) {
+                                                                       match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
                                                                                Err(e) => {
                                                                                        if let ChannelError::Ignore(msg) = e {
                                                                                                log_trace!(self.logger, "Failed to fail backwards to short_id {}: {}", short_chan_id, msg);
@@ -2593,85 +2735,24 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-               let mut close_results = Vec::new();
-               let mut htlc_forwards = Vec::new();
-               let mut htlc_failures = Vec::new();
-               let mut pending_events = Vec::new();
-
-               {
+               let (mut pending_failures, chan_restoration_res) = {
                        let mut channel_lock = self.channel_state.lock().unwrap();
                        let channel_state = &mut *channel_lock;
-                       let short_to_id = &mut channel_state.short_to_id;
-                       let pending_msg_events = &mut channel_state.pending_msg_events;
-                       let channel = match channel_state.by_id.get_mut(&funding_txo.to_channel_id()) {
-                               Some(chan) => chan,
-                               None => return,
+                       let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
+                               hash_map::Entry::Occupied(chan) => chan,
+                               hash_map::Entry::Vacant(_) => return,
                        };
-                       if !channel.is_awaiting_monitor_update() || channel.get_latest_monitor_update_id() != highest_applied_update_id {
+                       if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
                                return;
                        }
 
-                       let (raa, commitment_update, order, pending_forwards, mut pending_failures, funding_broadcastable, funding_locked) = channel.monitor_updating_restored(&self.logger);
-                       if !pending_forwards.is_empty() {
-                               htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), funding_txo.clone(), pending_forwards));
-                       }
-                       htlc_failures.append(&mut pending_failures);
-
-                       macro_rules! handle_cs { () => {
-                               if let Some(update) = commitment_update {
-                                       pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-                                               node_id: channel.get_counterparty_node_id(),
-                                               updates: update,
-                                       });
-                               }
-                       } }
-                       macro_rules! handle_raa { () => {
-                               if let Some(revoke_and_ack) = raa {
-                                       pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-                                               node_id: channel.get_counterparty_node_id(),
-                                               msg: revoke_and_ack,
-                                       });
-                               }
-                       } }
-                       match order {
-                               RAACommitmentOrder::CommitmentFirst => {
-                                       handle_cs!();
-                                       handle_raa!();
-                               },
-                               RAACommitmentOrder::RevokeAndACKFirst => {
-                                       handle_raa!();
-                                       handle_cs!();
-                               },
-                       }
-                       if let Some(tx) = funding_broadcastable {
-                               log_info!(self.logger, "Broadcasting funding transaction with txid {}", tx.txid());
-                               self.tx_broadcaster.broadcast_transaction(&tx);
-                       }
-                       if let Some(msg) = funding_locked {
-                               pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
-                                       node_id: channel.get_counterparty_node_id(),
-                                       msg,
-                               });
-                               if let Some(announcement_sigs) = self.get_announcement_sigs(channel) {
-                                       pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
-                                               node_id: channel.get_counterparty_node_id(),
-                                               msg: announcement_sigs,
-                                       });
-                               }
-                               short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
-                       }
-               }
-
-               self.pending_events.lock().unwrap().append(&mut pending_events);
-
-               for failure in htlc_failures.drain(..) {
+                       let (raa, commitment_update, order, pending_forwards, pending_failures, funding_broadcastable, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
+                       (pending_failures, handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, raa, commitment_update, order, None, pending_forwards, funding_broadcastable, funding_locked))
+               };
+               post_handle_chan_restoration!(self, chan_restoration_res);
+               for failure in pending_failures.drain(..) {
                        self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
                }
-               self.forward_htlcs(&mut htlc_forwards[..]);
-
-               for res in close_results.drain(..) {
-                       self.finish_force_close_channel(res);
-               }
        }
 
        fn internal_open_channel(&self, counterparty_node_id: &PublicKey, their_features: InitFeatures, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> {
@@ -3282,77 +3363,35 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
        }
 
        fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
-               let mut channel_state_lock = self.channel_state.lock().unwrap();
-               let channel_state = &mut *channel_state_lock;
+               let (htlcs_failed_forward, chan_restoration_res) = {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = &mut *channel_state_lock;
 
-               match channel_state.by_id.entry(msg.channel_id) {
-                       hash_map::Entry::Occupied(mut chan) => {
-                               if chan.get().get_counterparty_node_id() != *counterparty_node_id {
-                                       return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
-                               }
-                               // Currently, we expect all holding cell update_adds to be dropped on peer
-                               // disconnect, so Channel's reestablish will never hand us any holding cell
-                               // freed HTLCs to fail backwards. If in the future we no longer drop pending
-                               // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
-                               let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, mut order, shutdown) =
-                                       try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
-                               if let Some(monitor_update) = monitor_update_opt {
-                                       if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
-                                               // channel_reestablish doesn't guarantee the order it returns is sensical
-                                               // for the messages it returns, but if we're setting what messages to
-                                               // re-transmit on monitor update success, we need to make sure it is sane.
-                                               if revoke_and_ack.is_none() {
-                                                       order = RAACommitmentOrder::CommitmentFirst;
-                                               }
-                                               if commitment_update.is_none() {
-                                                       order = RAACommitmentOrder::RevokeAndACKFirst;
-                                               }
-                                               return_monitor_err!(self, e, channel_state, chan, order, revoke_and_ack.is_some(), commitment_update.is_some());
-                                               //TODO: Resend the funding_locked if needed once we get the monitor running again
-                                       }
-                               }
-                               if let Some(msg) = funding_locked {
-                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
-                                               node_id: counterparty_node_id.clone(),
-                                               msg
-                                       });
-                               }
-                               macro_rules! send_raa { () => {
-                                       if let Some(msg) = revoke_and_ack {
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
-                                                       node_id: counterparty_node_id.clone(),
-                                                       msg
-                                               });
+                       match channel_state.by_id.entry(msg.channel_id) {
+                               hash_map::Entry::Occupied(mut chan) => {
+                                       if chan.get().get_counterparty_node_id() != *counterparty_node_id {
+                                               return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
-                               } }
-                               macro_rules! send_cu { () => {
-                                       if let Some(updates) = commitment_update {
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                       // Currently, we expect all holding cell update_adds to be dropped on peer
+                                       // disconnect, so Channel's reestablish will never hand us any holding cell
+                                       // freed HTLCs to fail backwards. If in the future we no longer drop pending
+                                       // add-HTLCs on disconnect, we may be handed HTLCs to fail backwards here.
+                                       let (funding_locked, revoke_and_ack, commitment_update, monitor_update_opt, order, htlcs_failed_forward, shutdown) =
+                                               try_chan_entry!(self, chan.get_mut().channel_reestablish(msg, &self.logger), channel_state, chan);
+                                       if let Some(msg) = shutdown {
+                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
                                                        node_id: counterparty_node_id.clone(),
-                                                       updates
+                                                       msg,
                                                });
                                        }
-                               } }
-                               match order {
-                                       RAACommitmentOrder::RevokeAndACKFirst => {
-                                               send_raa!();
-                                               send_cu!();
-                                       },
-                                       RAACommitmentOrder::CommitmentFirst => {
-                                               send_cu!();
-                                               send_raa!();
-                                       },
-                               }
-                               if let Some(msg) = shutdown {
-                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
-                                               node_id: counterparty_node_id.clone(),
-                                               msg,
-                                       });
-                               }
-                               Ok(())
-                       },
-                       hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
-               }
+                                       (htlcs_failed_forward, handle_chan_restoration_locked!(self, channel_state_lock, channel_state, chan, revoke_and_ack, commitment_update, order, monitor_update_opt, Vec::new(), None, funding_locked))
+                               },
+                               hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
+                       }
+               };
+               post_handle_chan_restoration!(self, chan_restoration_res);
+               self.fail_holding_cell_htlcs(htlcs_failed_forward, msg.channel_id);
+               Ok(())
        }
 
        /// Begin Update fee process. Allowed only on an outbound channel.
@@ -3458,6 +3497,57 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                }
        }
 
+       /// Check the holding cell in each channel and free any pending HTLCs in them if possible.
+       /// This should only apply to HTLCs which were added to the holding cell because we were
+       /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell
+       /// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user
+       /// code to inform them of a channel monitor update.
+       fn check_free_holding_cells(&self) {
+               let mut failed_htlcs = Vec::new();
+               let mut handle_errors = Vec::new();
+               {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = &mut *channel_state_lock;
+                       let by_id = &mut channel_state.by_id;
+                       let short_to_id = &mut channel_state.short_to_id;
+                       let pending_msg_events = &mut channel_state.pending_msg_events;
+
+                       by_id.retain(|channel_id, chan| {
+                               match chan.maybe_free_holding_cell_htlcs(&self.logger) {
+                                       Ok((None, ref htlcs)) if htlcs.is_empty() => true,
+                                       Ok((commitment_opt, holding_cell_failed_htlcs)) => {
+                                               failed_htlcs.push((holding_cell_failed_htlcs, *channel_id));
+                                               if let Some((commitment_update, monitor_update)) = commitment_opt {
+                                                       if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) {
+                                                               let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id);
+                                                               handle_errors.push((chan.get_counterparty_node_id(), res));
+                                                               if close_channel { return false; }
+                                                       } else {
+                                                               pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                                                       node_id: chan.get_counterparty_node_id(),
+                                                                       updates: commitment_update,
+                                                               });
+                                                       }
+                                               }
+                                               true
+                                       },
+                                       Err(e) => {
+                                               let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
+                                               handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
+                                               !close_channel
+                                       }
+                               }
+                       });
+               }
+               for (failures, channel_id) in failed_htlcs.drain(..) {
+                       self.fail_holding_cell_htlcs(failures, channel_id);
+               }
+
+               for (counterparty_node_id, err) in handle_errors.drain(..) {
+                       let _ = handle_error!(self, err, counterparty_node_id);
+               }
+       }
+
        /// Handle a list of channel failures during a block_connected or block_disconnected call,
        /// pushing the channel monitor update (if any) to the background events queue and removing the
        /// Channel object.
@@ -3594,6 +3684,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
                // ChannelMonitors when clearing other events.
                self.process_pending_monitor_events();
 
+               self.check_free_holding_cells();
+
                let mut ret = Vec::new();
                let mut channel_state = self.channel_state.lock().unwrap();
                mem::swap(&mut ret, &mut channel_state.pending_msg_events);
@@ -3961,7 +4053,6 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey, no_connection_possible: bool) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
                let mut failed_channels = Vec::new();
-               let mut failed_payments = Vec::new();
                let mut no_channels_remain = true;
                {
                        let mut channel_state_lock = self.channel_state.lock().unwrap();
@@ -3990,15 +4081,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                                log_debug!(self.logger, "Marking channels with {} disconnected and generating channel_updates", log_pubkey!(counterparty_node_id));
                                channel_state.by_id.retain(|_, chan| {
                                        if chan.get_counterparty_node_id() == *counterparty_node_id {
-                                               // Note that currently on channel reestablish we assert that there are no
-                                               // holding cell add-HTLCs, so if in the future we stop removing uncommitted HTLCs
-                                               // on peer disconnect here, there will need to be corresponding changes in
-                                               // reestablish logic.
-                                               let failed_adds = chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
-                                               if !failed_adds.is_empty() {
-                                                       let chan_update = self.get_channel_update(&chan).map(|u| u.encode_with_len()).unwrap(); // Cannot add/recv HTLCs before we have a short_id so unwrap is safe
-                                                       failed_payments.push((chan_update, failed_adds));
-                                               }
+                                               chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
                                                if chan.is_shutdown() {
                                                        if let Some(short_id) = chan.get_short_channel_id() {
                                                                short_to_id.remove(&short_id);
@@ -4042,11 +4125,6 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
                for failure in failed_channels.drain(..) {
                        self.finish_force_close_channel(failure);
                }
-               for (chan_update, mut htlc_sources) in failed_payments {
-                       for (htlc_source, payment_hash) in htlc_sources.drain(..) {
-                               self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source, &payment_hash, HTLCFailReason::Reason { failure_code: 0x1000 | 7, data: chan_update.clone() });
-                       }
-               }
        }
 
        fn peer_connected(&self, counterparty_node_id: &PublicKey, init_msg: &msgs::Init) {