Handle retrying sign_counterparty_commitment failures
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index c4efd895796e67d1f3c1ecc46e5559b277c5cc9b..37a8feb446d6865bcdbe7849e9c1a854d318b4d9 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -457,7 +457,7 @@ impl MsgHandleErrInternal {
        #[inline]
        fn from_finish_shutdown(err: String, channel_id: ChannelId, user_channel_id: u128, shutdown_res: ShutdownResult, channel_update: Option<msgs::ChannelUpdate>, channel_capacity: u64) -> Self {
                let err_msg = msgs::ErrorMessage { channel_id, data: err.clone() };
-               let action = if let (Some(_), ..) = &shutdown_res {
+               let action = if shutdown_res.monitor_update.is_some() {
                        // We have a closing `ChannelMonitorUpdate`, which means the channel was funded and we
                        // should disconnect our peer such that we force them to broadcast their latest
                        // commitment upon reconnecting.
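The tuple previously pattern-matched here has become a named struct. A sketch of the
`ShutdownResult` fields, inferred from the accesses made elsewhere in this diff
(`monitor_update`, `dropped_outbound_htlcs`, `unbroadcasted_batch_funding_txid`); the
authoritative definition lives in channel.rs and may differ in detail:

	// Sketch only: field types inferred from the destructuring in
	// `finish_close_channel` and the deserialization path below. All types
	// (PublicKey, OutPoint, ChannelMonitorUpdate, HTLCSource, PaymentHash,
	// ChannelId, Txid) are the usual in-crate/bitcoin ones.
	pub(crate) struct ShutdownResult {
		// A monitor update to apply while closing, present iff the channel
		// was funded: (counterparty node id, funding outpoint, the update).
		pub(crate) monitor_update: Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
		// Outbound HTLCs to fail backwards:
		// (source, payment hash, counterparty node id, channel id).
		pub(crate) dropped_outbound_htlcs: Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>,
		// Set if the channel belonged to a batch whose funding transaction
		// was never broadcast, so the rest of the batch must be closed too.
		pub(crate) unbroadcasted_batch_funding_txid: Option<Txid>,
	}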
@@ -2602,7 +2602,7 @@ where
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
                let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
-               let mut shutdown_result = None;
+               let shutdown_result;
                loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
@@ -2617,10 +2617,11 @@ where
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                let funding_txo_opt = chan.context.get_funding_txo();
                                                let their_features = &peer_state.latest_features;
-                                               let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
-                                               let (shutdown_msg, mut monitor_update_opt, htlcs) =
+                                               let (shutdown_msg, mut monitor_update_opt, htlcs, local_shutdown_result) =
                                                        chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
                                                failed_htlcs = htlcs;
+                                               shutdown_result = local_shutdown_result;
+                                               debug_assert_eq!(shutdown_result.is_some(), chan.is_shutdown());
 
                                                // We can send the `shutdown` message before updating the `ChannelMonitor`
                                                // here as we don't need the monitor update to complete until we send a
@@ -2648,7 +2649,6 @@ where
                                                                        });
                                                                }
                                                                self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
-                                                               shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                        }
                                                }
                                                break;
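With this change `get_shutdown` hands back the optional `ShutdownResult` itself rather
than the caller reconstructing one from `unbroadcasted_batch_funding_txid`. A hedged
sketch of the new signature, inferred from the call site above (the real declaration is
in channel.rs):

	// Sketch only: return type inferred from the destructuring above; per the
	// debug_assert_eq!, the final Option is Some exactly when the channel has
	// reached the fully-shut-down state.
	impl<SP: Deref> Channel<SP> where SP::Target: SignerProvider {
		pub fn get_shutdown(
			&mut self, signer_provider: &SP, their_features: &InitFeatures,
			target_feerate_sats_per_1000_weight: Option<u32>,
			override_shutdown_script: Option<ShutdownScript>,
		) -> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>,
			Vec<(HTLCSource, PaymentHash)>, Option<ShutdownResult>), APIError>
		{
			unimplemented!("see channel.rs in this commit for the real body")
		}
	}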
@@ -2739,22 +2739,21 @@ where
                self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
        }
 
-       fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
+       fn finish_close_channel(&self, mut shutdown_res: ShutdownResult) {
                debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
                #[cfg(debug_assertions)]
                for (_, peer) in self.per_peer_state.read().unwrap().iter() {
                        debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
                }
 
-               let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
-               log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
-               for htlc_source in failed_htlcs.drain(..) {
+               log_debug!(self.logger, "Finishing closure of channel with {} HTLCs to fail", shutdown_res.dropped_outbound_htlcs.len());
+               for htlc_source in shutdown_res.dropped_outbound_htlcs.drain(..) {
                        let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
                        let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
                        let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
                        self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
                }
-               if let Some((_, funding_txo, monitor_update)) = monitor_update_option {
+               if let Some((_, funding_txo, monitor_update)) = shutdown_res.monitor_update {
                        // There isn't anything we can do if we get an update failure - we're already
                        // force-closing. The monitor update on the required in-memory copy should broadcast
                        // the latest local state, which is the best we can do anyway. Thus, it is safe to
@@ -2762,7 +2761,7 @@ where
                        let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
                }
                let mut shutdown_results = Vec::new();
-               if let Some(txid) = unbroadcasted_batch_funding_txid {
+               if let Some(txid) = shutdown_res.unbroadcasted_batch_funding_txid {
                        let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
                        let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
                        let per_peer_state = self.per_peer_state.read().unwrap();
@@ -3803,7 +3802,7 @@ where
 
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
-               let (chan, msg) = match peer_state.channel_by_id.remove(temporary_channel_id) {
+               let (chan, msg_opt) = match peer_state.channel_by_id.remove(temporary_channel_id) {
                        Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
                                let funding_txo = find_funding_output(&chan, &funding_transaction)?;
 
@@ -3842,10 +3841,12 @@ where
                                }),
                };
 
-               peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
-                       node_id: chan.context.get_counterparty_node_id(),
-                       msg,
-               });
+               if let Some(msg) = msg_opt {
+                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
+                               node_id: chan.context.get_counterparty_node_id(),
+                               msg,
+                       });
+               }
                match peer_state.channel_by_id.entry(chan.context.channel_id()) {
                        hash_map::Entry::Occupied(_) => {
                                panic!("Generated duplicate funding txid?");
@@ -3907,7 +3908,7 @@ where
        /// Return values are identical to [`Self::funding_transaction_generated`], respective to
        /// each individual channel and transaction output.
        ///
-       /// Do NOT broadcast the funding transaction yourself. This batch funding transcaction
+       /// Do NOT broadcast the funding transaction yourself. This batch funding transaction
        /// will only be broadcast when we have safely received and persisted the counterparty's
        /// signature for each channel.
        ///
@@ -3961,7 +3962,7 @@ where
                                btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())),
                        }
                });
-               for &(temporary_channel_id, counterparty_node_id) in temporary_channels.iter() {
+               for &(temporary_channel_id, counterparty_node_id) in temporary_channels {
                        result = result.and_then(|_| self.funding_transaction_generated_intern(
                                temporary_channel_id,
                                counterparty_node_id,
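This loop sits in the batch-funding entry point described by the doc comment above (in
this era of LDK the method is `batch_funding_transaction_generated`; the caller-side
names below are placeholders). A hypothetical usage sketch:

	// Hypothetical usage: one transaction funds two channels. The temporary
	// channel ids and node ids come from the preceding create_channel /
	// OpenChannelRequest flow; `funding_tx` pays all the funding outputs.
	let temporary_channels: [(&ChannelId, &PublicKey); 2] = [
		(&temp_channel_id_a, &node_a_id),
		(&temp_channel_id_b, &node_b_id),
	];
	channel_manager
		.batch_funding_transaction_generated(&temporary_channels, funding_tx)
		.expect("batch funding registration failed");
	// Do NOT broadcast `funding_tx` yourself: per the docs above, LDK only
	// broadcasts it once every counterparty signature has been persisted.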
@@ -6230,7 +6231,7 @@ where
 
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
-               let (chan, funding_msg, monitor) =
+               let (chan, funding_msg_opt, monitor) =
                        match peer_state.channel_by_id.remove(&msg.temporary_channel_id) {
                                Some(ChannelPhase::UnfundedInboundV1(inbound_chan)) => {
                                        match inbound_chan.funding_created(msg, best_block, &self.signer_provider, &self.logger) {
@@ -6253,9 +6254,12 @@ where
                                None => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
                        };
 
-               match peer_state.channel_by_id.entry(funding_msg.channel_id) {
+               match peer_state.channel_by_id.entry(chan.context.channel_id()) {
                        hash_map::Entry::Occupied(_) => {
-                               Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
+                               Err(MsgHandleErrInternal::send_err_msg_no_close(
+                                       "Already had channel with the new channel_id".to_owned(),
+                                       chan.context.channel_id()
+                               ))
                        },
                        hash_map::Entry::Vacant(e) => {
                                let mut id_to_peer_lock = self.id_to_peer.lock().unwrap();
@@ -6263,7 +6267,7 @@ where
                                        hash_map::Entry::Occupied(_) => {
                                                return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                                        "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
-                                                       funding_msg.channel_id))
+                                                       chan.context.channel_id()))
                                        },
                                        hash_map::Entry::Vacant(i_e) => {
                                                let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
@@ -6275,10 +6279,12 @@ where
                                                        // hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
                                                        // accepted payment from yet. We do, however, need to wait to send our channel_ready
                                                        // until we have persisted our monitor.
-                                                       peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
-                                                               node_id: counterparty_node_id.clone(),
-                                                               msg: funding_msg,
-                                                       });
+                                                       if let Some(msg) = funding_msg_opt {
+                                                               peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
+                                                                       node_id: counterparty_node_id.clone(),
+                                                                       msg,
+                                                               });
+                                                       }
 
                                                        if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
                                                                handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
@@ -6289,9 +6295,13 @@ where
                                                        Ok(())
                                                } else {
                                                        log_error!(self.logger, "Persisting initial ChannelMonitor failed, implying the funding outpoint was duplicated");
+                                                       let channel_id = match funding_msg_opt {
+                                                               Some(msg) => msg.channel_id,
+                                                               None => chan.context.channel_id(),
+                                                       };
                                                        return Err(MsgHandleErrInternal::send_err_msg_no_close(
                                                                "The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
-                                                               funding_msg.channel_id));
+                                                               channel_id));
                                                }
                                        }
                                }
@@ -6453,22 +6463,20 @@ where
        }
 
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
-               let mut shutdown_result = None;
-               let unbroadcasted_batch_funding_txid;
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                                debug_assert!(false);
                                MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
                        })?;
-               let (tx, chan_option) = {
+               let (tx, chan_option, shutdown_result) = {
                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                        let peer_state = &mut *peer_state_lock;
                        match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
-                                               unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
-                                               let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
+                                               let (closing_signed, tx, shutdown_result) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
+                                               debug_assert_eq!(shutdown_result.is_some(), chan.is_shutdown());
                                                if let Some(msg) = closing_signed {
                                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
                                                                node_id: counterparty_node_id.clone(),
@@ -6481,8 +6489,8 @@ where
                                                        // also implies there are no pending HTLCs left on the channel, so we can
                                                        // fully delete it from tracking (the channel monitor is still around to
                                                        // watch for old state broadcasts)!
-                                                       (tx, Some(remove_channel_phase!(self, chan_phase_entry)))
-                                               } else { (tx, None) }
+                                                       (tx, Some(remove_channel_phase!(self, chan_phase_entry)), shutdown_result)
+                                               } else { (tx, None, shutdown_result) }
                                        } else {
                                                return try_chan_phase_entry!(self, Err(ChannelError::Close(
                                                        "Got a closing_signed message for an unfunded channel!".into())), chan_phase_entry);
@@ -6504,7 +6512,6 @@ where
                                });
                        }
                        self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
-                       shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                }
                mem::drop(per_peer_state);
                if let Some(shutdown_result) = shutdown_result {
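As with `get_shutdown`, the cooperative-close paths now receive the `ShutdownResult`
from the channel itself instead of synthesizing a `(None, Vec::new(), txid)` tuple.
Inferred shapes of the two channel methods involved, per the destructuring at their
call sites in this diff (sketches, not the authoritative declarations):

	// Sketch only: bodies elided; both now return an Option<ShutdownResult>
	// alongside the optional closing_signed message and closing transaction.
	impl<SP: Deref> Channel<SP> where SP::Target: SignerProvider {
		pub fn closing_signed<F: Deref>(
			&mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, msg: &msgs::ClosingSigned,
		) -> Result<(Option<msgs::ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), ChannelError>
		where F::Target: FeeEstimator { unimplemented!() }

		pub fn maybe_propose_closing_signed<F: Deref, L: Deref>(
			&mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L,
		) -> Result<(Option<msgs::ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), ChannelError>
		where F::Target: FeeEstimator, L::Target: Logger { unimplemented!() }
	}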
@@ -7220,6 +7227,66 @@ where
                has_update
        }
 
+       /// When a call to a [`ChannelSigner`] method returns an error, this indicates that the signer
+       /// is (temporarily) unavailable, and the operation should be retried later.
+       ///
+       /// This method allows for that retry - either checking for any signer-pending messages to be
+       /// attempted in every channel, or in the specifically provided channel.
+       ///
+       /// [`ChannelSigner`]: crate::sign::ChannelSigner
+       #[cfg(test)] // This is only implemented for one signer method, and should be private until we
+                    // actually finish implementing it fully.
+       pub fn signer_unblocked(&self, channel_opt: Option<(PublicKey, ChannelId)>) {
+               let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+
+               let unblock_chan = |phase: &mut ChannelPhase<SP>, pending_msg_events: &mut Vec<MessageSendEvent>| {
+                       let node_id = phase.context().get_counterparty_node_id();
+                       if let ChannelPhase::Funded(chan) = phase {
+                               let msgs = chan.signer_maybe_unblocked(&self.logger);
+                               if let Some(updates) = msgs.commitment_update {
+                                       pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
+                                               node_id,
+                                               updates,
+                                       });
+                               }
+                               if let Some(msg) = msgs.funding_signed {
+                                       pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
+                                               node_id,
+                                               msg,
+                                       });
+                               }
+                               if let Some(msg) = msgs.funding_created {
+                                       pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
+                                               node_id,
+                                               msg,
+                                       });
+                               }
+                               if let Some(msg) = msgs.channel_ready {
+                                       send_channel_ready!(self, pending_msg_events, chan, msg);
+                               }
+                       }
+               };
+
+               let per_peer_state = self.per_peer_state.read().unwrap();
+               if let Some((counterparty_node_id, channel_id)) = channel_opt {
+                       if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                               let peer_state = &mut *peer_state_lock;
+                               if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) {
+                                       unblock_chan(chan, &mut peer_state.pending_msg_events);
+                               }
+                       }
+               } else {
+                       for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+                               let peer_state = &mut *peer_state_lock;
+                               for (_, chan) in peer_state.channel_by_id.iter_mut() {
+                                       unblock_chan(chan, &mut peer_state.pending_msg_events);
+                               }
+                       }
+               }
+       }
+
        /// Check whether any channels have finished removing all pending updates after a shutdown
        /// exchange and can now send a closing_signed.
        /// Returns whether any closing_signed messages were generated.
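In the `signer_unblocked` hunk above, `signer_maybe_unblocked` gathers whichever
messages were held back while the signer was unavailable. Its return struct is not
shown in this diff; a sketch inferred from the four fields read there (the name used
here is hypothetical):

	// Sketch of the struct returned by `Channel::signer_maybe_unblocked`,
	// inferred from the fields read in `signer_unblocked`; each Option is Some
	// only if that message was previously blocked on the signer and can now
	// be (re)generated.
	pub(super) struct SignerResumeUpdates {
		pub commitment_update: Option<msgs::CommitmentUpdate>,
		pub funding_signed: Option<msgs::FundingSigned>,
		pub funding_created: Option<msgs::FundingCreated>,
		pub channel_ready: Option<msgs::ChannelReady>,
	}

A test exercising this path would typically make the test signer fail
`sign_counterparty_commitment`, drive the funding flow (observing that no
`funding_created`/`funding_signed` is sent), then clear the failure and call
`signer_unblocked(None)` to flush the pending messages.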
@@ -7237,15 +7304,18 @@ where
                                peer_state.channel_by_id.retain(|channel_id, phase| {
                                        match phase {
                                                ChannelPhase::Funded(chan) => {
-                                                       let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                        match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
-                                                               Ok((msg_opt, tx_opt)) => {
+                                                               Ok((msg_opt, tx_opt, shutdown_result_opt)) => {
                                                                        if let Some(msg) = msg_opt {
                                                                                has_update = true;
                                                                                pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
                                                                                        node_id: chan.context.get_counterparty_node_id(), msg,
                                                                                });
                                                                        }
+                                                                       debug_assert_eq!(shutdown_result_opt.is_some(), chan.is_shutdown());
+                                                                       if let Some(shutdown_result) = shutdown_result_opt {
+                                                                               shutdown_results.push(shutdown_result);
+                                                                       }
                                                                        if let Some(tx) = tx_opt {
                                                                                // We're done with this channel. We got a closing_signed and sent back
                                                                                // a closing_signed with a closing transaction to broadcast.
@@ -7260,7 +7330,6 @@ where
                                                                                log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                                                self.tx_broadcaster.broadcast_transactions(&[&tx]);
                                                                                update_maps_on_chan_removal!(self, &chan.context);
-                                                                               shutdown_results.push((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                                                false
                                                                        } else { true }
                                                                },
@@ -7301,7 +7370,7 @@ where
                        // Channel::force_shutdown tries to make us do) as we may still be in initialization,
                        // so we track the update internally and handle it when the user next calls
                        // timer_tick_occurred, guaranteeing we're running normally.
-                       if let Some((counterparty_node_id, funding_txo, update)) = failure.0.take() {
+                       if let Some((counterparty_node_id, funding_txo, update)) = failure.monitor_update.take() {
                                assert_eq!(update.updates.len(), 1);
                                if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
                                        assert!(should_broadcast);
@@ -9966,16 +10035,16 @@ where
                                                log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
                                                        &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
                                        }
-                                       let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
-                                       if batch_funding_txid.is_some() {
+                                       let mut shutdown_result = channel.context.force_shutdown(true);
+                                       if shutdown_result.unbroadcasted_batch_funding_txid.is_some() {
                                                return Err(DecodeError::InvalidValue);
                                        }
-                                       if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
+                                       if let Some((counterparty_node_id, funding_txo, update)) = shutdown_result.monitor_update {
                                                close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                        counterparty_node_id, funding_txo, update
                                                });
                                        }
-                                       failed_htlcs.append(&mut new_failed_htlcs);
+                                       failed_htlcs.append(&mut shutdown_result.dropped_outbound_htlcs);
                                        channel_closures.push_back((events::Event::ChannelClosed {
                                                channel_id: channel.context.channel_id(),
                                                user_channel_id: channel.context.get_user_id(),