X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=392f3e1cb052cf196b50b2b7e3734b62ab167e4c;hb=4ab6c551a00ce167abfad2ea97310e8043e08375;hp=747a78e8c918309bb1580c5fee146e2ad884911e;hpb=971f7a7e42652d5697d761df6650ae9e6dcaa5db;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 747a78e8..392f3e1c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -64,7 +64,7 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe use crate::util::logger::{Level, Logger}; use crate::util::errors::APIError; -use alloc::collections::BTreeMap; +use alloc::collections::{btree_map, BTreeMap}; use crate::io; use crate::prelude::*; @@ -1201,6 +1201,12 @@ where /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the /// Notifier the lock contains sends out a notification when the lock is released. total_consistency_lock: RwLock<()>, + /// Tracks the progress of channels going through batch funding by whether funding_signed was + /// received and the monitor has been persisted. + /// + /// This information does not need to be persisted as funding nodes can forget + /// unfunded channels upon disconnection. + funding_batch_states: Mutex>>, background_events_processed_since_startup: AtomicBool, @@ -1788,7 +1794,7 @@ macro_rules! handle_error { let mut msg_events = Vec::with_capacity(2); if let Some((shutdown_res, update_option)) = shutdown_finish { - $self.finish_force_close_channel(shutdown_res); + $self.finish_close_channel(shutdown_res); if let Some(update) = update_option { msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update @@ -2025,9 +2031,54 @@ macro_rules! handle_monitor_update_completion { } let channel_id = $chan.context.channel_id(); + let unbroadcasted_batch_funding_txid = $chan.context.unbroadcasted_batch_funding_txid(); core::mem::drop($peer_state_lock); core::mem::drop($per_peer_state_lock); + // If the channel belongs to a batch funding transaction, the progress of the batch + // should be updated as we have received funding_signed and persisted the monitor. + if let Some(txid) = unbroadcasted_batch_funding_txid { + let mut funding_batch_states = $self.funding_batch_states.lock().unwrap(); + let mut batch_completed = false; + if let Some(batch_state) = funding_batch_states.get_mut(&txid) { + let channel_state = batch_state.iter_mut().find(|(chan_id, pubkey, _)| ( + *chan_id == channel_id && + *pubkey == counterparty_node_id + )); + if let Some(channel_state) = channel_state { + channel_state.2 = true; + } else { + debug_assert!(false, "Missing channel batch state for channel which completed initial monitor update"); + } + batch_completed = batch_state.iter().all(|(_, _, completed)| *completed); + } else { + debug_assert!(false, "Missing batch state for channel which completed initial monitor update"); + } + + // When all channels in a batched funding transaction have become ready, it is not necessary + // to track the progress of the batch anymore and the state of the channels can be updated. + if batch_completed { + let removed_batch_state = funding_batch_states.remove(&txid).into_iter().flatten(); + let per_peer_state = $self.per_peer_state.read().unwrap(); + let mut batch_funding_tx = None; + for (channel_id, counterparty_node_id, _) in removed_batch_state { + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state = peer_state_mutex.lock().unwrap(); + if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) { + batch_funding_tx = batch_funding_tx.or_else(|| chan.context.unbroadcasted_funding()); + chan.set_batch_ready(); + let mut pending_events = $self.pending_events.lock().unwrap(); + emit_channel_pending_event!(pending_events, chan); + } + } + } + if let Some(tx) = batch_funding_tx { + log_info!($self.logger, "Broadcasting batch funding transaction with txid {}", tx.txid()); + $self.tx_broadcaster.broadcast_transactions(&[&tx]); + } + } + } + $self.handle_monitor_update_completion_actions(update_actions); if let Some(forwards) = htlc_forwards { @@ -2045,6 +2096,11 @@ macro_rules! handle_new_monitor_update { ($self: ident, $update_res: expr, $chan: expr, _internal, $completed: expr) => { { debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); match $update_res { + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!($self.logger, "{}", err_str); + panic!("{}", err_str); + }, ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", &$chan.context.channel_id()); @@ -2225,9 +2281,9 @@ where pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), background_events_processed_since_startup: AtomicBool::new(false), - event_persist_notifier: Notifier::new(), needs_persist_flag: AtomicBool::new(false), + funding_batch_states: Mutex::new(BTreeMap::new()), entropy_source, node_signer, @@ -2492,6 +2548,7 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; + let mut shutdown_result = None; loop { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -2506,6 +2563,7 @@ where if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { let funding_txo_opt = chan.context.get_funding_txo(); let their_features = &peer_state.latest_features; + let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid(); let (shutdown_msg, mut monitor_update_opt, htlcs) = chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; failed_htlcs = htlcs; @@ -2536,6 +2594,7 @@ where }); } self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed); + shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid)); } } break; @@ -2557,6 +2616,10 @@ where self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver); } + if let Some(shutdown_result) = shutdown_result { + self.finish_close_channel(shutdown_result); + } + Ok(()) } @@ -2621,9 +2684,14 @@ where self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } - #[inline] - fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) { - let (monitor_update_option, mut failed_htlcs) = shutdown_res; + fn finish_close_channel(&self, shutdown_res: ShutdownResult) { + debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread); + #[cfg(debug_assertions)] + for (_, peer) in self.per_peer_state.read().unwrap().iter() { + debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread); + } + + let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res; log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len()); for htlc_source in failed_htlcs.drain(..) { let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source; @@ -2638,6 +2706,31 @@ where // ignore the result here. let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update); } + let mut shutdown_results = Vec::new(); + if let Some(txid) = unbroadcasted_batch_funding_txid { + let mut funding_batch_states = self.funding_batch_states.lock().unwrap(); + let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten(); + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut has_uncompleted_channel = None; + for (channel_id, counterparty_node_id, state) in affected_channels { + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state = peer_state_mutex.lock().unwrap(); + if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) { + update_maps_on_chan_removal!(self, &chan.context()); + self.issue_channel_close_events(&chan.context(), ClosureReason::FundingBatchClosure); + shutdown_results.push(chan.context_mut().force_shutdown(false)); + } + } + has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state)); + } + debug_assert!( + has_uncompleted_channel.unwrap_or(true), + "Closing a batch where all channels have completed initial monitor update", + ); + } + for shutdown_result in shutdown_results.drain(..) { + self.finish_close_channel(shutdown_result); + } } /// `peer_msg` should be set when we receive a message from a peer, but not set when the @@ -2648,8 +2741,7 @@ where let peer_state_mutex = per_peer_state.get(peer_node_id) .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?; let (update_opt, counterparty_node_id) = { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; + let mut peer_state = peer_state_mutex.lock().unwrap(); let closure_reason = if let Some(peer_msg) = peer_msg { ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) } } else { @@ -2659,13 +2751,15 @@ where log_error!(self.logger, "Force-closing channel {}", channel_id); self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason); let mut chan_phase = remove_channel_phase!(self, chan_phase_entry); + mem::drop(peer_state); + mem::drop(per_peer_state); match chan_phase { ChannelPhase::Funded(mut chan) => { - self.finish_force_close_channel(chan.context.force_shutdown(broadcast)); + self.finish_close_channel(chan.context.force_shutdown(broadcast)); (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id()) }, ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => { - self.finish_force_close_channel(chan_phase.context_mut().force_shutdown(false)); + self.finish_close_channel(chan_phase.context_mut().force_shutdown(false)); // Unfunded channel has no update (None, chan_phase.context().get_counterparty_node_id()) }, @@ -2681,10 +2775,17 @@ where } }; if let Some(update) = update_opt { - let mut peer_state = peer_state_mutex.lock().unwrap(); - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if + // not try to broadcast it via whatever peer we have. + let per_peer_state = self.per_peer_state.read().unwrap(); + let a_peer_state_opt = per_peer_state.get(peer_node_id) + .ok_or(per_peer_state.values().next()); + if let Ok(a_peer_state_mutex) = a_peer_state_opt { + let mut a_peer_state = a_peer_state_mutex.lock().unwrap(); + a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } } Ok(counterparty_node_id) @@ -3519,13 +3620,13 @@ where /// /// See [`ChannelManager::send_preflight_probes`] for more information. pub fn send_spontaneous_preflight_probes( - &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32, + &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32, liquidity_limit_multiplier: Option, ) -> Result, ProbeSendFailure> { let payment_params = PaymentParameters::from_node_id(node_id, final_cltv_expiry_delta); - let route_params = RouteParameters { payment_params, final_value_msat: amount_msat }; + let route_params = RouteParameters::from_payment_params_and_value(payment_params, amount_msat); self.send_preflight_probes(route_params, liquidity_limit_multiplier) } @@ -3626,8 +3727,9 @@ where /// Handles the generation of a funding transaction, optionally (for tests) with a function /// which checks the correctness of the funding transaction given the associated channel. - fn funding_transaction_generated_intern, &Transaction) -> Result>( - &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput + fn funding_transaction_generated_intern, &Transaction) -> Result>( + &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batch_funding: bool, + mut find_funding_output: FundingOutput, ) -> Result<(), APIError> { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -3639,7 +3741,7 @@ where Some(ChannelPhase::UnfundedOutboundV1(chan)) => { let funding_txo = find_funding_output(&chan, &funding_transaction)?; - let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger) + let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batch_funding, &self.logger) .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e { let channel_id = chan.context.channel_id(); let user_id = chan.context.get_user_id(); @@ -3695,7 +3797,7 @@ where #[cfg(test)] pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> { - self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| { + self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| { Ok(OutPoint { txid: tx.txid(), index: output_index }) }) } @@ -3731,17 +3833,37 @@ where /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> { + self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction) + } + + /// Call this upon creation of a batch funding transaction for the given channels. + /// + /// Return values are identical to [`Self::funding_transaction_generated`], respective to + /// each individual channel and transaction output. + /// + /// Do NOT broadcast the funding transaction yourself. This batch funding transcaction + /// will only be broadcast when we have safely received and persisted the counterparty's + /// signature for each channel. + /// + /// If there is an error, all channels in the batch are to be considered closed. + pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&ChannelId, &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut result = Ok(()); if !funding_transaction.is_coin_base() { for inp in funding_transaction.input.iter() { if inp.witness.is_empty() { - return Err(APIError::APIMisuseError { + result = result.and(Err(APIError::APIMisuseError { err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned() - }); + })); } } } + if funding_transaction.output.len() > u16::max_value() as usize { + result = result.and(Err(APIError::APIMisuseError { + err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() + })); + } { let height = self.best_block.read().unwrap().height(); // Transactions are evaluated as final by network mempools if their locktime is strictly @@ -3749,37 +3871,93 @@ where // node might not have perfect sync about their blockchain views. Thus, if the wallet // module is ahead of LDK, only allow one more block of headroom. if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 { - return Err(APIError::APIMisuseError { + result = result.and(Err(APIError::APIMisuseError { err: "Funding transaction absolute timelock is non-final".to_owned() - }); + })); } } - self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| { - if tx.output.len() > u16::max_value() as usize { - return Err(APIError::APIMisuseError { - err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() - }); - } - let mut output_index = None; - let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh(); - for (idx, outp) in tx.output.iter().enumerate() { - if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() { - if output_index.is_some() { + let txid = funding_transaction.txid(); + let is_batch_funding = temporary_channels.len() > 1; + let mut funding_batch_states = if is_batch_funding { + Some(self.funding_batch_states.lock().unwrap()) + } else { + None + }; + let mut funding_batch_state = funding_batch_states.as_mut().and_then(|states| { + match states.entry(txid) { + btree_map::Entry::Occupied(_) => { + result = result.clone().and(Err(APIError::APIMisuseError { + err: "Batch funding transaction with the same txid already exists".to_owned() + })); + None + }, + btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())), + } + }); + for (channel_idx, &(temporary_channel_id, counterparty_node_id)) in temporary_channels.iter().enumerate() { + result = result.and_then(|_| self.funding_transaction_generated_intern( + temporary_channel_id, + counterparty_node_id, + funding_transaction.clone(), + is_batch_funding, + |chan, tx| { + let mut output_index = None; + let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh(); + for (idx, outp) in tx.output.iter().enumerate() { + if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() { + if output_index.is_some() { + return Err(APIError::APIMisuseError { + err: "Multiple outputs matched the expected script and value".to_owned() + }); + } + output_index = Some(idx as u16); + } + } + if output_index.is_none() { return Err(APIError::APIMisuseError { - err: "Multiple outputs matched the expected script and value".to_owned() + err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned() }); } - output_index = Some(idx as u16); + let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() }; + if let Some(funding_batch_state) = funding_batch_state.as_mut() { + funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false)); + } + Ok(outpoint) + }) + ); + } + if let Err(ref e) = result { + // Remaining channels need to be removed on any error. + let e = format!("Error in transaction funding: {:?}", e); + let mut channels_to_remove = Vec::new(); + channels_to_remove.extend(funding_batch_states.as_mut() + .and_then(|states| states.remove(&txid)) + .into_iter().flatten() + .map(|(chan_id, node_id, _state)| (chan_id, node_id)) + ); + channels_to_remove.extend(temporary_channels.iter() + .map(|(&chan_id, &node_id)| (chan_id, node_id)) + ); + let mut shutdown_results = Vec::new(); + { + let per_peer_state = self.per_peer_state.read().unwrap(); + for (channel_id, counterparty_node_id) in channels_to_remove { + per_peer_state.get(&counterparty_node_id) + .map(|peer_state_mutex| peer_state_mutex.lock().unwrap()) + .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id)) + .map(|mut chan| { + update_maps_on_chan_removal!(self, &chan.context()); + self.issue_channel_close_events(&chan.context(), ClosureReason::ProcessingError { err: e.clone() }); + shutdown_results.push(chan.context_mut().force_shutdown(false)); + }); } } - if output_index.is_none() { - return Err(APIError::APIMisuseError { - err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned() - }); + for shutdown_result in shutdown_results.drain(..) { + self.finish_close_channel(shutdown_result); } - Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() }) - }) + } + result } /// Atomically applies partial updates to the [`ChannelConfig`] of the given channels. @@ -4622,8 +4800,9 @@ where let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new(); let mut timed_out_mpp_htlcs = Vec::new(); let mut pending_peers_awaiting_removal = Vec::new(); + let mut shutdown_channels = Vec::new(); - let process_unfunded_channel_tick = | + let mut process_unfunded_channel_tick = | chan_id: &ChannelId, context: &mut ChannelContext, unfunded_context: &mut UnfundedChannelContext, @@ -4636,7 +4815,7 @@ where "Force-closing pending channel with ID {} for not establishing in a timely manner", chan_id); update_maps_on_chan_removal!(self, &context); self.issue_channel_close_events(&context, ClosureReason::HolderForceClosed); - self.finish_force_close_channel(context.force_shutdown(false)); + shutdown_channels.push(context.force_shutdown(false)); pending_msg_events.push(MessageSendEvent::HandleError { node_id: counterparty_node_id, action: msgs::ErrorAction::SendErrorMessage { @@ -4829,6 +5008,10 @@ where let _ = handle_error!(self, err, counterparty_node_id); } + for shutdown_res in shutdown_channels { + self.finish_close_channel(shutdown_res); + } + self.pending_outbound_payments.remove_stale_payments(&self.pending_events); // Technically we don't need to do this here, but if we have holding cell entries in a @@ -4985,6 +5168,7 @@ where // This ensures that future code doesn't introduce a lock-order requirement for // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling // this function with any `per_peer_state` peer lock acquired would. + #[cfg(debug_assertions)] for (_, peer) in self.per_peer_state.read().unwrap().iter() { debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread); } @@ -5992,7 +6176,8 @@ where } fn internal_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) -> Result<(), MsgHandleErrInternal> { - let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)>; + let mut dropped_htlcs: Vec<(HTLCSource, PaymentHash)> = Vec::new(); + let mut finish_shutdown = None; { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -6037,8 +6222,7 @@ where log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id); self.issue_channel_close_events(&context, ClosureReason::CounterpartyCoopClosedUnfundedChannel); let mut chan = remove_channel_phase!(self, chan_phase_entry); - self.finish_force_close_channel(chan.context_mut().force_shutdown(false)); - return Ok(()); + finish_shutdown = Some(chan.context_mut().force_shutdown(false)); }, } } else { @@ -6050,11 +6234,16 @@ where let reason = HTLCFailReason::from_failure_code(0x4000 | 8); self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver); } + if let Some(shutdown_res) = finish_shutdown { + self.finish_close_channel(shutdown_res); + } Ok(()) } fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> { + let mut shutdown_result = None; + let unbroadcasted_batch_funding_txid; let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6067,6 +6256,7 @@ where match peer_state.channel_by_id.entry(msg.channel_id.clone()) { hash_map::Entry::Occupied(mut chan_phase_entry) => { if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { + unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid(); let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry); if let Some(msg) = closing_signed { peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned { @@ -6103,6 +6293,11 @@ where }); } self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure); + shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid)); + } + mem::drop(per_peer_state); + if let Some(shutdown_result) = shutdown_result { + self.finish_close_channel(shutdown_result); } Ok(()) } @@ -6561,7 +6756,7 @@ where if were_node_one == msg_from_node_one { return Ok(NotifyOption::SkipPersistNoEvents); } else { - log_debug!(self.logger, "Received channel_update for channel {}.", chan_id); + log_debug!(self.logger, "Received channel_update {:?} for channel {}.", msg, chan_id); try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry); } } else { @@ -6707,7 +6902,7 @@ where } for failure in failed_channels.drain(..) { - self.finish_force_close_channel(failure); + self.finish_close_channel(failure); } has_pending_monitor_events @@ -6777,6 +6972,8 @@ where fn maybe_generate_initial_closing_signed(&self) -> bool { let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new(); let mut has_update = false; + let mut shutdown_result = None; + let mut unbroadcasted_batch_funding_txid = None; { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6787,6 +6984,7 @@ where peer_state.channel_by_id.retain(|channel_id, phase| { match phase { ChannelPhase::Funded(chan) => { + unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid(); match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) { Ok((msg_opt, tx_opt)) => { if let Some(msg) = msg_opt { @@ -6809,6 +7007,7 @@ where log_info!(self.logger, "Broadcasting {}", log_tx!(tx)); self.tx_broadcaster.broadcast_transactions(&[&tx]); update_maps_on_chan_removal!(self, &chan.context); + shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid)); false } else { true } }, @@ -6830,6 +7029,10 @@ where let _ = handle_error!(self, err, counterparty_node_id); } + if let Some(shutdown_result) = shutdown_result { + self.finish_close_channel(shutdown_result); + } + has_update } @@ -6855,7 +7058,7 @@ where counterparty_node_id, funding_txo, update }); } - self.finish_force_close_channel(failure); + self.finish_close_channel(failure); } } @@ -7795,7 +7998,6 @@ where fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { let _persistence_guard = PersistenceNotifierGuard::optionally_notify( self, || NotifyOption::SkipPersistHandleEvents); - let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { @@ -7808,24 +8010,24 @@ where peer_state.channel_by_id.retain(|_, phase| { let context = match phase { ChannelPhase::Funded(chan) => { - chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger); - // We only retain funded channels that are not shutdown. - if !chan.is_shutdown() { + if chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger).is_ok() { + // We only retain funded channels that are not shutdown. return true; } - &chan.context + &mut chan.context }, // Unfunded channels will always be removed. ChannelPhase::UnfundedOutboundV1(chan) => { - &chan.context + &mut chan.context }, ChannelPhase::UnfundedInboundV1(chan) => { - &chan.context + &mut chan.context }, }; // Clean up for removal. update_maps_on_chan_removal!(self, &context); self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer); + failed_channels.push(context.force_shutdown(false)); false }); // Note that we don't bother generating any events for pre-accept channels - @@ -7884,7 +8086,7 @@ where mem::drop(per_peer_state); for failure in failed_channels.drain(..) { - self.finish_force_close_channel(failure); + self.finish_close_channel(failure); } } @@ -8629,7 +8831,7 @@ where } number_of_funded_channels += peer_state.channel_by_id.iter().filter( - |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false } + |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_broadcast() } else { false } ).count(); } @@ -8640,7 +8842,7 @@ where let peer_state = &mut *peer_state_lock; for channel in peer_state.channel_by_id.iter().filter_map( |(_, phase)| if let ChannelPhase::Funded(channel) = phase { - if channel.context.is_funding_initiated() { Some(channel) } else { None } + if channel.context.is_funding_broadcast() { Some(channel) } else { None } } else { None } ) { channel.write(writer)?; @@ -9064,7 +9266,10 @@ where log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.", &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number()); } - let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true); + let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true); + if batch_funding_txid.is_some() { + return Err(DecodeError::InvalidValue); + } if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update @@ -9104,7 +9309,7 @@ where if let Some(short_channel_id) = channel.context.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id())); } - if channel.context.is_funding_initiated() { + if channel.context.is_funding_broadcast() { id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id()); } match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) { @@ -9469,6 +9674,7 @@ where pending_fee_msat: Some(path_fee), total_msat: path_amt, starting_block_height: best_block_height, + remaining_max_total_routing_fee_msat: None, // only used for retries, and we'll never retry on startup }); log_info!(args.logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}", path_amt, &htlc.payment_hash, log_bytes!(session_priv_bytes)); @@ -9817,6 +10023,8 @@ where event_persist_notifier: Notifier::new(), needs_persist_flag: AtomicBool::new(false), + funding_batch_states: Mutex::new(BTreeMap::new()), + entropy_source: args.entropy_source, node_signer: args.node_signer, signer_provider: args.signer_provider, @@ -10100,7 +10308,7 @@ mod tests { TEST_FINAL_CLTV, false), 100_000); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -10134,7 +10342,7 @@ mod tests { let payment_preimage = PaymentPreimage([42; 32]); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -10191,7 +10399,7 @@ mod tests { ); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &(), &random_seed_bytes + None, nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let payment_id_2 = PaymentId([45; 32]); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), @@ -10242,7 +10450,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &(), &random_seed_bytes + nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -10287,7 +10495,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &(), &random_seed_bytes + nodes[0].logger, &scorer, &Default::default(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]);