From: Matt Corallo Date: Mon, 19 Jul 2021 19:57:37 +0000 (+0000) Subject: Send initial closing_signed message asynchronously and handle errs X-Git-Tag: v0.0.100~1^2~6 X-Git-Url: http://git.bitcoin.ninja/index.cgi?p=rust-lightning;a=commitdiff_plain;h=67ddd46aeded274caf44cc7a3f9f61975ebb1dd7 Send initial closing_signed message asynchronously and handle errs When we added the support for external signing, many of the signing functions were allowed to return an error, closing the channel in such a case. `sign_closing_transaction` is one such function which can now return an error, except instead of handling it properly we'd simply never send a `closing_signed` message, hanging the channel until users intervene and force-close it. Piping the channel-closing error back through the various callsites (several of which already have pending results by the time they call `maybe_propose_first_closing_signed`) may be rather complicated, so instead we simply attempt to propose the initial `closing_signed` in `get_and_clear_pending_msg_events` like we do for holding-cell freeing. Further, since we now (possibly) generate a `ChannelMonitorUpdate` on `shutdown`, we may need to wait for monitor updating to complete before we can send a `closing_signed`, meaning we need to handle the send asynchronously anyway. This simplifies a few function interfaces and has no impact on behavior, aside from a few message-ordering edge-cases, as seen in the two small test changes required. --- diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 37bcabfc..ee9a4927 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -142,7 +142,7 @@ fn test_monitor_and_persister_update_fail() { assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); if let Some(ref mut channel) = nodes[0].node.channel_state.lock().unwrap().by_id.get_mut(&chan.2) { - if let Ok((_, _, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].fee_estimator, &node_cfgs[0].logger) { + if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Check that even though the persister is returning a TemporaryFailure, // because the update is bogus, ultimately the error that's returned // should be a PermanentFailure. @@ -2561,8 +2561,8 @@ fn test_reconnect_dup_htlc_claims() { #[test] fn test_temporary_error_during_shutdown() { - // Test that temporary failures when updating the monitor's shutdown script do not prevent - // cooperative close. + // Test that temporary failures when updating the monitor's shutdown script delay cooperative + // close. let mut config = test_default_channel_config(); config.channel_options.commit_upfront_shutdown_pubkey = false; @@ -2575,9 +2575,39 @@ fn test_temporary_error_during_shutdown() { *nodes[0].chain_monitor.update_ret.lock().unwrap() = Some(Err(ChannelMonitorUpdateErr::TemporaryFailure)); *nodes[1].chain_monitor.update_ret.lock().unwrap() = Some(Err(ChannelMonitorUpdateErr::TemporaryFailure)); - close_channel(&nodes[0], &nodes[1], &channel_id, funding_tx, false); - check_added_monitors!(nodes[0], 1); + + nodes[0].node.close_channel(&channel_id).unwrap(); + nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &InitFeatures::known(), &get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id())); check_added_monitors!(nodes[1], 1); + + nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &InitFeatures::known(), &get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id())); + check_added_monitors!(nodes[0], 1); + + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); + + *nodes[0].chain_monitor.update_ret.lock().unwrap() = None; + *nodes[1].chain_monitor.update_ret.lock().unwrap() = None; + + let (outpoint, latest_update) = nodes[0].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[0].node.channel_monitor_updated(&outpoint, latest_update); + nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id())); + + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + + *nodes[1].chain_monitor.update_ret.lock().unwrap() = None; + let (outpoint, latest_update) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&channel_id).unwrap().clone(); + nodes[1].node.channel_monitor_updated(&outpoint, latest_update); + let (_, closing_signed_b) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id()); + nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &closing_signed_b.unwrap()); + let txn_b = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + + let (_, none_a) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id()); + assert!(none_a.is_none()); + let txn_a = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); + + assert_eq!(txn_a, txn_b); + assert_eq!(txn_a.len(), 1); + check_spends!(txn_a[0], funding_tx); } #[test] diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index d47b4e72..373bfef1 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -455,6 +455,11 @@ pub(super) struct Channel { last_sent_closing_fee: Option<(u32, u64, Signature)>, // (feerate, fee, holder_sig) + /// If our counterparty sent us a closing_signed while we were waiting for a `ChannelMonitor` + /// update, we need to delay processing it until later. We do that here by simply storing the + /// closing_signed message and handling it in `maybe_propose_closing_signed`. + pending_counterparty_closing_signed: Option, + /// The hash of the block in which the funding transaction was included. funding_tx_confirmed_in: Option, funding_tx_confirmation_height: u32, @@ -695,6 +700,7 @@ impl Channel { counterparty_max_commitment_tx_output: Mutex::new((channel_value_satoshis * 1000 - push_msat, push_msat)), last_sent_closing_fee: None, + pending_counterparty_closing_signed: None, funding_tx_confirmed_in: None, funding_tx_confirmation_height: 0, @@ -954,6 +960,7 @@ impl Channel { counterparty_max_commitment_tx_output: Mutex::new((msg.push_msat, msg.funding_satoshis * 1000 - msg.push_msat)), last_sent_closing_fee: None, + pending_counterparty_closing_signed: None, funding_tx_confirmed_in: None, funding_tx_confirmation_height: 0, @@ -2373,9 +2380,8 @@ impl Channel { Ok(()) } - pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, fee_estimator: &F, logger: &L) -> Result<(msgs::RevokeAndACK, Option, Option, ChannelMonitorUpdate), (Option, ChannelError)> - where F::Target: FeeEstimator, - L::Target: Logger + pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<(msgs::RevokeAndACK, Option, ChannelMonitorUpdate), (Option, ChannelError)> + where L::Target: Logger { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err((None, ChannelError::Close("Got commitment signed message when channel was not in an operational state".to_owned()))); @@ -2541,12 +2547,10 @@ impl Channel { } log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id)); - // TODO: Call maybe_propose_first_closing_signed on restoration (or call it here and - // re-send the message on restoration) return Err((Some(monitor_update), ChannelError::Ignore("Previous monitor update failure prevented generation of RAA".to_owned()))); } - let (commitment_signed, closing_signed) = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { + let commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok - // we'll send one right away when we get the revoke_and_ack when we // free_holding_cell_htlcs(). @@ -2555,10 +2559,8 @@ impl Channel { // strictly increasing by one, so decrement it here. self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - (Some(msg), None) - } else if !need_commitment { - (None, self.maybe_propose_first_closing_signed(fee_estimator)) - } else { (None, None) }; + Some(msg) + } else { None }; log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.", log_bytes!(self.channel_id()), if commitment_signed.is_some() { " our own commitment_signed and" } else { "" }); @@ -2567,7 +2569,7 @@ impl Channel { channel_id: self.channel_id, per_commitment_secret, next_per_commitment_point, - }, commitment_signed, closing_signed, monitor_update)) + }, commitment_signed, monitor_update)) } /// Public version of the below, checking relevant preconditions first. @@ -2704,9 +2706,8 @@ impl Channel { /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail, /// generating an appropriate error *after* the channel state has been updated based on the /// revoke_and_ack message. - pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, fee_estimator: &F, logger: &L) -> Result<(Option, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError> - where F::Target: FeeEstimator, - L::Target: Logger, + pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Option, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError> + where L::Target: Logger, { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err(ChannelError::Close("Got revoke/ACK message when channel was not in an operational state".to_owned())); @@ -2887,7 +2888,7 @@ impl Channel { self.monitor_pending_forwards.append(&mut to_forward_infos); self.monitor_pending_failures.append(&mut revoked_htlcs); log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id())); - return Ok((None, Vec::new(), Vec::new(), None, monitor_update, Vec::new())) + return Ok((None, Vec::new(), Vec::new(), monitor_update, Vec::new())) } match self.free_holding_cell_htlcs(logger)? { @@ -2906,7 +2907,7 @@ impl Channel { self.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail)) + Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail)) }, (None, htlcs_to_fail) => { if require_commitment { @@ -2926,14 +2927,13 @@ impl Channel { update_fail_malformed_htlcs, update_fee: None, commitment_signed - }), to_forward_infos, revoked_htlcs, None, monitor_update, htlcs_to_fail)) + }), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail)) } else { log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id())); - Ok((None, to_forward_infos, revoked_htlcs, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update, htlcs_to_fail)) + Ok((None, to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail)) } } } - } /// Adds a pending update to this channel. See the doc for send_htlc for @@ -2988,6 +2988,7 @@ impl Channel { // Upon reconnect we have to start the closing_signed dance over, but shutdown messages // will be retransmitted. self.last_sent_closing_fee = None; + self.pending_counterparty_closing_signed = None; let mut inbound_drop_count = 0; self.pending_inbound_htlcs.retain(|htlc| { @@ -3355,13 +3356,24 @@ impl Channel { } } - fn maybe_propose_first_closing_signed(&mut self, fee_estimator: &F) -> Option - where F::Target: FeeEstimator + pub fn maybe_propose_closing_signed(&mut self, fee_estimator: &F, logger: &L) + -> Result<(Option, Option), ChannelError> + where F::Target: FeeEstimator, L::Target: Logger { - if !self.is_outbound() || !self.pending_inbound_htlcs.is_empty() || !self.pending_outbound_htlcs.is_empty() || - self.channel_state & (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::AwaitingRemoteRevoke as u32) != BOTH_SIDES_SHUTDOWN_MASK || + if !self.pending_inbound_htlcs.is_empty() || !self.pending_outbound_htlcs.is_empty() || + self.channel_state & + (BOTH_SIDES_SHUTDOWN_MASK | ChannelState::AwaitingRemoteRevoke as u32 | + ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateFailed as u32) + != BOTH_SIDES_SHUTDOWN_MASK || self.last_sent_closing_fee.is_some() || self.pending_update_fee.is_some() { - return None; + return Ok((None, None)); + } + + if !self.is_outbound() { + if let Some(msg) = &self.pending_counterparty_closing_signed.take() { + return self.closing_signed(fee_estimator, &msg); + } + return Ok((None, None)); } let mut proposed_feerate = fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::Background); @@ -3371,29 +3383,27 @@ impl Channel { assert!(self.shutdown_scriptpubkey.is_some()); let tx_weight = self.get_closing_transaction_weight(Some(&self.get_closing_scriptpubkey()), Some(self.counterparty_shutdown_scriptpubkey.as_ref().unwrap())); let proposed_total_fee_satoshis = proposed_feerate as u64 * tx_weight / 1000; + log_trace!(logger, "Proposing initial closing signed for our counterparty with a feerate of {} sat/kWeight (= {} sats)", + proposed_feerate, proposed_total_fee_satoshis); let (closing_tx, total_fee_satoshis) = self.build_closing_transaction(proposed_total_fee_satoshis, false); let sig = self.holder_signer .sign_closing_transaction(&closing_tx, &self.secp_ctx) - .ok(); - assert!(closing_tx.get_weight() as u64 <= tx_weight); - if sig.is_none() { return None; } + .map_err(|()| ChannelError::Close("Failed to get signature for closing transaction.".to_owned()))?; - self.last_sent_closing_fee = Some((proposed_feerate, total_fee_satoshis, sig.clone().unwrap())); - Some(msgs::ClosingSigned { + self.last_sent_closing_fee = Some((proposed_feerate, total_fee_satoshis, sig.clone())); + Ok((Some(msgs::ClosingSigned { channel_id: self.channel_id, fee_satoshis: total_fee_satoshis, - signature: sig.unwrap(), + signature: sig, fee_range: None, - }) + }), None)) } - pub fn shutdown( - &mut self, fee_estimator: &F, keys_provider: &K, their_features: &InitFeatures, msg: &msgs::Shutdown - ) -> Result<(Option, Option, Option, Vec<(HTLCSource, PaymentHash)>), ChannelError> - where - F::Target: FeeEstimator, - K::Target: KeysInterface + pub fn shutdown( + &mut self, keys_provider: &K, their_features: &InitFeatures, msg: &msgs::Shutdown + ) -> Result<(Option, Option, Vec<(HTLCSource, PaymentHash)>), ChannelError> + where K::Target: KeysInterface { if self.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 { return Err(ChannelError::Close("Peer sent shutdown when we needed a channel_reestablish".to_owned())); @@ -3481,7 +3491,7 @@ impl Channel { self.channel_state |= ChannelState::LocalShutdownSent as u32; self.update_time_counter += 1; - Ok((shutdown, self.maybe_propose_first_closing_signed(fee_estimator), monitor_update, dropped_outbound_htlcs)) + Ok((shutdown, monitor_update, dropped_outbound_htlcs)) } fn build_signed_closing_transaction(&self, tx: &mut Transaction, counterparty_sig: &Signature, sig: &Signature) { @@ -3522,6 +3532,11 @@ impl Channel { return Err(ChannelError::Close("Remote tried to send us a closing tx with > 21 million BTC fee".to_owned())); } + if self.channel_state & ChannelState::MonitorUpdateFailed as u32 != 0 { + self.pending_counterparty_closing_signed = Some(msg.clone()); + return Ok((None, None)); + } + let funding_redeemscript = self.get_funding_redeemscript(); let (mut closing_tx, used_total_fee) = self.build_closing_transaction(msg.fee_satoshis, false); if used_total_fee != msg.fee_satoshis { @@ -5326,6 +5341,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel counterparty_max_commitment_tx_output: Mutex::new((0, 0)), last_sent_closing_fee: None, + pending_counterparty_closing_signed: None, funding_tx_confirmed_in, funding_tx_confirmation_height, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index a50c072b..f85fe21a 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3372,7 +3372,7 @@ impl ChannelMana if chan_entry.get().sent_shutdown() { " after we initiated shutdown" } else { "" }); } - let (shutdown, closing_signed, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.fee_estimator, &self.keys_manager, &their_features, &msg), channel_state, chan_entry); + let (shutdown, monitor_update, htlcs) = try_chan_entry!(self, chan_entry.get_mut().shutdown(&self.keys_manager, &their_features, &msg), channel_state, chan_entry); dropped_htlcs = htlcs; // Update the monitor with the shutdown script if necessary. @@ -3393,13 +3393,6 @@ impl ChannelMana msg, }); } - if let Some(msg) = closing_signed { - // TODO: Do not send this if the monitor update failed. - channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: *counterparty_node_id, - msg, - }); - } break Ok(()); }, @@ -3585,8 +3578,8 @@ impl ChannelMana if chan.get().get_counterparty_node_id() != *counterparty_node_id { return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } - let (revoke_and_ack, commitment_signed, closing_signed, monitor_update) = - match chan.get_mut().commitment_signed(&msg, &self.fee_estimator, &self.logger) { + let (revoke_and_ack, commitment_signed, monitor_update) = + match chan.get_mut().commitment_signed(&msg, &self.logger) { Err((None, e)) => try_chan_entry!(self, Err(e), channel_state, chan), Err((Some(update), e)) => { assert!(chan.get().is_awaiting_monitor_update()); @@ -3598,7 +3591,6 @@ impl ChannelMana }; if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { return_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::RevokeAndACKFirst, true, commitment_signed.is_some()); - //TODO: Rebroadcast closing_signed if present on monitor update restoration } channel_state.pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK { node_id: counterparty_node_id.clone(), @@ -3617,12 +3609,6 @@ impl ChannelMana }, }); } - if let Some(msg) = closing_signed { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: counterparty_node_id.clone(), - msg, - }); - } Ok(()) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) @@ -3678,12 +3664,12 @@ impl ChannelMana break Err(MsgHandleErrInternal::send_err_msg_no_close("Got a message for a channel from the wrong node!".to_owned(), msg.channel_id)); } let was_frozen_for_monitor = chan.get().is_awaiting_monitor_update(); - let (commitment_update, pending_forwards, pending_failures, closing_signed, monitor_update, htlcs_to_fail_in) = - break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), channel_state, chan); + let (commitment_update, pending_forwards, pending_failures, monitor_update, htlcs_to_fail_in) = + break_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), channel_state, chan); htlcs_to_fail = htlcs_to_fail_in; if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) { if was_frozen_for_monitor { - assert!(commitment_update.is_none() && closing_signed.is_none() && pending_forwards.is_empty() && pending_failures.is_empty()); + assert!(commitment_update.is_none() && pending_forwards.is_empty() && pending_failures.is_empty()); break Err(MsgHandleErrInternal::ignore_no_close("Previous monitor update failure prevented responses to RAA".to_owned())); } else { if let Err(e) = handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, commitment_update.is_some(), pending_forwards, pending_failures) { @@ -3697,12 +3683,6 @@ impl ChannelMana updates, }); } - if let Some(msg) = closing_signed { - channel_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { - node_id: counterparty_node_id.clone(), - msg, - }); - } break Ok((pending_forwards, pending_failures, chan.get().get_short_channel_id().expect("RAA should only work on a short-id-available channel"), chan.get().get_funding_txo().unwrap())) }, hash_map::Entry::Vacant(_) => break Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id)) @@ -3946,7 +3926,7 @@ impl ChannelMana }); } - let has_update = has_monitor_update || !failed_htlcs.is_empty(); + let has_update = has_monitor_update || !failed_htlcs.is_empty() || !handle_errors.is_empty(); for (failures, channel_id) in failed_htlcs.drain(..) { self.fail_holding_cell_htlcs(failures, channel_id); } @@ -3958,6 +3938,63 @@ impl ChannelMana has_update } + /// Check whether any channels have finished removing all pending updates after a shutdown + /// exchange and can now send a closing_signed. + /// Returns whether any closing_signed messages were generated. + fn maybe_generate_initial_closing_signed(&self) -> bool { + let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new(); + let mut has_update = false; + { + let mut channel_state_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_state_lock; + let by_id = &mut channel_state.by_id; + let short_to_id = &mut channel_state.short_to_id; + let pending_msg_events = &mut channel_state.pending_msg_events; + + by_id.retain(|channel_id, chan| { + match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) { + Ok((msg_opt, tx_opt)) => { + if let Some(msg) = msg_opt { + has_update = true; + pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { + node_id: chan.get_counterparty_node_id(), msg, + }); + } + if let Some(tx) = tx_opt { + // We're done with this channel. We got a closing_signed and sent back + // a closing_signed with a closing transaction to broadcast. + if let Some(short_id) = chan.get_short_channel_id() { + short_to_id.remove(&short_id); + } + + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + + log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); + self.tx_broadcaster.broadcast_transaction(&tx); + false + } else { true } + }, + Err(e) => { + has_update = true; + let (close_channel, res) = convert_chan_err!(self, e, short_to_id, chan, channel_id); + handle_errors.push((chan.get_counterparty_node_id(), Err(res))); + !close_channel + } + } + }); + } + + for (counterparty_node_id, err) in handle_errors.drain(..) { + let _ = handle_error!(self, err, counterparty_node_id); + } + + has_update + } + /// Handle a list of channel failures during a block_connected or block_disconnected call, /// pushing the channel monitor update (if any) to the background events queue and removing the /// Channel object. @@ -4111,6 +4148,9 @@ impl MessageSend if self.check_free_holding_cells() { result = NotifyOption::DoPersist; } + if self.maybe_generate_initial_closing_signed() { + result = NotifyOption::DoPersist; + } let mut pending_events = Vec::new(); let mut channel_state = self.channel_state.lock().unwrap(); diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 1a20d86f..ea1c8993 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -1011,19 +1011,19 @@ fn htlc_fail_async_shutdown() { let msg_events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(msg_events.len(), 2); - let node_0_closing_signed = match msg_events[0] { + match msg_events[0] { + MessageSendEvent::PaymentFailureNetworkUpdate { update: msgs::HTLCFailChannelUpdate::ChannelUpdateMessage { ref msg }} => { + assert_eq!(msg.contents.short_channel_id, chan_1.0.contents.short_channel_id); + }, + _ => panic!("Unexpected event"), + } + let node_0_closing_signed = match msg_events[1] { MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { assert_eq!(*node_id, nodes[1].node.get_our_node_id()); (*msg).clone() }, _ => panic!("Unexpected event"), }; - match msg_events[1] { - MessageSendEvent::PaymentFailureNetworkUpdate { update: msgs::HTLCFailChannelUpdate::ChannelUpdateMessage { ref msg }} => { - assert_eq!(msg.contents.short_channel_id, chan_1.0.contents.short_channel_id); - }, - _ => panic!("Unexpected event"), - } assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_closing_signed); @@ -1140,7 +1140,23 @@ fn do_test_shutdown_rebroadcast(recv_count: u8) { let node_1_2nd_reestablish = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReestablish, nodes[0].node.get_our_node_id()); nodes[0].node.handle_channel_reestablish(&nodes[1].node.get_our_node_id(), &node_1_2nd_reestablish); - let node_0_3rd_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id()); + let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(node_0_msgs.len(), 2); + let node_0_2nd_closing_signed = match node_0_msgs[1] { + MessageSendEvent::SendClosingSigned { ref msg, .. } => { + assert_eq!(node_0_closing_signed, *msg); + msg.clone() + }, + _ => panic!(), + }; + + let node_0_3rd_shutdown = match node_0_msgs[0] { + MessageSendEvent::SendShutdown { ref msg, .. } => { + assert_eq!(node_0_2nd_shutdown, *msg); + msg.clone() + }, + _ => panic!(), + }; assert!(node_0_2nd_shutdown == node_0_3rd_shutdown); nodes[1].node.handle_channel_reestablish(&nodes[0].node.get_our_node_id(), &node_0_2nd_reestablish); @@ -1151,8 +1167,6 @@ fn do_test_shutdown_rebroadcast(recv_count: u8) { assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &InitFeatures::known(), &node_1_3rd_shutdown); - let node_0_2nd_closing_signed = get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id()); - assert!(node_0_closing_signed == node_0_2nd_closing_signed); nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed); let (_, node_1_closing_signed) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id()); @@ -9023,7 +9037,7 @@ fn test_update_err_monitor_lockdown() { assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); if let Some(ref mut channel) = nodes[0].node.channel_state.lock().unwrap().by_id.get_mut(&chan_1.2) { - if let Ok((_, _, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].fee_estimator, &node_cfgs[0].logger) { + if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { if let Err(_) = watchtower.chain_monitor.update_channel(outpoint, update.clone()) {} else { assert!(false); } if let Ok(_) = nodes[0].chain_monitor.update_channel(outpoint, update) {} else { assert!(false); } } else { assert!(false); } @@ -9117,7 +9131,7 @@ fn test_concurrent_monitor_claim() { assert_eq!(updates.update_add_htlcs.len(), 1); nodes[0].node.handle_update_add_htlc(&nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); if let Some(ref mut channel) = nodes[0].node.channel_state.lock().unwrap().by_id.get_mut(&chan_1.2) { - if let Ok((_, _, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].fee_estimator, &node_cfgs[0].logger) { + if let Ok((_, _, update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) { // Watchtower Alice should already have seen the block and reject the update if let Err(_) = watchtower_alice.chain_monitor.update_channel(outpoint, update.clone()) {} else { assert!(false); } if let Ok(_) = watchtower_bob.chain_monitor.update_channel(outpoint, update.clone()) {} else { assert!(false); }