Merge pull request #2591 from TheBlueMatt/2023-09-2562-followups
authorElias Rohrer <dev@tnull.de>
Fri, 29 Sep 2023 17:08:36 +0000 (19:08 +0200)
committerGitHub <noreply@github.com>
Fri, 29 Sep 2023 17:08:36 +0000 (19:08 +0200)
Doc and comment followups to #2562

1  2 
lightning-persister/src/fs_store.rs
lightning/src/ln/channelmanager.rs

index 3475544849f9420398851818c0d805c4d7f48e6f,c8bd3f563fd22c8ae0d2e94660cce8339696be8b..c665d8083cb194e2934263e6eef982585bb37b67
@@@ -67,7 -67,7 +67,7 @@@ impl FilesystemStore 
                }
        }
  
 -      fn get_dest_dir_path(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<PathBuf> {
 +      fn get_dest_dir_path(&self, primary_namespace: &str, secondary_namespace: &str) -> std::io::Result<PathBuf> {
                let mut dest_dir_path = {
                        #[cfg(target_os = "windows")]
                        {
@@@ -81,9 -81,9 +81,9 @@@
                        }
                };
  
 -              dest_dir_path.push(namespace);
 -              if !sub_namespace.is_empty() {
 -                      dest_dir_path.push(sub_namespace);
 +              dest_dir_path.push(primary_namespace);
 +              if !secondary_namespace.is_empty() {
 +                      dest_dir_path.push(secondary_namespace);
                }
  
                Ok(dest_dir_path)
  }
  
  impl KVStore for FilesystemStore {
 -      fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
 -              check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?;
 +      fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
 +              check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
  
 -              let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
 +              let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
                dest_file_path.push(key);
  
                let mut buf = Vec::new();
                Ok(buf)
        }
  
 -      fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
 -              check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?;
 +      fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
 +              check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
  
 -              let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
 +              let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
                dest_file_path.push(key);
  
                let parent_directory = dest_file_path
                res
        }
  
 -      fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> {
 -              check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?;
 +      fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> {
 +              check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
  
 -              let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?;
 +              let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
                dest_file_path.push(key);
  
                if !dest_file_path.is_file() {
                Ok(())
        }
  
 -      fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<Vec<String>> {
 -              check_namespace_key_validity(namespace, sub_namespace, None, "list")?;
 +      fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> std::io::Result<Vec<String>> {
 +              check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
  
 -              let prefixed_dest = self.get_dest_dir_path(namespace, sub_namespace)?;
 +              let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
                let mut keys = Vec::new();
  
                if !Path::new(&prefixed_dest).exists() {
  
                        let metadata = p.metadata()?;
  
 -                      // We allow the presence of directories in the empty namespace and just skip them.
 +                      // We allow the presence of directories in the empty primary namespace and just skip them.
                        if metadata.is_dir() {
                                continue;
                        }
                        // If we otherwise don't find a file at the given path something went wrong.
                        if !metadata.is_file() {
                                debug_assert!(false, "Failed to list keys of {}/{}: file couldn't be accessed.",
 -                                      PrintableString(namespace), PrintableString(sub_namespace));
 +                                      PrintableString(primary_namespace), PrintableString(secondary_namespace));
                                let msg = format!("Failed to list keys of {}/{}: file couldn't be accessed.",
 -                                      PrintableString(namespace), PrintableString(sub_namespace));
 +                                      PrintableString(primary_namespace), PrintableString(secondary_namespace));
                                return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
                        }
  
                                                }
                                        } else {
                                                debug_assert!(false, "Failed to list keys of {}/{}: file path is not valid UTF-8",
 -                                                      PrintableString(namespace), PrintableString(sub_namespace));
 +                                                      PrintableString(primary_namespace), PrintableString(secondary_namespace));
                                                let msg = format!("Failed to list keys of {}/{}: file path is not valid UTF-8",
 -                                                      PrintableString(namespace), PrintableString(sub_namespace));
 +                                                      PrintableString(primary_namespace), PrintableString(secondary_namespace));
                                                return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
                                        }
                                }
                                Err(e) => {
                                        debug_assert!(false, "Failed to list keys of {}/{}: {}",
 -                                              PrintableString(namespace), PrintableString(sub_namespace), e);
 +                                              PrintableString(primary_namespace), PrintableString(secondary_namespace), e);
                                        let msg = format!("Failed to list keys of {}/{}: {}",
 -                                              PrintableString(namespace), PrintableString(sub_namespace), e);
 +                                              PrintableString(primary_namespace), PrintableString(secondary_namespace), e);
                                        return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
                                }
                        }
@@@ -436,7 -436,7 +436,7 @@@ mod tests 
        }
  
        // Test that if the store's path to channel data is read-only, writing a
-       // monitor to it results in the store returning an InProgress.
+       // monitor to it results in the store returning an UnrecoverableError.
        // Windows ignores the read-only flag for folders, so this test is Unix-only.
        #[cfg(not(target_os = "windows"))]
        #[test]
                let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
  
                // Set the store's directory to read-only, which should result in
-               // returning a permanent failure when we then attempt to persist a
+               // returning an unrecoverable failure when we then attempt to persist a
                // channel update.
                let path = &store.get_data_dir();
                let mut perms = fs::metadata(path).unwrap().permissions();
index 392f3e1cb052cf196b50b2b7e3734b62ab167e4c,23b45077af02bafb0ea53df1f55455260283cf66..00becff19d73c5e4a013ec2472addd4c2bdd8cc2
@@@ -64,7 -64,7 +64,7 @@@ use crate::util::ser::{BigSize, FixedLe
  use crate::util::logger::{Level, Logger};
  use crate::util::errors::APIError;
  
 -use alloc::collections::BTreeMap;
 +use alloc::collections::{btree_map, BTreeMap};
  
  use crate::io;
  use crate::prelude::*;
@@@ -1201,12 -1201,6 +1201,12 @@@ wher
        /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
 +      /// Tracks the progress of channels going through batch funding by whether funding_signed was
 +      /// received and the monitor has been persisted.
 +      ///
 +      /// This information does not need to be persisted as funding nodes can forget
 +      /// unfunded channels upon disconnection.
 +      funding_batch_states: Mutex<BTreeMap<Txid, Vec<(ChannelId, PublicKey, bool)>>>,
  
        background_events_processed_since_startup: AtomicBool,
  
@@@ -1794,7 -1788,7 +1794,7 @@@ macro_rules! handle_error 
                                let mut msg_events = Vec::with_capacity(2);
  
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
 -                                      $self.finish_force_close_channel(shutdown_res);
 +                                      $self.finish_close_channel(shutdown_res);
                                        if let Some(update) = update_option {
                                                msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
@@@ -2031,54 -2025,9 +2031,54 @@@ macro_rules! handle_monitor_update_comp
                }
  
                let channel_id = $chan.context.channel_id();
 +              let unbroadcasted_batch_funding_txid = $chan.context.unbroadcasted_batch_funding_txid();
                core::mem::drop($peer_state_lock);
                core::mem::drop($per_peer_state_lock);
  
 +              // If the channel belongs to a batch funding transaction, the progress of the batch
 +              // should be updated as we have received funding_signed and persisted the monitor.
 +              if let Some(txid) = unbroadcasted_batch_funding_txid {
 +                      let mut funding_batch_states = $self.funding_batch_states.lock().unwrap();
 +                      let mut batch_completed = false;
 +                      if let Some(batch_state) = funding_batch_states.get_mut(&txid) {
 +                              let channel_state = batch_state.iter_mut().find(|(chan_id, pubkey, _)| (
 +                                      *chan_id == channel_id &&
 +                                      *pubkey == counterparty_node_id
 +                              ));
 +                              if let Some(channel_state) = channel_state {
 +                                      channel_state.2 = true;
 +                              } else {
 +                                      debug_assert!(false, "Missing channel batch state for channel which completed initial monitor update");
 +                              }
 +                              batch_completed = batch_state.iter().all(|(_, _, completed)| *completed);
 +                      } else {
 +                              debug_assert!(false, "Missing batch state for channel which completed initial monitor update");
 +                      }
 +
 +                      // When all channels in a batched funding transaction have become ready, it is not necessary
 +                      // to track the progress of the batch anymore and the state of the channels can be updated.
 +                      if batch_completed {
 +                              let removed_batch_state = funding_batch_states.remove(&txid).into_iter().flatten();
 +                              let per_peer_state = $self.per_peer_state.read().unwrap();
 +                              let mut batch_funding_tx = None;
 +                              for (channel_id, counterparty_node_id, _) in removed_batch_state {
 +                                      if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
 +                                              let mut peer_state = peer_state_mutex.lock().unwrap();
 +                                              if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) {
 +                                                      batch_funding_tx = batch_funding_tx.or_else(|| chan.context.unbroadcasted_funding());
 +                                                      chan.set_batch_ready();
 +                                                      let mut pending_events = $self.pending_events.lock().unwrap();
 +                                                      emit_channel_pending_event!(pending_events, chan);
 +                                              }
 +                                      }
 +                              }
 +                              if let Some(tx) = batch_funding_tx {
 +                                      log_info!($self.logger, "Broadcasting batch funding transaction with txid {}", tx.txid());
 +                                      $self.tx_broadcaster.broadcast_transactions(&[&tx]);
 +                              }
 +                      }
 +              }
 +
                $self.handle_monitor_update_completion_actions(update_actions);
  
                if let Some(forwards) = htlc_forwards {
@@@ -2281,9 -2230,9 +2281,9 @@@ wher
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
 -
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
 +                      funding_batch_states: Mutex::new(BTreeMap::new()),
  
                        entropy_source,
                        node_signer,
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
  
                let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
 +              let mut shutdown_result = None;
                loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
  
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                let funding_txo_opt = chan.context.get_funding_txo();
                                                let their_features = &peer_state.latest_features;
 +                                              let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                let (shutdown_msg, mut monitor_update_opt, htlcs) =
                                                        chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
                                                failed_htlcs = htlcs;
                                                                        });
                                                                }
                                                                self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
 +                                                              shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                        }
                                                }
                                                break;
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
  
 +              if let Some(shutdown_result) = shutdown_result {
 +                      self.finish_close_channel(shutdown_result);
 +              }
 +
                Ok(())
        }
  
                self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
        }
  
 -      fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
 +      fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
                debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
                #[cfg(debug_assertions)]
                for (_, peer) in self.per_peer_state.read().unwrap().iter() {
                        debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
                }
  
 -              let (monitor_update_option, mut failed_htlcs) = shutdown_res;
 +              let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
                log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
                for htlc_source in failed_htlcs.drain(..) {
                        let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
                        // ignore the result here.
                        let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
                }
 +              let mut shutdown_results = Vec::new();
 +              if let Some(txid) = unbroadcasted_batch_funding_txid {
 +                      let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
 +                      let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
 +                      let per_peer_state = self.per_peer_state.read().unwrap();
 +                      let mut has_uncompleted_channel = None;
 +                      for (channel_id, counterparty_node_id, state) in affected_channels {
 +                              if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
 +                                      let mut peer_state = peer_state_mutex.lock().unwrap();
 +                                      if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) {
 +                                              update_maps_on_chan_removal!(self, &chan.context());
 +                                              self.issue_channel_close_events(&chan.context(), ClosureReason::FundingBatchClosure);
 +                                              shutdown_results.push(chan.context_mut().force_shutdown(false));
 +                                      }
 +                              }
 +                              has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state));
 +                      }
 +                      debug_assert!(
 +                              has_uncompleted_channel.unwrap_or(true),
 +                              "Closing a batch where all channels have completed initial monitor update",
 +                      );
 +              }
 +              for shutdown_result in shutdown_results.drain(..) {
 +                      self.finish_close_channel(shutdown_result);
 +              }
        }
  
        /// `peer_msg` should be set when we receive a message from a peer, but not set when the
                                mem::drop(per_peer_state);
                                match chan_phase {
                                        ChannelPhase::Funded(mut chan) => {
 -                                              self.finish_force_close_channel(chan.context.force_shutdown(broadcast));
 +                                              self.finish_close_channel(chan.context.force_shutdown(broadcast));
                                                (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
                                        },
                                        ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => {
 -                                              self.finish_force_close_channel(chan_phase.context_mut().force_shutdown(false));
 +                                              self.finish_close_channel(chan_phase.context_mut().force_shutdown(false));
                                                // Unfunded channel has no update
                                                (None, chan_phase.context().get_counterparty_node_id())
                                        },
        /// In general, a path may raise:
        ///  * [`APIError::InvalidRoute`] when an invalid route or forwarding parameter (cltv_delta, fee,
        ///    node public key) is specified.
-       ///  * [`APIError::ChannelUnavailable`] if the next-hop channel is not available for updates
-       ///    (including due to previous monitor update failure or new permanent monitor update
-       ///    failure).
+       ///  * [`APIError::ChannelUnavailable`] if the next-hop channel is not available as it has been
+       ///    closed, doesn't exist, or the peer is currently disconnected.
        ///  * [`APIError::MonitorUpdateInProgress`] if a new monitor update failure prevented sending the
        ///    relevant updates.
        ///
        ///
        /// See [`ChannelManager::send_preflight_probes`] for more information.
        pub fn send_spontaneous_preflight_probes(
 -              &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32, 
 +              &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32,
                liquidity_limit_multiplier: Option<u64>,
        ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
                let payment_params =
                        PaymentParameters::from_node_id(node_id, final_cltv_expiry_delta);
  
 -              let route_params = RouteParameters { payment_params, final_value_msat: amount_msat };
 +              let route_params = RouteParameters::from_payment_params_and_value(payment_params, amount_msat);
  
                self.send_preflight_probes(route_params, liquidity_limit_multiplier)
        }
  
        /// Handles the generation of a funding transaction, optionally (for tests) with a function
        /// which checks the correctness of the funding transaction given the associated channel.
 -      fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
 -              &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
 +      fn funding_transaction_generated_intern<FundingOutput: FnMut(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
 +              &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batch_funding: bool,
 +              mut find_funding_output: FundingOutput,
        ) -> Result<(), APIError> {
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
                                let funding_txo = find_funding_output(&chan, &funding_transaction)?;
  
 -                              let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
 +                              let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batch_funding, &self.logger)
                                        .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
                                                let channel_id = chan.context.channel_id();
                                                let user_id = chan.context.get_user_id();
  
        #[cfg(test)]
        pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
 -              self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| {
 +              self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| {
                        Ok(OutPoint { txid: tx.txid(), index: output_index })
                })
        }
        /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
        /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
        pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
 +              self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction)
 +      }
 +
 +      /// Call this upon creation of a batch funding transaction for the given channels.
 +      ///
 +      /// Return values are identical to [`Self::funding_transaction_generated`], respective to
 +      /// each individual channel and transaction output.
 +      ///
 +      /// Do NOT broadcast the funding transaction yourself. This batch funding transaction
 +      /// will only be broadcast when we have safely received and persisted the counterparty's
 +      /// signature for each channel.
 +      ///
 +      /// If there is an error, all channels in the batch are to be considered closed.
 +      pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&ChannelId, &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 +              let mut result = Ok(());
  
                if !funding_transaction.is_coin_base() {
                        for inp in funding_transaction.input.iter() {
                                if inp.witness.is_empty() {
 -                                      return Err(APIError::APIMisuseError {
 +                                      result = result.and(Err(APIError::APIMisuseError {
                                                err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned()
 -                                      });
 +                                      }));
                                }
                        }
                }
 +              if funding_transaction.output.len() > u16::max_value() as usize {
 +                      result = result.and(Err(APIError::APIMisuseError {
 +                              err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
 +                      }));
 +              }
                {
                        let height = self.best_block.read().unwrap().height();
                        // Transactions are evaluated as final by network mempools if their locktime is strictly
                        // node might not have perfect sync about their blockchain views. Thus, if the wallet
                        // module is ahead of LDK, only allow one more block of headroom.
                        if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 {
 -                              return Err(APIError::APIMisuseError {
 +                              result = result.and(Err(APIError::APIMisuseError {
                                        err: "Funding transaction absolute timelock is non-final".to_owned()
 -                              });
 +                              }));
                        }
                }
 -              self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
 -                      if tx.output.len() > u16::max_value() as usize {
 -                              return Err(APIError::APIMisuseError {
 -                                      err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
 -                              });
 -                      }
  
 -                      let mut output_index = None;
 -                      let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
 -                      for (idx, outp) in tx.output.iter().enumerate() {
 -                              if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
 -                                      if output_index.is_some() {
 +              let txid = funding_transaction.txid();
 +              let is_batch_funding = temporary_channels.len() > 1;
 +              let mut funding_batch_states = if is_batch_funding {
 +                      Some(self.funding_batch_states.lock().unwrap())
 +              } else {
 +                      None
 +              };
 +              let mut funding_batch_state = funding_batch_states.as_mut().and_then(|states| {
 +                      match states.entry(txid) {
 +                              btree_map::Entry::Occupied(_) => {
 +                                      result = result.clone().and(Err(APIError::APIMisuseError {
 +                                              err: "Batch funding transaction with the same txid already exists".to_owned()
 +                                      }));
 +                                      None
 +                              },
 +                              btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())),
 +                      }
 +              });
 +              for (channel_idx, &(temporary_channel_id, counterparty_node_id)) in temporary_channels.iter().enumerate() {
 +                      result = result.and_then(|_| self.funding_transaction_generated_intern(
 +                              temporary_channel_id,
 +                              counterparty_node_id,
 +                              funding_transaction.clone(),
 +                              is_batch_funding,
 +                              |chan, tx| {
 +                                      let mut output_index = None;
 +                                      let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
 +                                      for (idx, outp) in tx.output.iter().enumerate() {
 +                                              if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
 +                                                      if output_index.is_some() {
 +                                                              return Err(APIError::APIMisuseError {
 +                                                                      err: "Multiple outputs matched the expected script and value".to_owned()
 +                                                              });
 +                                                      }
 +                                                      output_index = Some(idx as u16);
 +                                              }
 +                                      }
 +                                      if output_index.is_none() {
                                                return Err(APIError::APIMisuseError {
 -                                                      err: "Multiple outputs matched the expected script and value".to_owned()
 +                                                      err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
                                                });
                                        }
 -                                      output_index = Some(idx as u16);
 +                                      let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
 +                                      if let Some(funding_batch_state) = funding_batch_state.as_mut() {
 +                                              funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false));
 +                                      }
 +                                      Ok(outpoint)
 +                              })
 +                      );
 +              }
 +              if let Err(ref e) = result {
 +                      // Remaining channels need to be removed on any error.
 +                      let e = format!("Error in transaction funding: {:?}", e);
 +                      let mut channels_to_remove = Vec::new();
 +                      channels_to_remove.extend(funding_batch_states.as_mut()
 +                              .and_then(|states| states.remove(&txid))
 +                              .into_iter().flatten()
 +                              .map(|(chan_id, node_id, _state)| (chan_id, node_id))
 +                      );
 +                      channels_to_remove.extend(temporary_channels.iter()
 +                              .map(|(&chan_id, &node_id)| (chan_id, node_id))
 +                      );
 +                      let mut shutdown_results = Vec::new();
 +                      {
 +                              let per_peer_state = self.per_peer_state.read().unwrap();
 +                              for (channel_id, counterparty_node_id) in channels_to_remove {
 +                                      per_peer_state.get(&counterparty_node_id)
 +                                              .map(|peer_state_mutex| peer_state_mutex.lock().unwrap())
 +                                              .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id))
 +                                              .map(|mut chan| {
 +                                                      update_maps_on_chan_removal!(self, &chan.context());
 +                                                      self.issue_channel_close_events(&chan.context(), ClosureReason::ProcessingError { err: e.clone() });
 +                                                      shutdown_results.push(chan.context_mut().force_shutdown(false));
 +                                              });
                                }
                        }
 -                      if output_index.is_none() {
 -                              return Err(APIError::APIMisuseError {
 -                                      err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
 -                              });
 +                      for shutdown_result in shutdown_results.drain(..) {
 +                              self.finish_close_channel(shutdown_result);
                        }
 -                      Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
 -              })
 +              }
 +              result
        }
  
        /// Atomically applies partial updates to the [`ChannelConfig`] of the given channels.
                        }
  
                        for shutdown_res in shutdown_channels {
 -                              self.finish_force_close_channel(shutdown_res);
 +                              self.finish_close_channel(shutdown_res);
                        }
  
                        self.pending_outbound_payments.remove_stale_payments(&self.pending_events);
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
                if let Some(shutdown_res) = finish_shutdown {
 -                      self.finish_force_close_channel(shutdown_res);
 +                      self.finish_close_channel(shutdown_res);
                }
  
                Ok(())
        }
  
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
 +              let mut shutdown_result = None;
 +              let unbroadcasted_batch_funding_txid;
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
                        match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
 +                                              unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
                                                if let Some(msg) = closing_signed {
                                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
                                });
                        }
                        self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
 +                      shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
 +              }
 +              mem::drop(per_peer_state);
 +              if let Some(shutdown_result) = shutdown_result {
 +                      self.finish_close_channel(shutdown_result);
                }
                Ok(())
        }
                                        if were_node_one == msg_from_node_one {
                                                return Ok(NotifyOption::SkipPersistNoEvents);
                                        } else {
 -                                              log_debug!(self.logger, "Received channel_update for channel {}.", chan_id);
 +                                              log_debug!(self.logger, "Received channel_update {:?} for channel {}.", msg, chan_id);
                                                try_chan_phase_entry!(self, chan.channel_update(&msg), chan_phase_entry);
                                        }
                                } else {
                }
  
                for failure in failed_channels.drain(..) {
 -                      self.finish_force_close_channel(failure);
 +                      self.finish_close_channel(failure);
                }
  
                has_pending_monitor_events
        fn maybe_generate_initial_closing_signed(&self) -> bool {
                let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
                let mut has_update = false;
 +              let mut shutdown_result = None;
 +              let mut unbroadcasted_batch_funding_txid = None;
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
  
                                peer_state.channel_by_id.retain(|channel_id, phase| {
                                        match phase {
                                                ChannelPhase::Funded(chan) => {
 +                                                      unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                        match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
                                                                Ok((msg_opt, tx_opt)) => {
                                                                        if let Some(msg) = msg_opt {
                                                                                log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                                                self.tx_broadcaster.broadcast_transactions(&[&tx]);
                                                                                update_maps_on_chan_removal!(self, &chan.context);
 +                                                                              shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                                                false
                                                                        } else { true }
                                                                },
                        let _ = handle_error!(self, err, counterparty_node_id);
                }
  
 +              if let Some(shutdown_result) = shutdown_result {
 +                      self.finish_close_channel(shutdown_result);
 +              }
 +
                has_update
        }
  
                                                counterparty_node_id, funding_txo, update
                                        });
                        }
 -                      self.finish_force_close_channel(failure);
 +                      self.finish_close_channel(failure);
                }
        }
  
@@@ -7998,6 -7821,7 +7997,6 @@@ wher
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
                let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
                        self, || NotifyOption::SkipPersistHandleEvents);
 -
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
                                peer_state.channel_by_id.retain(|_, phase| {
                                        let context = match phase {
                                                ChannelPhase::Funded(chan) => {
 -                                                      chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
 -                                                      // We only retain funded channels that are not shutdown.
 -                                                      if !chan.is_shutdown() {
 +                                                      if chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger).is_ok() {
 +                                                              // We only retain funded channels that are not shutdown.
                                                                return true;
                                                        }
 -                                                      &chan.context
 +                                                      &mut chan.context
                                                },
                                                // Unfunded channels will always be removed.
                                                ChannelPhase::UnfundedOutboundV1(chan) => {
 -                                                      &chan.context
 +                                                      &mut chan.context
                                                },
                                                ChannelPhase::UnfundedInboundV1(chan) => {
 -                                                      &chan.context
 +                                                      &mut chan.context
                                                },
                                        };
                                        // Clean up for removal.
                                        update_maps_on_chan_removal!(self, &context);
                                        self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer);
 +                                      failed_channels.push(context.force_shutdown(false));
                                        false
                                });
                                // Note that we don't bother generating any events for pre-accept channels -
                mem::drop(per_peer_state);
  
                for failure in failed_channels.drain(..) {
 -                      self.finish_force_close_channel(failure);
 +                      self.finish_close_channel(failure);
                }
        }
  
@@@ -8831,7 -8655,7 +8830,7 @@@ wher
                                }
  
                                number_of_funded_channels += peer_state.channel_by_id.iter().filter(
 -                                      |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false }
 +                                      |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_broadcast() } else { false }
                                ).count();
                        }
  
                                let peer_state = &mut *peer_state_lock;
                                for channel in peer_state.channel_by_id.iter().filter_map(
                                        |(_, phase)| if let ChannelPhase::Funded(channel) = phase {
 -                                              if channel.context.is_funding_initiated() { Some(channel) } else { None }
 +                                              if channel.context.is_funding_broadcast() { Some(channel) } else { None }
                                        } else { None }
                                ) {
                                        channel.write(writer)?;
@@@ -9266,10 -9090,7 +9265,10 @@@ wher
                                                log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
                                                        &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
                                        }
 -                                      let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true);
 +                                      let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
 +                                      if batch_funding_txid.is_some() {
 +                                              return Err(DecodeError::InvalidValue);
 +                                      }
                                        if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
                                                close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                        counterparty_node_id, funding_txo, update
                                        if let Some(short_channel_id) = channel.context.get_short_channel_id() {
                                                short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
                                        }
 -                                      if channel.context.is_funding_initiated() {
 +                                      if channel.context.is_funding_broadcast() {
                                                id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id());
                                        }
                                        match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) {
                                                                                pending_fee_msat: Some(path_fee),
                                                                                total_msat: path_amt,
                                                                                starting_block_height: best_block_height,
 +                                                                              remaining_max_total_routing_fee_msat: None, // only used for retries, and we'll never retry on startup
                                                                        });
                                                                        log_info!(args.logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}",
                                                                                path_amt, &htlc.payment_hash,  log_bytes!(session_priv_bytes));
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
  
 +                      funding_batch_states: Mutex::new(BTreeMap::new()),
 +
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        signer_provider: args.signer_provider,