Send initial closing_signed message asynchronously and handle errs
authorMatt Corallo <git@bluematt.me>
Mon, 19 Jul 2021 19:57:37 +0000 (19:57 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 17 Aug 2021 02:16:01 +0000 (02:16 +0000)
When we added the support for external signing, many of the
signing functions were allowed to return an error, closing the
channel in such a case. `sign_closing_transaction` is one such
function which can now return an error, except instead of handling
it properly we'd simply never send a `closing_signed` message,
hanging the channel until users intervene and force-close it.

Piping the channel-closing error back through the various callsites
(several of which already have pending results by the time they
call `maybe_propose_first_closing_signed`) may be rather
complicated, so instead we simply attempt to propose the initial
`closing_signed` in `get_and_clear_pending_msg_events` like we do
for holding-cell freeing.

Further, since we now (possibly) generate a `ChannelMonitorUpdate`
on `shutdown`, we may need to wait for monitor updating to complete
before we can send a `closing_signed`, meaning we need to handle
the send asynchronously anyway.

This simplifies a few function interfaces and has no impact on
behavior, aside from a few message-ordering edge-cases, as seen in
the two small test changes required.

lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_tests.rs

index 37bcabfcea88467f9652463fe2d5070756539cfd..ee9a492746b4a93436b9620c78f15b3f1a6b4694 100644 (file)
@@ -142,7 +142,7 @@ fn test_monitor_and_persister_update_fail() {
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
        if let Some(ref mut channel) = nodes[0].node.channel_state.lock().unwrap().by_id.get_mut(&chan.2) {
-               if let Ok((_, _, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].fee_estimator, &node_cfgs[0].logger) {
+               if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
                        // Check that even though the persister is returning a TemporaryFailure,
                        // because the update is bogus, ultimately the error that's returned
                        // should be a PermanentFailure.
@@ -2561,8 +2561,8 @@ fn test_reconnect_dup_htlc_claims() {
 
 #[test]
 fn test_temporary_error_during_shutdown() {
-       // Test that temporary failures when updating the monitor's shutdown script do not prevent
-       // cooperative close.
+       // Test that temporary failures when updating the monitor's shutdown script delay cooperative
+       // close.
        let mut config = test_default_channel_config();
        config.channel_options.commit_upfront_shutdown_pubkey = false;
 
@@ -2575,9 +2575,39 @@ fn test_temporary_error_during_shutdown() {
 
        *nodes[0].chain_monitor.update_ret.lock().unwrap() = Some(Err(ChannelMonitorUpdateErr::TemporaryFailure));
        *nodes[1].chain_monitor.update_ret.lock().unwrap() = Some(Err(ChannelMonitorUpdateErr::TemporaryFailure));
-       close_channel(&nodes[0], &nodes[1], &channel_id, funding_tx, false);
-       check_added_monitors!(nodes[0], 1);
+
+       nodes[0].node.close_channel(&channel_id).unwrap();
+       nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &InitFeatures::known(), &get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id()));
        check_added_monitors!(nodes[1], 1);
+
+       nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &InitFeatures::known(), &get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id()));
+       check_added_monitors!(nodes[0], 1);
+
+       assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+       *nodes[0].chain_monitor.update_ret.lock().unwrap() = None;
+       *nodes[1].chain_monitor.update_ret.lock().unwrap() = None;
+
+       let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
+       nodes[0].node.channel_monitor_updated(&outpoint, latest_update);
+       nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id()));
+
+       assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+       *nodes[1].chain_monitor.update_ret.lock().unwrap() = None;
+       let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone();
+       nodes[1].node.channel_monitor_updated(&outpoint, latest_update);
+       let (_, closing_signed_b) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &closing_signed_b.unwrap());
+       let txn_b = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+
+       let (_, none_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
+       assert!(none_a.is_none());
+       let txn_a = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+
+       assert_eq!(txn_a, txn_b);
+       assert_eq!(txn_a.len(), 1);
+       check_spends!(txn_a[0], funding_tx);
 }
 
 #[test]
index d47b4e72439f401334d5f500b77c2338408349b0..373bfef13435359dbb7fe53e997a245e618d1533 100644 (file)
@@ -455,6 +455,11 @@ pub(super) struct Channel<Signer: Sign> {
 
        last_sent_closing_fee: Option<(u32, u64, Signature)>, // (feerate, fee, holder_sig)
 
+       /// If our counterparty sent us a closing_signed while we were waiting for a `ChannelMonitor`
+       /// update, we need to delay processing it until later. We do that here by simply storing the
+       /// closing_signed message and handling it in `maybe_propose_closing_signed`.
+       pending_counterparty_closing_signed: Option<msgs::ClosingSigned>,
+
        /// The hash of the block in which the funding transaction was included.
        funding_tx_confirmed_in: Option<BlockHash>,
        funding_tx_confirmation_height: u32,
@@ -695,6 +700,7 @@ impl<Signer: Sign> Channel<Signer> {
                        counterparty_max_commitment_tx_output: Mutex::new((channel_value_satoshis * 1000 - push_msat, push_msat)),
 
                        last_sent_closing_fee: None,
+                       pending_counterparty_closing_signed: None,
 
                        funding_tx_confirmed_in: None,
                        funding_tx_confirmation_height: 0,
@@ -954,6 +960,7 @@ impl<Signer: Sign> Channel<Signer> {
                        counterparty_max_commitment_tx_output: Mutex::new((msg.push_msat, msg.funding_satoshis * 1000 - msg.push_msat)),
 
                        last_sent_closing_fee: None,
+                       pending_counterparty_closing_signed: None,
 
                        funding_tx_confirmed_in: None,
                        funding_tx_confirmation_height: 0,
@@ -2373,9 +2380,8 @@ impl<Signer: Sign> Channel<Signer> {
                Ok(())
        }
 
-       pub fn commitment_signed<F: Deref, L: Deref>(&mut self, msg: &msgs::CommitmentSigned, fee_estimator: &F, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
-       where F::Target: FeeEstimator,
-                               L::Target: Logger
+       pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option<msgs::CommitmentSigned>, ChannelMonitorUpdate), (Option<ChannelMonitorUpdate>, ChannelError)>
+               where L::Target: Logger
        {
                if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
                        return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned())));
@@ -2541,12 +2547,10 @@ impl<Signer: Sign> Channel<Signer> {
                        }
                        log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
                                log_bytes!(self.channel_id));
-                       // TODO: Call maybe_propose_first_closing_signed on restoration (or call it here and
-                       // re-send the message on restoration)
                        return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned())));
                }
 
-               let (commitment_signed, closing_signed) = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
+               let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
                        // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok -
                        // we'll send one right away when we get the revoke_and_ack when we
                        // free_holding_cell_htlcs().
@@ -2555,10 +2559,8 @@ impl<Signer: Sign> Channel<Signer> {
                        // strictly increasing by one, so decrement it here.
                        self.latest_monitor_update_id = monitor_update.update_id;
                        monitor_update.updates.append(&mut additional_update.updates);
-                       (Some(msg), None)
-               } else if !need_commitment {
-                       (None, self.maybe_propose_first_closing_signed(fee_estimator))
-               } else { (None, None) };
+                       Some(msg)
+               } else { None };
 
                log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
                        log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" });
@@ -2567,7 +2569,7 @@ impl<Signer: Sign> Channel<Signer> {
                        channel_id: self.channel_id,
                        per_commitment_secret,
                        next_per_commitment_point,
-               }, commitment_signed, closing_signed, monitor_update))
+               }, commitment_signed, monitor_update))
        }
 
        /// Public version of the below, checking relevant preconditions first.
@@ -2704,9 +2706,8 @@ impl<Signer: Sign> Channel<Signer> {
        /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
        /// generating an appropriate error *after* the channel state has been updated based on the
        /// revoke_and_ack message.
-       pub fn revoke_and_ack<F: Deref, L: Deref>(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<msgs::ClosingSigned>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError>
-               where F::Target: FeeEstimator,
-                                       L::Target: Logger,
+       pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError>
+               where L::Target: Logger,
        {
                if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
                        return Err(ChannelError::Close("Got revoke/ACK message when channel was not in an operational state".to_owned()));
@@ -2887,7 +2888,7 @@ impl<Signer: Sign> Channel<Signer> {
                        self.monitor_pending_forwards.append(&mut to_forward_infos);
                        self.monitor_pending_failures.append(&mut revoked_htlcs);
                        log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
-                       return Ok((None, Vec::new(), Vec::new(), None, monitor_update, Vec::new()))
+                       return Ok((None, Vec::new(), Vec::new(), monitor_update, Vec::new()))
                }
 
                match self.free_holding_cell_htlcs(logger)? {
@@ -2906,7 +2907,7 @@ impl<Signer: Sign> Channel<Signer> {
                                self.latest_monitor_update_id = monitor_update.update_id;
                                monitor_update.updates.append(&mut additional_update.updates);
 
-                               Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
+                               Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
                        },
                        (None, htlcs_to_fail) => {
                                if require_commitment {
@@ -2926,14 +2927,13 @@ impl<Signer: Sign> Channel<Signer> {
                                                update_fail_malformed_htlcs,
                                                update_fee: None,
                                                commitment_signed
-                                       }), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail))
+                                       }), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
                                } else {
                                        log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
-                                       Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update, htlcs_to_fail))
+                                       Ok((None, to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
                                }
                        }
                }
-
        }
 
        /// Adds a pending update to this channel. See the doc for send_htlc for
@@ -2988,6 +2988,7 @@ impl<Signer: Sign> Channel<Signer> {
                // Upon reconnect we have to start the closing_signed dance over, but shutdown messages
                // will be retransmitted.
                self.last_sent_closing_fee = None;
+               self.pending_counterparty_closing_signed = None;
 
                let mut inbound_drop_count = 0;
                self.pending_inbound_htlcs.retain(|htlc| {
@@ -3355,13 +3356,24 @@ impl<Signer: Sign> Channel<Signer> {
                }
        }
 
-       fn maybe_propose_first_closing_signed<F: Deref>(&mut self, fee_estimator: &F) -> Option<msgs::ClosingSigned>
-               where F::Target: FeeEstimator
+       pub fn maybe_propose_closing_signed<F: Deref, L: Deref>(&mut self, fee_estimator: &F, logger: &L)
+               -> Result<(Option<msgs::ClosingSigned>, Option<Transaction>), ChannelError>
+               where F::Target: FeeEstimator, L::Target: Logger
        {
-               if !self.is_outbound() || !self.pending_inbound_htlcs.is_empty() || !self.pending_outbound_htlcs.is_empty() ||
-                               self.channel_state & (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::AwaitingRemoteRevoke as u32) != BOTH_SIDES_SHUTDOWN_MASK ||
+               if !self.pending_inbound_htlcs.is_empty() || !self.pending_outbound_htlcs.is_empty() ||
+                               self.channel_state &
+                                       (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::AwaitingRemoteRevoke as u32 |
+                                        ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32)
+                               != BOTH_SIDES_SHUTDOWN_MASK ||
                                self.last_sent_closing_fee.is_some() || self.pending_update_fee.is_some() {
-                       return None;
+                       return Ok((None, None));
+               }
+
+               if !self.is_outbound() {
+                       if let Some(msg) = &self.pending_counterparty_closing_signed.take() {
+                               return self.closing_signed(fee_estimator, &msg);
+                       }
+                       return Ok((None, None));
                }
 
                let mut proposed_feerate = fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::Background);
@@ -3371,29 +3383,27 @@ impl<Signer: Sign> Channel<Signer> {
                assert!(self.shutdown_scriptpubkey.is_some());
                let tx_weight = self.get_closing_transaction_weight(Some(&self.get_closing_scriptpubkey()), Some(self.counterparty_shutdown_scriptpubkey.as_ref().unwrap()));
                let proposed_total_fee_satoshis = proposed_feerate as u64 * tx_weight / 1000;
+               log_trace!(logger, "Proposing initial closing signed for our counterparty with a feerate of {} sat/kWeight (= {} sats)",
+                       proposed_feerate, proposed_total_fee_satoshis);
 
                let (closing_tx, total_fee_satoshis) = self.build_closing_transaction(proposed_total_fee_satoshis, false);
                let sig = self.holder_signer
                        .sign_closing_transaction(&closing_tx, &self.secp_ctx)
-                       .ok();
-               assert!(closing_tx.get_weight() as u64 <= tx_weight);
-               if sig.is_none() { return None; }
+                       .map_err(|()| ChannelError::Close("Failed to get signature for closing transaction.".to_owned()))?;
 
-               self.last_sent_closing_fee = Some((proposed_feerate, total_fee_satoshis, sig.clone().unwrap()));
-               Some(msgs::ClosingSigned {
+               self.last_sent_closing_fee = Some((proposed_feerate, total_fee_satoshis, sig.clone()));
+               Ok((Some(msgs::ClosingSigned {
                        channel_id: self.channel_id,
                        fee_satoshis: total_fee_satoshis,
-                       signature: sig.unwrap(),
+                       signature: sig,
                        fee_range: None,
-               })
+               }), None))
        }
 
-       pub fn shutdown<F: Deref, K: Deref>(
-               &mut self, fee_estimator: &F, keys_provider: &K, their_features: &InitFeatures, msg: &msgs::Shutdown
-       ) -> Result<(Option<msgs::Shutdown>, Option<msgs::ClosingSigned>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
-       where
-               F::Target: FeeEstimator,
-               K::Target: KeysInterface<Signer = Signer>
+       pub fn shutdown<K: Deref>(
+               &mut self, keys_provider: &K, their_features: &InitFeatures, msg: &msgs::Shutdown
+       ) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
+       where K::Target: KeysInterface<Signer = Signer>
        {
                if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
                        return Err(ChannelError::Close("Peer sent shutdown when we needed a channel_reestablish".to_owned()));
@@ -3481,7 +3491,7 @@ impl<Signer: Sign> Channel<Signer> {
                self.channel_state |= ChannelState::LocalShutdownSent as u32;
                self.update_time_counter += 1;
 
-               Ok((shutdown, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update, dropped_outbound_htlcs))
+               Ok((shutdown, monitor_update, dropped_outbound_htlcs))
        }
 
        fn build_signed_closing_transaction(&self, tx: &mut Transaction, counterparty_sig: &Signature, sig: &Signature) {
@@ -3522,6 +3532,11 @@ impl<Signer: Sign> Channel<Signer> {
                        return Err(ChannelError::Close("Remote tried to send us a closing tx with > 21 million BTC fee".to_owned()));
                }
 
+               if self.channel_state & ChannelState::MonitorUpdateFailed as u32 != 0 {
+                       self.pending_counterparty_closing_signed = Some(msg.clone());
+                       return Ok((None, None));
+               }
+
                let funding_redeemscript = self.get_funding_redeemscript();
                let (mut closing_tx, used_total_fee) = self.build_closing_transaction(msg.fee_satoshis, false);
                if used_total_fee != msg.fee_satoshis {
@@ -5326,6 +5341,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>
                        counterparty_max_commitment_tx_output: Mutex::new((0, 0)),
 
                        last_sent_closing_fee: None,
+                       pending_counterparty_closing_signed: None,
 
                        funding_tx_confirmed_in,
                        funding_tx_confirmation_height,
index a50c072b4cbd07b8740a3c9a69af54c01f44f3b1..f85fe21af02c78d14e4a0ea91692330fe584aa62 100644 (file)
@@ -3372,7 +3372,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" });
                                        }
 
-                                       let (shutdown, closing_signed, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &self.keys_manager, &their_features, &msg), channel_state, chan_entry);
+                                       let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.keys_manager, &their_features, &msg), channel_state, chan_entry);
                                        dropped_htlcs = htlcs;
 
                                        // Update the monitor with the shutdown script if necessary.
@@ -3393,13 +3393,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        msg,
                                                });
                                        }
-                                       if let Some(msg) = closing_signed {
-                                               // TODO: Do not send this if the monitor update failed.
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
-                                                       node_id: *counterparty_node_id,
-                                                       msg,
-                                               });
-                                       }
 
                                        break Ok(());
                                },
@@ -3585,8 +3578,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                if chan.get().get_counterparty_node_id() != *counterparty_node_id {
                                        return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                }
-                               let (revoke_and_ack, commitment_signed, closing_signed, monitor_update) =
-                                       match chan.get_mut().commitment_signed(&msg, &self.fee_estimator, &self.logger) {
+                               let (revoke_and_ack, commitment_signed, monitor_update) =
+                                       match chan.get_mut().commitment_signed(&msg, &self.logger) {
                                                Err((None, e)) => try_chan_entry!(self, Err(e), channel_state, chan),
                                                Err((Some(update), e)) => {
                                                        assert!(chan.get().is_awaiting_monitor_update());
@@ -3598,7 +3591,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                        };
                                if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
                                        return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some());
-                                       //TODO: Rebroadcast closing_signed if present on monitor update restoration
                                }
                                channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
                                        node_id: counterparty_node_id.clone(),
@@ -3617,12 +3609,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                },
                                        });
                                }
-                               if let Some(msg) = closing_signed {
-                                       channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
-                                               node_id: counterparty_node_id.clone(),
-                                               msg,
-                                       });
-                               }
                                Ok(())
                        },
                        hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -3678,12 +3664,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id));
                                        }
                                        let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update();
-                                       let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update, htlcs_to_fail_in) =
-                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan);
+                                       let (commitment_update, pending_forwards, pending_failures, monitor_update, htlcs_to_fail_in) =
+                                               break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan);
                                        htlcs_to_fail = htlcs_to_fail_in;
                                        if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
                                                if was_frozen_for_monitor {
-                                                       assert!(commitment_update.is_none() && closing_signed.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
+                                                       assert!(commitment_update.is_none() && pending_forwards.is_empty() && pending_failures.is_empty());
                                                        break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned()));
                                                } else {
                                                        if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) {
@@ -3697,12 +3683,6 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                        updates,
                                                });
                                        }
-                                       if let Some(msg) = closing_signed {
-                                               channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
-                                                       node_id: counterparty_node_id.clone(),
-                                                       msg,
-                                               });
-                                       }
                                        break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap()))
                                },
                                hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -3946,7 +3926,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                        });
                }
 
-               let has_update = has_monitor_update || !failed_htlcs.is_empty();
+               let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty();
                for (failures, channel_id) in failed_htlcs.drain(..) {
                        self.fail_holding_cell_htlcs(failures, channel_id);
                }
@@ -3958,6 +3938,63 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                has_update
        }
 
+       /// Check whether any channels have finished removing all pending updates after a shutdown
+       /// exchange and can now send a closing_signed.
+       /// Returns whether any closing_signed messages were generated.
+       fn maybe_generate_initial_closing_signed(&self) -> bool {
+               let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
+               let mut has_update = false;
+               {
+                       let mut channel_state_lock = self.channel_state.lock().unwrap();
+                       let channel_state = &mut *channel_state_lock;
+                       let by_id = &mut channel_state.by_id;
+                       let short_to_id = &mut channel_state.short_to_id;
+                       let pending_msg_events = &mut channel_state.pending_msg_events;
+
+                       by_id.retain(|channel_id, chan| {
+                               match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
+                                       Ok((msg_opt, tx_opt)) => {
+                                               if let Some(msg) = msg_opt {
+                                                       has_update = true;
+                                                       pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
+                                                               node_id: chan.get_counterparty_node_id(), msg,
+                                                       });
+                                               }
+                                               if let Some(tx) = tx_opt {
+                                                       // We're done with this channel. We got a closing_signed and sent back
+                                                       // a closing_signed with a closing transaction to broadcast.
+                                                       if let Some(short_id) = chan.get_short_channel_id() {
+                                                               short_to_id.remove(&short_id);
+                                                       }
+
+                                                       if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
+                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       msg: update
+                                                               });
+                                                       }
+
+                                                       log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
+                                                       self.tx_broadcaster.broadcast_transaction(&tx);
+                                                       false
+                                               } else { true }
+                                       },
+                                       Err(e) => {
+                                               has_update = true;
+                                               let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id);
+                                               handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
+                                               !close_channel
+                                       }
+                               }
+                       });
+               }
+
+               for (counterparty_node_id, err) in handle_errors.drain(..) {
+                       let _ = handle_error!(self, err, counterparty_node_id);
+               }
+
+               has_update
+       }
+
        /// Handle a list of channel failures during a block_connected or block_disconnected call,
        /// pushing the channel monitor update (if any) to the background events queue and removing the
        /// Channel object.
@@ -4111,6 +4148,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSend
                        if self.check_free_holding_cells() {
                                result = NotifyOption::DoPersist;
                        }
+                       if self.maybe_generate_initial_closing_signed() {
+                               result = NotifyOption::DoPersist;
+                       }
 
                        let mut pending_events = Vec::new();
                        let mut channel_state = self.channel_state.lock().unwrap();
index 1a20d86fb2b41369957705113eaab762b2b55641..ea1c8993067b7590d6f7e327102ff0d6f85cc9ab 100644 (file)
@@ -1011,19 +1011,19 @@ fn htlc_fail_async_shutdown() {
 
        let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
        assert_eq!(msg_events.len(), 2);
-       let node_0_closing_signed = match msg_events[0] {
+       match msg_events[0] {
+               MessageSendEvent::PaymentFailureNetworkUpdate { update: msgs::HTLCFailChannelUpdate::ChannelUpdateMessage { ref msg }} => {
+                       assert_eq!(msg.contents.short_channel_id, chan_1.0.contents.short_channel_id);
+               },
+               _ => panic!("Unexpected event"),
+       }
+       let node_0_closing_signed = match msg_events[1] {
                MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
                        assert_eq!(*node_id, nodes[1].node.get_our_node_id());
                        (*msg).clone()
                },
                _ => panic!("Unexpected event"),
        };
-       match msg_events[1] {
-               MessageSendEvent::PaymentFailureNetworkUpdate { update: msgs::HTLCFailChannelUpdate::ChannelUpdateMessage { ref msg }} => {
-                       assert_eq!(msg.contents.short_channel_id, chan_1.0.contents.short_channel_id);
-               },
-               _ => panic!("Unexpected event"),
-       }
 
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_closing_signed);
@@ -1140,7 +1140,23 @@ fn do_test_shutdown_rebroadcast(recv_count: u8) {
                let node_1_2nd_reestablish = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id());
 
                nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &node_1_2nd_reestablish);
-               let node_0_3rd_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
+               let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_0_msgs.len(), 2);
+               let node_0_2nd_closing_signed = match node_0_msgs[1] {
+                       MessageSendEvent::SendClosingSigned { ref msg, .. } => {
+                               assert_eq!(node_0_closing_signed, *msg);
+                               msg.clone()
+                       },
+                       _ => panic!(),
+               };
+
+               let node_0_3rd_shutdown = match node_0_msgs[0] {
+                       MessageSendEvent::SendShutdown { ref msg, .. } => {
+                               assert_eq!(node_0_2nd_shutdown, *msg);
+                               msg.clone()
+                       },
+                       _ => panic!(),
+               };
                assert!(node_0_2nd_shutdown == node_0_3rd_shutdown);
 
                nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &node_0_2nd_reestablish);
@@ -1151,8 +1167,6 @@ fn do_test_shutdown_rebroadcast(recv_count: u8) {
                assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
                nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &InitFeatures::known(), &node_1_3rd_shutdown);
-               let node_0_2nd_closing_signed = get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id());
-               assert!(node_0_closing_signed == node_0_2nd_closing_signed);
 
                nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed);
                let (_, node_1_closing_signed) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());
@@ -9023,7 +9037,7 @@ fn test_update_err_monitor_lockdown() {
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
        if let Some(ref mut channel) = nodes[0].node.channel_state.lock().unwrap().by_id.get_mut(&chan_1.2) {
-               if let Ok((_, _, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].fee_estimator, &node_cfgs[0].logger) {
+               if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
                        if let Err(_) =  watchtower.chain_monitor.update_channel(outpoint, update.clone()) {} else { assert!(false); }
                        if let Ok(_) = nodes[0].chain_monitor.update_channel(outpoint, update) {} else { assert!(false); }
                } else { assert!(false); }
@@ -9117,7 +9131,7 @@ fn test_concurrent_monitor_claim() {
        assert_eq!(updates.update_add_htlcs.len(), 1);
        nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]);
        if let Some(ref mut channel) = nodes[0].node.channel_state.lock().unwrap().by_id.get_mut(&chan_1.2) {
-               if let Ok((_, _, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].fee_estimator, &node_cfgs[0].logger) {
+               if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
                        // Watchtower Alice should already have seen the block and reject the update
                        if let Err(_) =  watchtower_alice.chain_monitor.update_channel(outpoint, update.clone()) {} else { assert!(false); }
                        if let Ok(_) = watchtower_bob.chain_monitor.update_channel(outpoint, update.clone()) {} else { assert!(false); }