Batch funding for v1 channel establishments
authorWillem Van Lint <noreply@wvanlint.dev>
Sat, 29 Jul 2023 00:21:47 +0000 (17:21 -0700)
committerWillem Van Lint <noreply@wvanlint.dev>
Tue, 26 Sep 2023 20:37:14 +0000 (13:37 -0700)
lightning/src/events/mod.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/reload_tests.rs

index bb98e271597309d057ca4712b394e302b35cddc3..269887a3dbac06fc1ba24dcda084f5c2c2115de2 100644 (file)
@@ -199,6 +199,9 @@ pub enum ClosureReason {
        /// The counterparty requested a cooperative close of a channel that had not been funded yet.
        /// The channel has been immediately closed.
        CounterpartyCoopClosedUnfundedChannel,
+       /// Another channel in the same funding batch closed before the funding transaction
+       /// was ready to be broadcast.
+       FundingBatchClosure,
 }
 
 impl core::fmt::Display for ClosureReason {
@@ -219,6 +222,7 @@ impl core::fmt::Display for ClosureReason {
                        ClosureReason::DisconnectedPeer => f.write_str("the peer disconnected prior to the channel being funded"),
                        ClosureReason::OutdatedChannelManager => f.write_str("the ChannelManager read from disk was stale compared to ChannelMonitor(s)"),
                        ClosureReason::CounterpartyCoopClosedUnfundedChannel => f.write_str("the peer requested the unfunded channel be closed"),
+                       ClosureReason::FundingBatchClosure => f.write_str("another channel in the same funding batch closed"),
                }
        }
 }
@@ -233,6 +237,7 @@ impl_writeable_tlv_based_enum_upgradable!(ClosureReason,
        (10, DisconnectedPeer) => {},
        (12, OutdatedChannelManager) => {},
        (13, CounterpartyCoopClosedUnfundedChannel) => {},
+       (15, FundingBatchClosure) => {}
 );
 
 /// Intended destination of a failed HTLC as indicated in [`Event::HTLCHandlingFailed`].
@@ -844,6 +849,8 @@ pub enum Event {
        },
        /// Used to indicate to the user that they can abandon the funding transaction and recycle the
        /// inputs for another purpose.
+       ///
+       /// This event is not guaranteed to be generated for channels that are closed due to a restart.
        DiscardFunding {
                /// The channel_id of the channel which has been closed.
                channel_id: ChannelId,
index 796c041f8185d78d68972370726fdccb96fb4fd5..d14a868539f68f173fcb79e481e2056aa3f8f090 100644 (file)
@@ -300,9 +300,24 @@ enum ChannelState {
        /// We've successfully negotiated a closing_signed dance. At this point ChannelManager is about
        /// to drop us, but we store this anyway.
        ShutdownComplete = 4096,
+       /// Flag which is set on `FundingSent` to indicate this channel is funded in a batch and the
+       /// broadcasting of the funding transaction is being held until all channels in the batch
+       /// have received funding_signed and have their monitors persisted.
+       WaitingForBatch = 1 << 13,
 }
-const BOTH_SIDES_SHUTDOWN_MASK: u32 = ChannelState::LocalShutdownSent as u32 | ChannelState::RemoteShutdownSent as u32;
-const MULTI_STATE_FLAGS: u32 = BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32;
+const BOTH_SIDES_SHUTDOWN_MASK: u32 =
+       ChannelState::LocalShutdownSent as u32 |
+       ChannelState::RemoteShutdownSent as u32;
+const MULTI_STATE_FLAGS: u32 =
+       BOTH_SIDES_SHUTDOWN_MASK |
+       ChannelState::PeerDisconnected as u32 |
+       ChannelState::MonitorUpdateInProgress as u32;
+const STATE_FLAGS: u32 =
+       MULTI_STATE_FLAGS |
+       ChannelState::TheirChannelReady as u32 |
+       ChannelState::OurChannelReady as u32 |
+       ChannelState::AwaitingRemoteRevoke as u32 |
+       ChannelState::WaitingForBatch as u32;
 
 pub const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
 
@@ -527,12 +542,15 @@ pub(super) struct ReestablishResponses {
 
 /// The return type of `force_shutdown`
 ///
-/// Contains a (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
-/// followed by a list of HTLCs to fail back in the form of the (source, payment hash, and this
-/// channel's counterparty_node_id and channel_id).
+/// Contains a tuple with the following:
+/// - An optional (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
+/// - A list of HTLCs to fail back in the form of the (source, payment hash, and this channel's
+/// counterparty_node_id and channel_id).
+/// - An optional transaction id identifying a corresponding batch funding transaction.
 pub(crate) type ShutdownResult = (
        Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
-       Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>
+       Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>,
+       Option<Txid>
 );
 
 /// If the majority of the channels funds are to the fundee and the initiator holds only just
@@ -821,6 +839,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
 
        pub(crate) channel_transaction_parameters: ChannelTransactionParameters,
        funding_transaction: Option<Transaction>,
+       is_batch_funding: Option<()>,
 
        counterparty_cur_commitment_point: Option<PublicKey>,
        counterparty_prev_commitment_point: Option<PublicKey>,
@@ -945,7 +964,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
 
        /// Returns true if we've ever received a message from the remote end for this Channel
        pub fn have_received_message(&self) -> bool {
-               self.channel_state > (ChannelState::OurInitSent as u32)
+               self.channel_state & !STATE_FLAGS > (ChannelState::OurInitSent as u32)
        }
 
        /// Returns true if this channel is fully established and not known to be closing.
@@ -1161,7 +1180,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
 
        // Checks whether we should emit a `ChannelPending` event.
        pub(crate) fn should_emit_channel_pending_event(&mut self) -> bool {
-               self.is_funding_initiated() && !self.channel_pending_event_emitted
+               self.is_funding_broadcast() && !self.channel_pending_event_emitted
        }
 
        // Returns whether we already emitted a `ChannelPending` event.
@@ -1220,9 +1239,11 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
                did_channel_update
        }
 
-       /// Returns true if funding_created was sent/received.
-       pub fn is_funding_initiated(&self) -> bool {
-               self.channel_state >= ChannelState::FundingSent as u32
+       /// Returns true if funding_signed was sent/received and the
+       /// funding transaction has been broadcast if necessary.
+       pub fn is_funding_broadcast(&self) -> bool {
+               self.channel_state & !STATE_FLAGS >= ChannelState::FundingSent as u32 &&
+                       self.channel_state & ChannelState::WaitingForBatch as u32 == 0
        }
 
        /// Transaction nomenclature is somewhat confusing here as there are many different cases - a
@@ -1952,15 +1973,41 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
                res
        }
 
-       /// Returns transaction if there is pending funding transaction that is yet to broadcast
-       pub fn unbroadcasted_funding(&self) -> Option<Transaction> {
-               if self.channel_state & (ChannelState::FundingCreated as u32) != 0 {
-                       self.funding_transaction.clone()
+       fn if_unbroadcasted_funding<F, O>(&self, f: F) -> Option<O>
+               where F: Fn() -> Option<O> {
+               if self.channel_state & ChannelState::FundingCreated as u32 != 0 ||
+                  self.channel_state & ChannelState::WaitingForBatch as u32 != 0 {
+                       f()
                } else {
                        None
                }
        }
 
+       /// Returns the transaction if there is a pending funding transaction that is yet to be
+       /// broadcast.
+       pub fn unbroadcasted_funding(&self) -> Option<Transaction> {
+               self.if_unbroadcasted_funding(|| self.funding_transaction.clone())
+       }
+
+       /// Returns the transaction ID if there is a pending funding transaction that is yet to be
+       /// broadcast.
+       pub fn unbroadcasted_funding_txid(&self) -> Option<Txid> {
+               self.if_unbroadcasted_funding(||
+                       self.channel_transaction_parameters.funding_outpoint.map(|txo| txo.txid)
+               )
+       }
+
+       /// Returns whether the channel is funded in a batch.
+       pub fn is_batch_funding(&self) -> bool {
+               self.is_batch_funding.is_some()
+       }
+
+       /// Returns the transaction ID if there is a pending batch funding transaction that is yet to be
+       /// broadcast.
+       pub fn unbroadcasted_batch_funding_txid(&self) -> Option<Txid> {
+               self.unbroadcasted_funding_txid().filter(|_| self.is_batch_funding())
+       }
+
        /// Gets the latest commitment transaction and any dependent transactions for relay (forcing
        /// shutdown of this channel - no more calls into this Channel may be made afterwards except
        /// those explicitly stated to be allowed after shutdown completes, eg some simple getters).
@@ -2001,10 +2048,11 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
                                }))
                        } else { None }
                } else { None };
+               let unbroadcasted_batch_funding_txid = self.unbroadcasted_batch_funding_txid();
 
                self.channel_state = ChannelState::ShutdownComplete as u32;
                self.update_time_counter += 1;
-               (monitor_update, dropped_outbound_htlcs)
+               (monitor_update, dropped_outbound_htlcs, unbroadcasted_batch_funding_txid)
        }
 }
 
@@ -2574,7 +2622,11 @@ impl<SP: Deref> Channel<SP> where
                        counterparty_initial_commitment_tx.to_countersignatory_value_sat(), logger);
 
                assert_eq!(self.context.channel_state & (ChannelState::MonitorUpdateInProgress as u32), 0); // We have no had any monitor(s) yet to fail update!
-               self.context.channel_state = ChannelState::FundingSent as u32;
+               if self.context.is_batch_funding() {
+                       self.context.channel_state = ChannelState::FundingSent as u32 | ChannelState::WaitingForBatch as u32;
+               } else {
+                       self.context.channel_state = ChannelState::FundingSent as u32;
+               }
                self.context.cur_holder_commitment_transaction_number -= 1;
                self.context.cur_counterparty_commitment_transaction_number -= 1;
 
@@ -2585,6 +2637,15 @@ impl<SP: Deref> Channel<SP> where
                Ok(channel_monitor)
        }
 
+       /// Updates the state of the channel to indicate that all channels in the batch have received
+       /// funding_signed and persisted their monitors.
+       /// The funding transaction is consequently allowed to be broadcast, and the channel can be
+       /// treated as a non-batch channel going forward.
+       pub fn set_batch_ready(&mut self) {
+               self.context.is_batch_funding = None;
+               self.context.channel_state &= !(ChannelState::WaitingForBatch as u32);
+       }
+
        /// Handles a channel_ready message from our peer. If we've already sent our channel_ready
        /// and the channel is now usable (and public), this may generate an announcement_signatures to
        /// reply with.
@@ -2612,7 +2673,13 @@ impl<SP: Deref> Channel<SP> where
 
                let non_shutdown_state = self.context.channel_state & (!MULTI_STATE_FLAGS);
 
-               if non_shutdown_state == ChannelState::FundingSent as u32 {
+               // Our channel_ready shouldn't have been sent if we are waiting for other channels in the
+               // batch, but we can receive channel_ready messages.
+               debug_assert!(
+                       non_shutdown_state & ChannelState::OurChannelReady as u32 == 0 ||
+                       non_shutdown_state & ChannelState::WaitingForBatch as u32 == 0
+               );
+               if non_shutdown_state & !(ChannelState::WaitingForBatch as u32) == ChannelState::FundingSent as u32 {
                        self.context.channel_state |= ChannelState::TheirChannelReady as u32;
                } else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurChannelReady as u32) {
                        self.context.channel_state = ChannelState::ChannelReady as u32 | (self.context.channel_state & MULTI_STATE_FLAGS);
@@ -3111,7 +3178,7 @@ impl<SP: Deref> Channel<SP> where
        ) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>)
        where F::Target: FeeEstimator, L::Target: Logger
        {
-               if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
+               if self.context.channel_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32 &&
                   (self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
                        self.free_holding_cell_htlcs(fee_estimator, logger)
                } else { (None, Vec::new()) }
@@ -3585,17 +3652,17 @@ impl<SP: Deref> Channel<SP> where
        /// resent.
        /// No further message handling calls may be made until a channel_reestablish dance has
        /// completed.
-       pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L)  where L::Target: Logger {
+       /// May return `Err(())`, which implies [`ChannelContext::force_shutdown`] should be called immediately.
+       pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) -> Result<(), ()> where L::Target: Logger {
                assert_eq!(self.context.channel_state & ChannelState::ShutdownComplete as u32, 0);
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
-                       self.context.channel_state = ChannelState::ShutdownComplete as u32;
-                       return;
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
+                       return Err(());
                }
 
                if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == (ChannelState::PeerDisconnected as u32) {
                        // While the below code should be idempotent, it's simpler to just return early, as
                        // redundant disconnect events can fire, though they should be rare.
-                       return;
+                       return Ok(());
                }
 
                if self.context.announcement_sigs_state == AnnouncementSigsState::MessageSent || self.context.announcement_sigs_state == AnnouncementSigsState::Committed {
@@ -3656,6 +3723,7 @@ impl<SP: Deref> Channel<SP> where
 
                self.context.channel_state |= ChannelState::PeerDisconnected as u32;
                log_trace!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, &self.context.channel_id());
+               Ok(())
        }
 
        /// Indicates that a ChannelMonitor update is in progress and has not yet been fully persisted.
@@ -3701,12 +3769,12 @@ impl<SP: Deref> Channel<SP> where
                // (re-)broadcast the funding transaction as we may have declined to broadcast it when we
                // first received the funding_signed.
                let mut funding_broadcastable =
-                       if self.context.is_outbound() && self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::FundingSent as u32 {
+                       if self.context.is_outbound() && self.context.channel_state & !STATE_FLAGS >= ChannelState::FundingSent as u32 && self.context.channel_state & ChannelState::WaitingForBatch as u32 == 0 {
                                self.context.funding_transaction.take()
                        } else { None };
                // That said, if the funding transaction is already confirmed (ie we're active with a
                // minimum_depth over 0) don't bother re-broadcasting the confirmed funding tx.
-               if self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::ChannelReady as u32 && self.context.minimum_depth != Some(0) {
+               if self.context.channel_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32 && self.context.minimum_depth != Some(0) {
                        funding_broadcastable = None;
                }
 
@@ -4209,7 +4277,7 @@ impl<SP: Deref> Channel<SP> where
                if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
                        return Err(ChannelError::Close("Peer sent shutdown when we needed a channel_reestablish".to_owned()));
                }
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
                        // Spec says we should fail the connection, not the channel, but that's nonsense, there
                        // are plenty of reasons you may want to fail a channel pre-funding, and spec says you
                        // can do that via error message without getting a connection fail anyway...
@@ -4603,7 +4671,7 @@ impl<SP: Deref> Channel<SP> where
        pub fn is_awaiting_initial_mon_persist(&self) -> bool {
                if !self.is_awaiting_monitor_update() { return false; }
                if self.context.channel_state &
-                       !(ChannelState::TheirChannelReady as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)
+                       !(ChannelState::TheirChannelReady as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32 | ChannelState::WaitingForBatch as u32)
                                == ChannelState::FundingSent as u32 {
                        // If we're not a 0conf channel, we'll be waiting on a monitor update with only
                        // FundingSent set, though our peer could have sent their channel_ready.
@@ -4634,7 +4702,7 @@ impl<SP: Deref> Channel<SP> where
 
        /// Returns true if our channel_ready has been sent
        pub fn is_our_channel_ready(&self) -> bool {
-               (self.context.channel_state & ChannelState::OurChannelReady as u32) != 0 || self.context.channel_state >= ChannelState::ChannelReady as u32
+               (self.context.channel_state & ChannelState::OurChannelReady as u32) != 0 || self.context.channel_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32
        }
 
        /// Returns true if our peer has either initiated or agreed to shut down the channel.
@@ -4683,6 +4751,8 @@ impl<SP: Deref> Channel<SP> where
                        return None;
                }
 
+               // Note that we don't include ChannelState::WaitingForBatch as we don't want to send
+               // channel_ready until the entire batch is ready.
                let non_shutdown_state = self.context.channel_state & (!MULTI_STATE_FLAGS);
                let need_commitment_update = if non_shutdown_state == ChannelState::FundingSent as u32 {
                        self.context.channel_state |= ChannelState::OurChannelReady as u32;
@@ -4695,7 +4765,7 @@ impl<SP: Deref> Channel<SP> where
                        // We got a reorg but not enough to trigger a force close, just ignore.
                        false
                } else {
-                       if self.context.funding_tx_confirmation_height != 0 && self.context.channel_state < ChannelState::ChannelReady as u32 {
+                       if self.context.funding_tx_confirmation_height != 0 && self.context.channel_state & !STATE_FLAGS < ChannelState::ChannelReady as u32 {
                                // We should never see a funding transaction on-chain until we've received
                                // funding_signed (if we're an outbound channel), or seen funding_generated (if we're
                                // an inbound channel - before that we have no known funding TXID). The fuzzer,
@@ -4865,7 +4935,7 @@ impl<SP: Deref> Channel<SP> where
                }
 
                let non_shutdown_state = self.context.channel_state & (!MULTI_STATE_FLAGS);
-               if non_shutdown_state >= ChannelState::ChannelReady as u32 ||
+               if non_shutdown_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32 ||
                   (non_shutdown_state & ChannelState::OurChannelReady as u32) == ChannelState::OurChannelReady as u32 {
                        let mut funding_tx_confirmations = height as i64 - self.context.funding_tx_confirmation_height as i64 + 1;
                        if self.context.funding_tx_confirmation_height == 0 {
@@ -4893,7 +4963,7 @@ impl<SP: Deref> Channel<SP> where
                                height >= self.context.channel_creation_height + FUNDING_CONF_DEADLINE_BLOCKS {
                        log_info!(logger, "Closing channel {} due to funding timeout", &self.context.channel_id);
                        // If funding_tx_confirmed_in is unset, the channel must not be active
-                       assert!(non_shutdown_state <= ChannelState::ChannelReady as u32);
+                       assert!(non_shutdown_state & !STATE_FLAGS <= ChannelState::ChannelReady as u32);
                        assert_eq!(non_shutdown_state & ChannelState::OurChannelReady as u32, 0);
                        return Err(ClosureReason::FundingTimedOut);
                }
@@ -5513,7 +5583,7 @@ impl<SP: Deref> Channel<SP> where
                // If we haven't funded the channel yet, we don't need to bother ensuring the shutdown
                // script is set, we just force-close and call it a day.
                let mut chan_closed = false;
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
                        chan_closed = true;
                }
 
@@ -5542,7 +5612,7 @@ impl<SP: Deref> Channel<SP> where
 
                // From here on out, we may not fail!
                self.context.target_closing_feerate_sats_per_kw = target_feerate_sats_per_kw;
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
                        self.context.channel_state = ChannelState::ShutdownComplete as u32;
                } else {
                        self.context.channel_state |= ChannelState::LocalShutdownSent as u32;
@@ -5765,6 +5835,7 @@ impl<SP: Deref> OutboundV1Channel<SP> where SP::Target: SignerProvider {
                                        channel_type_features: channel_type.clone()
                                },
                                funding_transaction: None,
+                               is_batch_funding: None,
 
                                counterparty_cur_commitment_point: None,
                                counterparty_prev_commitment_point: None,
@@ -5825,7 +5896,7 @@ impl<SP: Deref> OutboundV1Channel<SP> where SP::Target: SignerProvider {
        /// Note that channel_id changes during this call!
        /// Do NOT broadcast the funding transaction until after a successful funding_signed call!
        /// If an Err is returned, it is a ChannelError::Close.
-       pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, logger: &L)
+       pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, is_batch_funding: bool, logger: &L)
        -> Result<(Channel<SP>, msgs::FundingCreated), (Self, ChannelError)> where L::Target: Logger {
                if !self.context.is_outbound() {
                        panic!("Tried to create outbound funding_created message on an inbound channel!");
@@ -5867,6 +5938,7 @@ impl<SP: Deref> OutboundV1Channel<SP> where SP::Target: SignerProvider {
                }
 
                self.context.funding_transaction = Some(funding_transaction);
+               self.context.is_batch_funding = Some(()).filter(|_| is_batch_funding);
 
                let channel = Channel {
                        context: self.context,
@@ -6416,6 +6488,7 @@ impl<SP: Deref> InboundV1Channel<SP> where SP::Target: SignerProvider {
                                        channel_type_features: channel_type.clone()
                                },
                                funding_transaction: None,
+                               is_batch_funding: None,
 
                                counterparty_cur_commitment_point: Some(msg.first_per_commitment_point),
                                counterparty_prev_commitment_point: None,
@@ -7031,6 +7104,7 @@ impl<SP: Deref> Writeable for Channel<SP> where SP::Target: SignerProvider {
                        (31, channel_pending_event_emitted, option),
                        (35, pending_outbound_skimmed_fees, optional_vec),
                        (37, holding_cell_skimmed_fees, optional_vec),
+                       (38, self.context.is_batch_funding, option),
                });
 
                Ok(())
@@ -7253,7 +7327,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                };
 
                let mut channel_parameters: ChannelTransactionParameters = Readable::read(reader)?;
-               let funding_transaction = Readable::read(reader)?;
+               let funding_transaction: Option<Transaction> = Readable::read(reader)?;
 
                let counterparty_cur_commitment_point = Readable::read(reader)?;
 
@@ -7314,6 +7388,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                let mut pending_outbound_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
                let mut holding_cell_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
 
+               let mut is_batch_funding: Option<()> = None;
+
                read_tlv_fields!(reader, {
                        (0, announcement_sigs, option),
                        (1, minimum_depth, option),
@@ -7339,6 +7415,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                        (31, channel_pending_event_emitted, option),
                        (35, pending_outbound_skimmed_fees_opt, optional_vec),
                        (37, holding_cell_skimmed_fees_opt, optional_vec),
+                       (38, is_batch_funding, option),
                });
 
                let (channel_keys_id, holder_signer) = if let Some(channel_keys_id) = channel_keys_id {
@@ -7346,7 +7423,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                        // If we've gotten to the funding stage of the channel, populate the signer with its
                        // required channel parameters.
                        let non_shutdown_state = channel_state & (!MULTI_STATE_FLAGS);
-                       if non_shutdown_state >= (ChannelState::FundingCreated as u32) {
+                       if non_shutdown_state & !STATE_FLAGS >= (ChannelState::FundingCreated as u32) {
                                holder_signer.provide_channel_parameters(&channel_parameters);
                        }
                        (channel_keys_id, holder_signer)
@@ -7496,6 +7573,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
 
                                channel_transaction_parameters: channel_parameters,
                                funding_transaction,
+                               is_batch_funding,
 
                                counterparty_cur_commitment_point,
                                counterparty_prev_commitment_point,
@@ -7549,7 +7627,7 @@ mod tests {
        use crate::ln::PaymentHash;
        use crate::ln::channelmanager::{self, HTLCSource, PaymentId};
        use crate::ln::channel::InitFeatures;
-       use crate::ln::channel::{Channel, InboundHTLCOutput, OutboundV1Channel, InboundV1Channel, OutboundHTLCOutput, InboundHTLCState, OutboundHTLCState, HTLCCandidate, HTLCInitiator, commit_tx_fee_msat};
+       use crate::ln::channel::{Channel, ChannelState, InboundHTLCOutput, OutboundV1Channel, InboundV1Channel, OutboundHTLCOutput, InboundHTLCState, OutboundHTLCState, HTLCCandidate, HTLCInitiator, commit_tx_fee_msat};
        use crate::ln::channel::{MAX_FUNDING_SATOSHIS_NO_WUMBO, TOTAL_BITCOIN_SUPPLY_SATOSHIS, MIN_THEIR_CHAN_RESERVE_SATOSHIS};
        use crate::ln::features::ChannelTypeFeatures;
        use crate::ln::msgs::{ChannelUpdate, DecodeError, UnsignedChannelUpdate, MAX_VALUE_MSAT};
@@ -7728,7 +7806,7 @@ mod tests {
                        value: 10000000, script_pubkey: output_script.clone(),
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
-               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
                let (_, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();
 
                // Node B --> Node A: funding signed
@@ -7855,7 +7933,7 @@ mod tests {
                        value: 10000000, script_pubkey: output_script.clone(),
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
-               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
                let (mut node_b_chan, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();
 
                // Node B --> Node A: funding signed
@@ -7863,7 +7941,7 @@ mod tests {
 
                // Now disconnect the two nodes and check that the commitment point in
                // Node B's channel_reestablish message is sane.
-               node_b_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger);
+               assert!(node_b_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger).is_ok());
                let msg = node_b_chan.get_channel_reestablish(&&logger);
                assert_eq!(msg.next_local_commitment_number, 1); // now called next_commitment_number
                assert_eq!(msg.next_remote_commitment_number, 0); // now called next_revocation_number
@@ -7871,7 +7949,7 @@ mod tests {
 
                // Check that the commitment point in Node A's channel_reestablish message
                // is sane.
-               node_a_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger);
+               assert!(node_a_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger).is_ok());
                let msg = node_a_chan.get_channel_reestablish(&&logger);
                assert_eq!(msg.next_local_commitment_number, 1); // now called next_commitment_number
                assert_eq!(msg.next_remote_commitment_number, 0); // now called next_revocation_number
@@ -8043,7 +8121,7 @@ mod tests {
                        value: 10000000, script_pubkey: output_script.clone(),
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
-               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
                let (_, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();
 
                // Node B --> Node A: funding signed
@@ -9023,4 +9101,146 @@ mod tests {
                );
                assert!(res.is_err());
        }
+
+       #[test]
+       fn test_waiting_for_batch() {
+               let feeest = LowerBoundedFeeEstimator::new(&TestFeeEstimator{fee_est: 15000});
+               let logger = test_utils::TestLogger::new();
+               let secp_ctx = Secp256k1::new();
+               let seed = [42; 32];
+               let network = Network::Testnet;
+               let best_block = BestBlock::from_network(network);
+               let chain_hash = genesis_block(network).header.block_hash();
+               let keys_provider = test_utils::TestKeysInterface::new(&seed, network);
+
+               let mut config = UserConfig::default();
+               // Set trust_own_funding_0conf while ensuring we don't send channel_ready for a
+               // channel in a batch before all channels are ready.
+               config.channel_handshake_limits.trust_own_funding_0conf = true;
+
+               // Create a channel from node a to node b that will be part of batch funding.
+               let node_b_node_id = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
+               let mut node_a_chan = OutboundV1Channel::<&TestKeysInterface>::new(
+                       &feeest,
+                       &&keys_provider,
+                       &&keys_provider,
+                       node_b_node_id,
+                       &channelmanager::provided_init_features(&config),
+                       10000000,
+                       100000,
+                       42,
+                       &config,
+                       0,
+                       42,
+               ).unwrap();
+
+               let open_channel_msg = node_a_chan.get_open_channel(genesis_block(network).header.block_hash());
+               let node_b_node_id = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[7; 32]).unwrap());
+               let mut node_b_chan = InboundV1Channel::<&TestKeysInterface>::new(
+                       &feeest,
+                       &&keys_provider,
+                       &&keys_provider,
+                       node_b_node_id,
+                       &channelmanager::provided_channel_type_features(&config),
+                       &channelmanager::provided_init_features(&config),
+                       &open_channel_msg,
+                       7,
+                       &config,
+                       0,
+                       &&logger,
+                       true,  // Allow node b to send a 0conf channel_ready.
+               ).unwrap();
+
+               let accept_channel_msg = node_b_chan.accept_inbound_channel();
+               node_a_chan.accept_channel(
+                       &accept_channel_msg,
+                       &config.channel_handshake_limits,
+                       &channelmanager::provided_init_features(&config),
+               ).unwrap();
+
+               // Fund the channel with a batch funding transaction.
+               let output_script = node_a_chan.context.get_funding_redeemscript();
+               let tx = Transaction {
+                       version: 1,
+                       lock_time: PackedLockTime::ZERO,
+                       input: Vec::new(),
+                       output: vec![
+                               TxOut {
+                                       value: 10000000, script_pubkey: output_script.clone(),
+                               },
+                               TxOut {
+                                       value: 10000000, script_pubkey: Builder::new().into_script(),
+                               },
+                       ]};
+               let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(
+                       tx.clone(),
+                       funding_outpoint,
+                       true,
+                       &&logger,
+               ).map_err(|_| ()).unwrap();
+               let (mut node_b_chan, funding_signed_msg, _) = node_b_chan.funding_created(
+                       &funding_created_msg,
+                       best_block,
+                       &&keys_provider,
+                       &&logger,
+               ).map_err(|_| ()).unwrap();
+               let node_b_updates = node_b_chan.monitor_updating_restored(
+                       &&logger,
+                       &&keys_provider,
+                       chain_hash,
+                       &config,
+                       0,
+               );
+
+               // Receive funding_signed, but the channel will be configured to hold sending channel_ready and
+               // broadcasting the funding transaction until the batch is ready.
+               let _ = node_a_chan.funding_signed(
+                       &funding_signed_msg,
+                       best_block,
+                       &&keys_provider,
+                       &&logger,
+               ).unwrap();
+               let node_a_updates = node_a_chan.monitor_updating_restored(
+                       &&logger,
+                       &&keys_provider,
+                       chain_hash,
+                       &config,
+                       0,
+               );
+               // Our channel_ready shouldn't be sent yet, even with trust_own_funding_0conf set,
+               // as the funding transaction depends on all channels in the batch becoming ready.
+               assert!(node_a_updates.channel_ready.is_none());
+               assert!(node_a_updates.funding_broadcastable.is_none());
+               assert_eq!(
+                       node_a_chan.context.channel_state,
+                       ChannelState::FundingSent as u32 |
+                       ChannelState::WaitingForBatch as u32,
+               );
+
+               // It is possible to receive a 0conf channel_ready from the remote node.
+               node_a_chan.channel_ready(
+                       &node_b_updates.channel_ready.unwrap(),
+                       &&keys_provider,
+                       chain_hash,
+                       &config,
+                       &best_block,
+                       &&logger,
+               ).unwrap();
+               assert_eq!(
+                       node_a_chan.context.channel_state,
+                       ChannelState::FundingSent as u32 |
+                       ChannelState::WaitingForBatch as u32 |
+                       ChannelState::TheirChannelReady as u32,
+               );
+
+               // Clear the ChannelState::WaitingForBatch only when called by ChannelManager.
+               node_a_chan.set_batch_ready();
+               assert_eq!(
+                       node_a_chan.context.channel_state,
+                       ChannelState::FundingSent as u32 |
+                       ChannelState::TheirChannelReady as u32,
+               );
+               assert!(node_a_chan.check_get_channel_ready(0).is_some());
+       }
 }
index 2d194f06a0c905b514ebe6df2b4b5f0d4a70da8f..e0fc3ed2c83e2f3fae3f4958f1c31181e7f8e633 100644 (file)
@@ -64,7 +64,7 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe
 use crate::util::logger::{Level, Logger};
 use crate::util::errors::APIError;
 
-use alloc::collections::BTreeMap;
+use alloc::collections::{btree_map, BTreeMap};
 
 use crate::io;
 use crate::prelude::*;
@@ -1201,6 +1201,12 @@ where
        /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
+       /// Tracks the progress of channels going through batch funding by whether funding_signed was
+       /// received and the monitor has been persisted.
+       ///
+       /// This information does not need to be persisted as funding nodes can forget
+       /// unfunded channels upon disconnection.
+       funding_batch_states: Mutex<BTreeMap<Txid, Vec<(ChannelId, PublicKey, bool)>>>,
 
        background_events_processed_since_startup: AtomicBool,
 
@@ -2025,9 +2031,54 @@ macro_rules! handle_monitor_update_completion {
                }
 
                let channel_id = $chan.context.channel_id();
+               let unbroadcasted_batch_funding_txid = $chan.context.unbroadcasted_batch_funding_txid();
                core::mem::drop($peer_state_lock);
                core::mem::drop($per_peer_state_lock);
 
+               // If the channel belongs to a batch funding transaction, the progress of the batch
+               // should be updated as we have received funding_signed and persisted the monitor.
+               if let Some(txid) = unbroadcasted_batch_funding_txid {
+                       let mut funding_batch_states = $self.funding_batch_states.lock().unwrap();
+                       let mut batch_completed = false;
+                       if let Some(batch_state) = funding_batch_states.get_mut(&txid) {
+                               let channel_state = batch_state.iter_mut().find(|(chan_id, pubkey, _)| (
+                                       *chan_id == channel_id &&
+                                       *pubkey == counterparty_node_id
+                               ));
+                               if let Some(channel_state) = channel_state {
+                                       channel_state.2 = true;
+                               } else {
+                                       debug_assert!(false, "Missing channel batch state for channel which completed initial monitor update");
+                               }
+                               batch_completed = batch_state.iter().all(|(_, _, completed)| *completed);
+                       } else {
+                               debug_assert!(false, "Missing batch state for channel which completed initial monitor update");
+                       }
+
+                       // When all channels in a batched funding transaction have become ready, it is not necessary
+                       // to track the progress of the batch anymore and the state of the channels can be updated.
+                       if batch_completed {
+                               let removed_batch_state = funding_batch_states.remove(&txid).into_iter().flatten();
+                               let per_peer_state = $self.per_peer_state.read().unwrap();
+                               let mut batch_funding_tx = None;
+                               for (channel_id, counterparty_node_id, _) in removed_batch_state {
+                                       if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                               let mut peer_state = peer_state_mutex.lock().unwrap();
+                                               if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) {
+                                                       batch_funding_tx = batch_funding_tx.or_else(|| chan.context.unbroadcasted_funding());
+                                                       chan.set_batch_ready();
+                                                       let mut pending_events = $self.pending_events.lock().unwrap();
+                                                       emit_channel_pending_event!(pending_events, chan);
+                                               }
+                                       }
+                               }
+                               if let Some(tx) = batch_funding_tx {
+                                       log_info!($self.logger, "Broadcasting batch funding transaction with txid {}", tx.txid());
+                                       $self.tx_broadcaster.broadcast_transactions(&[&tx]);
+                               }
+                       }
+               }
+
                $self.handle_monitor_update_completion_actions(update_actions);
 
                if let Some(forwards) = htlc_forwards {
@@ -2230,9 +2281,9 @@ where
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
-
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
+                       funding_batch_states: Mutex::new(BTreeMap::new()),
 
                        entropy_source,
                        node_signer,
@@ -2512,6 +2563,7 @@ where
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                let funding_txo_opt = chan.context.get_funding_txo();
                                                let their_features = &peer_state.latest_features;
+                                               let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                let (shutdown_msg, mut monitor_update_opt, htlcs) =
                                                        chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
                                                failed_htlcs = htlcs;
@@ -2542,7 +2594,7 @@ where
                                                                        });
                                                                }
                                                                self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
-                                                               shutdown_result = Some((None, Vec::new()));
+                                                               shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                        }
                                                }
                                                break;
@@ -2639,7 +2691,7 @@ where
                        debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
                }
 
-               let (monitor_update_option, mut failed_htlcs) = shutdown_res;
+               let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
                log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
                for htlc_source in failed_htlcs.drain(..) {
                        let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
@@ -2654,6 +2706,31 @@ where
                        // ignore the result here.
                        let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
                }
+               let mut shutdown_results = Vec::new();
+               if let Some(txid) = unbroadcasted_batch_funding_txid {
+                       let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
+                       let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       let mut has_uncompleted_channel = None;
+                       for (channel_id, counterparty_node_id, state) in affected_channels {
+                               if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                       let mut peer_state = peer_state_mutex.lock().unwrap();
+                                       if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) {
+                                               update_maps_on_chan_removal!(self, &chan.context());
+                                               self.issue_channel_close_events(&chan.context(), ClosureReason::FundingBatchClosure);
+                                               shutdown_results.push(chan.context_mut().force_shutdown(false));
+                                       }
+                               }
+                               has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state));
+                       }
+                       debug_assert!(
+                               has_uncompleted_channel.unwrap_or(true),
+                               "Closing a batch where all channels have completed initial monitor update",
+                       );
+               }
+               for shutdown_result in shutdown_results.drain(..) {
+                       self.finish_close_channel(shutdown_result);
+               }
        }
 
        /// `peer_msg` should be set when we receive a message from a peer, but not set when the
@@ -3650,8 +3727,9 @@ where
 
        /// Handles the generation of a funding transaction, optionally (for tests) with a function
        /// which checks the correctness of the funding transaction given the associated channel.
-       fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
-               &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
+       fn funding_transaction_generated_intern<FundingOutput: FnMut(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
+               &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batch_funding: bool,
+               mut find_funding_output: FundingOutput,
        ) -> Result<(), APIError> {
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@@ -3663,7 +3741,7 @@ where
                        Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
                                let funding_txo = find_funding_output(&chan, &funding_transaction)?;
 
-                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
+                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batch_funding, &self.logger)
                                        .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
                                                let channel_id = chan.context.channel_id();
                                                let user_id = chan.context.get_user_id();
@@ -3719,7 +3797,7 @@ where
 
        #[cfg(test)]
        pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
-               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| {
+               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| {
                        Ok(OutPoint { txid: tx.txid(), index: output_index })
                })
        }
@@ -3755,17 +3833,37 @@ where
        /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
        /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
        pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
+               self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction)
+       }
+
+       /// Call this upon creation of a batch funding transaction for the given channels.
+       ///
+       /// Return values are identical to [`Self::funding_transaction_generated`], respective to
+       /// each individual channel and transaction output.
+       ///
+       /// Do NOT broadcast the funding transaction yourself. This batch funding transcaction
+       /// will only be broadcast when we have safely received and persisted the counterparty's
+       /// signature for each channel.
+       ///
+       /// If there is an error, all channels in the batch are to be considered closed.
+       pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&ChannelId, &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+               let mut result = Ok(());
 
                if !funding_transaction.is_coin_base() {
                        for inp in funding_transaction.input.iter() {
                                if inp.witness.is_empty() {
-                                       return Err(APIError::APIMisuseError {
+                                       result = result.and(Err(APIError::APIMisuseError {
                                                err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned()
-                                       });
+                                       }));
                                }
                        }
                }
+               if funding_transaction.output.len() > u16::max_value() as usize {
+                       result = result.and(Err(APIError::APIMisuseError {
+                               err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
+                       }));
+               }
                {
                        let height = self.best_block.read().unwrap().height();
                        // Transactions are evaluated as final by network mempools if their locktime is strictly
@@ -3773,37 +3871,93 @@ where
                        // node might not have perfect sync about their blockchain views. Thus, if the wallet
                        // module is ahead of LDK, only allow one more block of headroom.
                        if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 {
-                               return Err(APIError::APIMisuseError {
+                               result = result.and(Err(APIError::APIMisuseError {
                                        err: "Funding transaction absolute timelock is non-final".to_owned()
-                               });
+                               }));
                        }
                }
-               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
-                       if tx.output.len() > u16::max_value() as usize {
-                               return Err(APIError::APIMisuseError {
-                                       err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
-                               });
-                       }
 
-                       let mut output_index = None;
-                       let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
-                       for (idx, outp) in tx.output.iter().enumerate() {
-                               if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
-                                       if output_index.is_some() {
+               let txid = funding_transaction.txid();
+               let is_batch_funding = temporary_channels.len() > 1;
+               let mut funding_batch_states = if is_batch_funding {
+                       Some(self.funding_batch_states.lock().unwrap())
+               } else {
+                       None
+               };
+               let mut funding_batch_state = funding_batch_states.as_mut().and_then(|states| {
+                       match states.entry(txid) {
+                               btree_map::Entry::Occupied(_) => {
+                                       result = result.clone().and(Err(APIError::APIMisuseError {
+                                               err: "Batch funding transaction with the same txid already exists".to_owned()
+                                       }));
+                                       None
+                               },
+                               btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())),
+                       }
+               });
+               for (channel_idx, &(temporary_channel_id, counterparty_node_id)) in temporary_channels.iter().enumerate() {
+                       result = result.and_then(|_| self.funding_transaction_generated_intern(
+                               temporary_channel_id,
+                               counterparty_node_id,
+                               funding_transaction.clone(),
+                               is_batch_funding,
+                               |chan, tx| {
+                                       let mut output_index = None;
+                                       let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
+                                       for (idx, outp) in tx.output.iter().enumerate() {
+                                               if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
+                                                       if output_index.is_some() {
+                                                               return Err(APIError::APIMisuseError {
+                                                                       err: "Multiple outputs matched the expected script and value".to_owned()
+                                                               });
+                                                       }
+                                                       output_index = Some(idx as u16);
+                                               }
+                                       }
+                                       if output_index.is_none() {
                                                return Err(APIError::APIMisuseError {
-                                                       err: "Multiple outputs matched the expected script and value".to_owned()
+                                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
                                                });
                                        }
-                                       output_index = Some(idx as u16);
+                                       let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
+                                       if let Some(funding_batch_state) = funding_batch_state.as_mut() {
+                                               funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false));
+                                       }
+                                       Ok(outpoint)
+                               })
+                       );
+               }
+               if let Err(ref e) = result {
+                       // Remaining channels need to be removed on any error.
+                       let e = format!("Error in transaction funding: {:?}", e);
+                       let mut channels_to_remove = Vec::new();
+                       channels_to_remove.extend(funding_batch_states.as_mut()
+                               .and_then(|states| states.remove(&txid))
+                               .into_iter().flatten()
+                               .map(|(chan_id, node_id, _state)| (chan_id, node_id))
+                       );
+                       channels_to_remove.extend(temporary_channels.iter()
+                               .map(|(&chan_id, &node_id)| (chan_id, node_id))
+                       );
+                       let mut shutdown_results = Vec::new();
+                       {
+                               let per_peer_state = self.per_peer_state.read().unwrap();
+                               for (channel_id, counterparty_node_id) in channels_to_remove {
+                                       per_peer_state.get(&counterparty_node_id)
+                                               .map(|peer_state_mutex| peer_state_mutex.lock().unwrap())
+                                               .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id))
+                                               .map(|mut chan| {
+                                                       update_maps_on_chan_removal!(self, &chan.context());
+                                                       self.issue_channel_close_events(&chan.context(), ClosureReason::ProcessingError { err: e.clone() });
+                                                       shutdown_results.push(chan.context_mut().force_shutdown(false));
+                                               });
                                }
                        }
-                       if output_index.is_none() {
-                               return Err(APIError::APIMisuseError {
-                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
-                               });
+                       for shutdown_result in shutdown_results.drain(..) {
+                               self.finish_close_channel(shutdown_result);
                        }
-                       Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
-               })
+               }
+               result
        }
 
        /// Atomically applies partial updates to the [`ChannelConfig`] of the given channels.
@@ -6089,6 +6243,7 @@ where
 
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
                let mut shutdown_result = None;
+               let unbroadcasted_batch_funding_txid;
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
@@ -6101,6 +6256,7 @@ where
                        match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
+                                               unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
                                                if let Some(msg) = closing_signed {
                                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
@@ -6137,9 +6293,8 @@ where
                                });
                        }
                        self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
-                       shutdown_result = Some((None, Vec::new()));
+                       shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                }
-               mem::drop(peer_state_mutex);
                mem::drop(per_peer_state);
                if let Some(shutdown_result) = shutdown_result {
                        self.finish_close_channel(shutdown_result);
@@ -6818,6 +6973,7 @@ where
                let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
                let mut has_update = false;
                let mut shutdown_result = None;
+               let mut unbroadcasted_batch_funding_txid = None;
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
@@ -6828,6 +6984,7 @@ where
                                peer_state.channel_by_id.retain(|channel_id, phase| {
                                        match phase {
                                                ChannelPhase::Funded(chan) => {
+                                                       unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                        match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
                                                                Ok((msg_opt, tx_opt)) => {
                                                                        if let Some(msg) = msg_opt {
@@ -6850,7 +7007,7 @@ where
                                                                                log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                                                self.tx_broadcaster.broadcast_transactions(&[&tx]);
                                                                                update_maps_on_chan_removal!(self, &chan.context);
-                                                                               shutdown_result = Some((None, Vec::new()));
+                                                                               shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                                                false
                                                                        } else { true }
                                                                },
@@ -7841,7 +7998,6 @@ where
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
                let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
                        self, || NotifyOption::SkipPersistHandleEvents);
-
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
@@ -7854,25 +8010,24 @@ where
                                peer_state.channel_by_id.retain(|_, phase| {
                                        let context = match phase {
                                                ChannelPhase::Funded(chan) => {
-                                                       chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
-                                                       // We only retain funded channels that are not shutdown.
-                                                       if !chan.is_shutdown() {
+                                                       if chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger).is_ok() {
+                                                               // We only retain funded channels that are not shutdown.
                                                                return true;
                                                        }
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                                // Unfunded channels will always be removed.
                                                ChannelPhase::UnfundedOutboundV1(chan) => {
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                                ChannelPhase::UnfundedInboundV1(chan) => {
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                        };
                                        // Clean up for removal.
                                        update_maps_on_chan_removal!(self, &context);
                                        self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer);
-                                       failed_channels.push((None, Vec::new()));
+                                       failed_channels.push(context.force_shutdown(false));
                                        false
                                });
                                // Note that we don't bother generating any events for pre-accept channels -
@@ -8676,7 +8831,7 @@ where
                                }
 
                                number_of_funded_channels += peer_state.channel_by_id.iter().filter(
-                                       |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false }
+                                       |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_broadcast() } else { false }
                                ).count();
                        }
 
@@ -8687,7 +8842,7 @@ where
                                let peer_state = &mut *peer_state_lock;
                                for channel in peer_state.channel_by_id.iter().filter_map(
                                        |(_, phase)| if let ChannelPhase::Funded(channel) = phase {
-                                               if channel.context.is_funding_initiated() { Some(channel) } else { None }
+                                               if channel.context.is_funding_broadcast() { Some(channel) } else { None }
                                        } else { None }
                                ) {
                                        channel.write(writer)?;
@@ -9111,7 +9266,10 @@ where
                                                log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
                                                        &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
                                        }
-                                       let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true);
+                                       let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
+                                       if batch_funding_txid.is_some() {
+                                               return Err(DecodeError::InvalidValue);
+                                       }
                                        if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
                                                close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                        counterparty_node_id, funding_txo, update
@@ -9151,7 +9309,7 @@ where
                                        if let Some(short_channel_id) = channel.context.get_short_channel_id() {
                                                short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
                                        }
-                                       if channel.context.is_funding_initiated() {
+                                       if channel.context.is_funding_broadcast() {
                                                id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id());
                                        }
                                        match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) {
@@ -9865,6 +10023,8 @@ where
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
 
+                       funding_batch_states: Mutex::new(BTreeMap::new()),
+
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        signer_provider: args.signer_provider,
index 0e19ceb65832e47db0e3fb7d5165361acf7282b1..207c0692031a7c89b9f2f6c75da9fc71997abcc3 100644 (file)
@@ -1473,12 +1473,12 @@ pub fn check_closed_event(node: &Node, events_count: usize, expected_reason: Clo
        let events = node.node.get_and_clear_pending_events();
        assert_eq!(events.len(), events_count, "{:?}", events);
        let mut issues_discard_funding = false;
-       for (idx, event) in events.into_iter().enumerate() {
+       for event in events {
                match event {
-                       Event::ChannelClosed { ref reason, counterparty_node_id, 
+                       Event::ChannelClosed { ref reason, counterparty_node_id,
                                channel_capacity_sats, .. } => {
                                assert_eq!(*reason, expected_reason);
-                               assert_eq!(counterparty_node_id.unwrap(), expected_counterparty_node_ids[idx]);
+                               assert!(expected_counterparty_node_ids.iter().any(|id| id == &counterparty_node_id.unwrap()));
                                assert_eq!(channel_capacity_sats.unwrap(), expected_channel_capacity);
                        },
                        Event::DiscardFunding { .. } => {
@@ -1499,7 +1499,7 @@ macro_rules! check_closed_event {
                check_closed_event!($node, $events, $reason, false, $counterparty_node_ids, $channel_capacity);
        };
        ($node: expr, $events: expr, $reason: expr, $is_check_discard_funding: expr, $counterparty_node_ids: expr, $channel_capacity: expr) => {
-               $crate::ln::functional_test_utils::check_closed_event(&$node, $events, $reason, 
+               $crate::ln::functional_test_utils::check_closed_event(&$node, $events, $reason,
                        $is_check_discard_funding, &$counterparty_node_ids, $channel_capacity);
        }
 }
@@ -3266,3 +3266,76 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
                }
        }
 }
+
+/// Initiates channel opening and creates a single batch funding transaction.
+/// This will go through the open_channel / accept_channel flow, and return the batch funding
+/// transaction with corresponding funding_created messages.
+pub fn create_batch_channel_funding<'a, 'b, 'c>(
+       funding_node: &Node<'a, 'b, 'c>,
+       params: &[(&Node<'a, 'b, 'c>, u64, u64, u128, Option<UserConfig>)],
+) -> (Transaction, Vec<msgs::FundingCreated>) {
+       let mut tx_outs = Vec::new();
+       let mut temp_chan_ids = Vec::new();
+       let mut funding_created_msgs = Vec::new();
+
+       for (other_node, channel_value_satoshis, push_msat, user_channel_id, override_config) in params {
+               // Initialize channel opening.
+               let temp_chan_id = funding_node.node.create_channel(
+                       other_node.node.get_our_node_id(), *channel_value_satoshis, *push_msat, *user_channel_id,
+                       *override_config,
+               ).unwrap();
+               let open_channel_msg = get_event_msg!(funding_node, MessageSendEvent::SendOpenChannel, other_node.node.get_our_node_id());
+               other_node.node.handle_open_channel(&funding_node.node.get_our_node_id(), &open_channel_msg);
+               let accept_channel_msg = get_event_msg!(other_node, MessageSendEvent::SendAcceptChannel, funding_node.node.get_our_node_id());
+               funding_node.node.handle_accept_channel(&other_node.node.get_our_node_id(), &accept_channel_msg);
+
+               // Create the corresponding funding output.
+               let events = funding_node.node.get_and_clear_pending_events();
+               assert_eq!(events.len(), 1);
+               match events[0] {
+                       Event::FundingGenerationReady {
+                               ref temporary_channel_id,
+                               ref counterparty_node_id,
+                               channel_value_satoshis: ref event_channel_value_satoshis,
+                               ref output_script,
+                               user_channel_id: ref event_user_channel_id
+                       } => {
+                               assert_eq!(temporary_channel_id, &temp_chan_id);
+                               assert_eq!(counterparty_node_id, &other_node.node.get_our_node_id());
+                               assert_eq!(channel_value_satoshis, event_channel_value_satoshis);
+                               assert_eq!(user_channel_id, event_user_channel_id);
+                               tx_outs.push(TxOut {
+                                       value: *channel_value_satoshis, script_pubkey: output_script.clone(),
+                               });
+                       },
+                       _ => panic!("Unexpected event"),
+               };
+               temp_chan_ids.push((temp_chan_id, other_node.node.get_our_node_id()));
+       }
+
+       // Compose the batch funding transaction and give it to the ChannelManager.
+       let tx = Transaction {
+               version: 2,
+               lock_time: PackedLockTime::ZERO,
+               input: Vec::new(),
+               output: tx_outs,
+       };
+       assert!(funding_node.node.batch_funding_transaction_generated(
+               temp_chan_ids.iter().map(|(a, b)| (a, b)).collect::<Vec<_>>().as_slice(),
+               tx.clone(),
+       ).is_ok());
+       check_added_monitors!(funding_node, 0);
+       let events = funding_node.node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), params.len());
+       for (other_node, ..) in params {
+               let funding_created = events
+                       .iter()
+                       .find_map(|event| match event {
+                               MessageSendEvent::SendFundingCreated { node_id, msg } if node_id == &other_node.node.get_our_node_id() => Some(msg.clone()),
+                               _ => None,
+                       })
+                       .unwrap();
+               funding_created_msgs.push(funding_created);
+       }
+       return (tx, funding_created_msgs);
+}
index 1066362a4c4c101791a0c12a051e36c7986c2741..19ba81235f2df3bb8bad508ab39787aebb95b053 100644 (file)
@@ -15,7 +15,7 @@ use crate::chain;
 use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch};
 use crate::chain::chaininterface::LowerBoundedFeeEstimator;
 use crate::chain::channelmonitor;
-use crate::chain::channelmonitor::{CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use crate::chain::channelmonitor::{CLOSED_CHANNEL_UPDATE_ID, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
 use crate::chain::transaction::OutPoint;
 use crate::sign::{EcdsaChannelSigner, EntropySource, SignerProvider};
 use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, PaymentPurpose, ClosureReason, HTLCDestination, PaymentFailureReason};
@@ -3721,7 +3721,7 @@ fn test_peer_disconnected_before_funding_broadcasted() {
        nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
        nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
 
-       check_closed_event!(&nodes[0], 1, ClosureReason::DisconnectedPeer, false
+       check_closed_event!(&nodes[0], 2, ClosureReason::DisconnectedPeer, true
                , [nodes[1].node.get_our_node_id()], 1000000);
        check_closed_event!(&nodes[1], 1, ClosureReason::DisconnectedPeer, false
                , [nodes[0].node.get_our_node_id()], 1000000);
@@ -9038,7 +9038,7 @@ fn test_duplicate_chan_id() {
                match a_peer_state.channel_by_id.remove(&open_chan_2_msg.temporary_channel_id).unwrap() {
                        ChannelPhase::UnfundedOutboundV1(chan) => {
                                let logger = test_utils::TestLogger::new();
-                               chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap()
+                               chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap()
                        },
                        _ => panic!("Unexpected ChannelPhase variant"),
                }
@@ -9900,9 +9900,46 @@ fn test_non_final_funding_tx() {
                },
                _ => panic!()
        }
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::ChannelClosed { channel_id, .. } => {
+                       assert_eq!(channel_id, temp_channel_id);
+               },
+               _ => panic!("Unexpected event"),
+       }
+}
+
+#[test]
+fn test_non_final_funding_tx_within_headroom() {
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       let temp_channel_id = nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+       let open_channel_message = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+       nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_message);
+       let accept_channel_message = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), &accept_channel_message);
+
+       let best_height = nodes[0].node.best_block.read().unwrap().height();
+
+       let chan_id = *nodes[0].network_chan_count.borrow();
+       let events = nodes[0].node.get_and_clear_pending_events();
+       let input = TxIn { previous_output: BitcoinOutPoint::null(), script_sig: bitcoin::Script::new(), sequence: Sequence(1), witness: Witness::from_vec(vec!(vec!(1))) };
+       assert_eq!(events.len(), 1);
+       let mut tx = match events[0] {
+               Event::FundingGenerationReady { ref channel_value_satoshis, ref output_script, .. } => {
+                       // Timelock the transaction within a +1 headroom from the best block.
+                       Transaction { version: chan_id as i32, lock_time: PackedLockTime(best_height + 1), input: vec![input], output: vec![TxOut {
+                               value: *channel_value_satoshis, script_pubkey: output_script.clone(),
+                       }]}
+               },
+               _ => panic!("Unexpected event"),
+       };
 
-       // However, transaction should be accepted if it's in a +1 headroom from best block.
-       tx.lock_time = PackedLockTime(tx.lock_time.0 - 1);
+       // Transaction should be accepted if it's in a +1 headroom from best block.
        assert!(nodes[0].node.funding_transaction_generated(&temp_channel_id, &nodes[1].node.get_our_node_id(), tx.clone()).is_ok());
        get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
 }
@@ -10361,3 +10398,230 @@ fn test_multi_post_event_actions() {
        do_test_multi_post_event_actions(true);
        do_test_multi_post_event_actions(false);
 }
+
+#[test]
+fn test_batch_channel_open() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+
+       // Go through the funding_created and funding_signed flow with node 2.
+       nodes[2].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[1]);
+       check_added_monitors(&nodes[2], 1);
+       expect_channel_pending_event(&nodes[2], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+       nodes[0].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before persisting all monitors has been
+       // completed.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+       assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
+
+       // Complete the persistence of the monitor.
+       nodes[0].chain_monitor.complete_sole_pending_chan_update(
+               &OutPoint { txid: tx.txid(), index: 1 }.to_channel_id()
+       );
+       let events = nodes[0].node.get_and_clear_pending_events();
+
+       // The transaction should only have been broadcast now.
+       let broadcasted_txs = nodes[0].tx_broadcaster.txn_broadcast();
+       assert_eq!(broadcasted_txs.len(), 1);
+       assert_eq!(broadcasted_txs[0], tx);
+
+       assert_eq!(events.len(), 2);
+       assert!(events.iter().any(|e| matches!(
+               *e,
+               crate::events::Event::ChannelPending {
+                       ref counterparty_node_id,
+                       ..
+               } if counterparty_node_id == &nodes[1].node.get_our_node_id(),
+       )));
+       assert!(events.iter().any(|e| matches!(
+               *e,
+               crate::events::Event::ChannelPending {
+                       ref counterparty_node_id,
+                       ..
+               } if counterparty_node_id == &nodes[2].node.get_our_node_id(),
+       )));
+}
+
+#[test]
+fn test_disconnect_in_funding_batch() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // The remaining peer in the batch disconnects.
+       nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+
+       // The channels in the batch will close immediately.
+       let channel_id_1 = OutPoint { txid: tx.txid(), index: 0 }.to_channel_id();
+       let channel_id_2 = OutPoint { txid: tx.txid(), index: 1 }.to_channel_id();
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 4);
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_1
+       )));
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_2
+       )));
+       assert_eq!(events.iter().filter(|e| matches!(
+               e,
+               Event::DiscardFunding { .. },
+       )).count(), 2);
+
+       // The monitor should become closed.
+       check_added_monitors(&nodes[0], 1);
+       {
+               let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
+               let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
+               assert_eq!(monitor_updates_1.len(), 1);
+               assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+       }
+
+       // The funding transaction should not have been broadcast, and therefore, we don't need
+       // to broadcast a force-close transaction for the closed monitor.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // Ensure the channels don't exist anymore.
+       assert!(nodes[0].node.list_channels().is_empty());
+}
+
+#[test]
+fn test_batch_funding_close_after_funding_signed() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // Go through the funding_created and funding_signed flow with node 2.
+       nodes[2].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[1]);
+       check_added_monitors(&nodes[2], 1);
+       expect_channel_pending_event(&nodes[2], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+       nodes[0].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // Force-close the channel for which we've completed the initial monitor.
+       let channel_id_1 = OutPoint { txid: tx.txid(), index: 0 }.to_channel_id();
+       let channel_id_2 = OutPoint { txid: tx.txid(), index: 1 }.to_channel_id();
+       nodes[0].node.force_close_broadcasting_latest_txn(&channel_id_1, &nodes[1].node.get_our_node_id()).unwrap();
+       check_added_monitors(&nodes[0], 2);
+       {
+               let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
+               let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
+               assert_eq!(monitor_updates_1.len(), 1);
+               assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+               let monitor_updates_2 = monitor_updates.get(&channel_id_2).unwrap();
+               assert_eq!(monitor_updates_2.len(), 1);
+               assert_eq!(monitor_updates_2[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+       }
+       let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+       match msg_events[0] {
+               MessageSendEvent::HandleError { .. } => (),
+               _ => panic!("Unexpected message."),
+       }
+
+       // We broadcast the commitment transaction as part of the force-close.
+       {
+               let broadcasted_txs = nodes[0].tx_broadcaster.txn_broadcast();
+               assert_eq!(broadcasted_txs.len(), 1);
+               assert!(broadcasted_txs[0].txid() != tx.txid());
+               assert_eq!(broadcasted_txs[0].input.len(), 1);
+               assert_eq!(broadcasted_txs[0].input[0].previous_output.txid, tx.txid());
+       }
+
+       // All channels in the batch should close immediately.
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 4);
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_1
+       )));
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_2
+       )));
+       assert_eq!(events.iter().filter(|e| matches!(
+               e,
+               Event::DiscardFunding { .. },
+       )).count(), 2);
+
+       // Ensure the channels don't exist anymore.
+       assert!(nodes[0].node.list_channels().is_empty());
+}
index fa08fba99fabda640739f714dd9eb6818d2ab374..151861411b997002de4792d5c031ed8079faee2e 100644 (file)
@@ -11,7 +11,7 @@
 
 use crate::chain::{ChannelMonitorUpdateStatus, Watch};
 use crate::chain::chaininterface::LowerBoundedFeeEstimator;
-use crate::chain::channelmonitor::ChannelMonitor;
+use crate::chain::channelmonitor::{CLOSED_CHANNEL_UPDATE_ID, ChannelMonitor};
 use crate::sign::EntropySource;
 use crate::chain::transaction::OutPoint;
 use crate::events::{ClosureReason, Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider};
@@ -25,6 +25,7 @@ use crate::util::ser::{Writeable, ReadableArgs};
 use crate::util::config::UserConfig;
 use crate::util::string::UntrustedString;
 
+use bitcoin::{PackedLockTime, Transaction, TxOut};
 use bitcoin::hash_types::BlockHash;
 
 use crate::prelude::*;
@@ -1114,3 +1115,65 @@ fn removed_payment_no_manager_persistence() {
 
        expect_payment_failed!(nodes[0], payment_hash, false);
 }
+
+#[test]
+fn test_reload_partial_funding_batch() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let new_persister;
+       let new_chain_monitor;
+
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let new_channel_manager;
+       let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       // The monitor is persisted when receiving funding_signed.
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // Reload the node while a subset of the channels in the funding batch have persisted monitors.
+       let channel_id_1 = OutPoint { txid: tx.txid(), index: 0 }.to_channel_id();
+       let node_encoded = nodes[0].node.encode();
+       let channel_monitor_1_serialized = get_monitor!(nodes[0], channel_id_1).encode();
+       reload_node!(nodes[0], node_encoded, &[&channel_monitor_1_serialized], new_persister, new_chain_monitor, new_channel_manager);
+
+       // Process monitor events.
+       assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+
+       // The monitor should become closed.
+       check_added_monitors(&nodes[0], 1);
+       {
+               let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
+               let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
+               assert_eq!(monitor_updates_1.len(), 1);
+               assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+       }
+
+       // The funding transaction should not have been broadcast, but we broadcast the force-close
+       // transaction as part of closing the monitor.
+       {
+               let broadcasted_txs = nodes[0].tx_broadcaster.txn_broadcast();
+               assert_eq!(broadcasted_txs.len(), 1);
+               assert!(broadcasted_txs[0].txid() != tx.txid());
+               assert_eq!(broadcasted_txs[0].input.len(), 1);
+               assert_eq!(broadcasted_txs[0].input[0].previous_output.txid, tx.txid());
+       }
+
+       // Ensure the channels don't exist anymore.
+       assert!(nodes[0].node.list_channels().is_empty());
+}