Merge pull request #2601 from TheBlueMatt/2023-09-117-alpha2 v0.0.117-alpha2
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Wed, 27 Sep 2023 03:51:12 +0000 (03:51 +0000)
committer GitHub <noreply@github.com>
Wed, 27 Sep 2023 03:51:12 +0000 (03:51 +0000)
Bump versions to 0.0.117-alpha2/invoice 0.25.0-alpha2

12 files changed:
lightning/src/events/mod.rs
lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/outbound_payment.rs
lightning/src/ln/payment_tests.rs
lightning/src/ln/reload_tests.rs
lightning/src/routing/router.rs
lightning/src/sign/mod.rs
lightning/src/util/persist.rs
pending_changelog/monitorupdatingpersister.txt [new file with mode: 0644]

lightning/src/events/mod.rs
index bb98e271597309d057ca4712b394e302b35cddc3..269887a3dbac06fc1ba24dcda084f5c2c2115de2 100644
@@ -199,6 +199,9 @@ pub enum ClosureReason {
        /// The counterparty requested a cooperative close of a channel that had not been funded yet.
        /// The channel has been immediately closed.
        CounterpartyCoopClosedUnfundedChannel,
+       /// Another channel in the same funding batch closed before the funding transaction
+       /// was ready to be broadcast.
+       FundingBatchClosure,
 }
 
 impl core::fmt::Display for ClosureReason {
@@ -219,6 +222,7 @@ impl core::fmt::Display for ClosureReason {
                        ClosureReason::DisconnectedPeer => f.write_str("the peer disconnected prior to the channel being funded"),
                        ClosureReason::OutdatedChannelManager => f.write_str("the ChannelManager read from disk was stale compared to ChannelMonitor(s)"),
                        ClosureReason::CounterpartyCoopClosedUnfundedChannel => f.write_str("the peer requested the unfunded channel be closed"),
+                       ClosureReason::FundingBatchClosure => f.write_str("another channel in the same funding batch closed"),
                }
        }
 }
@@ -233,6 +237,7 @@ impl_writeable_tlv_based_enum_upgradable!(ClosureReason,
        (10, DisconnectedPeer) => {},
        (12, OutdatedChannelManager) => {},
        (13, CounterpartyCoopClosedUnfundedChannel) => {},
+       (15, FundingBatchClosure) => {}
 );
 
 /// Intended destination of a failed HTLC as indicated in [`Event::HTLCHandlingFailed`].
@@ -844,6 +849,8 @@ pub enum Event {
        },
        /// Used to indicate to the user that they can abandon the funding transaction and recycle the
        /// inputs for another purpose.
+       ///
+       /// This event is not guaranteed to be generated for channels that are closed due to a restart.
        DiscardFunding {
                /// The channel_id of the channel which has been closed.
                channel_id: ChannelId,
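
For orientation, here is a minimal event-handling sketch showing how a consumer might react to the new closure reason and the `DiscardFunding` event. The `Event::ChannelClosed` and `Event::DiscardFunding` variants and their `channel_id`/`reason`/`transaction` fields are from `lightning::events`; the handler body and messages are illustrative only.

    use lightning::events::{ClosureReason, Event};

    // Illustrative handler: when a sibling channel in a funding batch closes, the
    // channel is closed with FundingBatchClosure; a DiscardFunding event (not
    // guaranteed after a restart) then signals that the funding inputs may be reused.
    fn handle_event(event: Event) {
        match event {
            Event::ChannelClosed { channel_id, reason: ClosureReason::FundingBatchClosure, .. } => {
                println!("channel {} closed because another channel in its funding batch closed", channel_id);
            },
            Event::DiscardFunding { channel_id, transaction } => {
                // Safe to double-spend the inputs of `transaction` for another purpose.
                println!("discarding funding tx {} for channel {}", transaction.txid(), channel_id);
            },
            _ => {},
        }
    }
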
lightning/src/ln/channel.rs
index 796c041f8185d78d68972370726fdccb96fb4fd5..d14a868539f68f173fcb79e481e2056aa3f8f090 100644
@@ -300,9 +300,24 @@ enum ChannelState {
        /// We've successfully negotiated a closing_signed dance. At this point ChannelManager is about
        /// to drop us, but we store this anyway.
        ShutdownComplete = 4096,
+       /// Flag which is set on `FundingSent` to indicate this channel is funded in a batch and the
+       /// broadcasting of the funding transaction is being held until all channels in the batch
+       /// have received funding_signed and have their monitors persisted.
+       WaitingForBatch = 1 << 13,
 }
-const BOTH_SIDES_SHUTDOWN_MASK: u32 = ChannelState::LocalShutdownSent as u32 | ChannelState::RemoteShutdownSent as u32;
-const MULTI_STATE_FLAGS: u32 = BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32;
+const BOTH_SIDES_SHUTDOWN_MASK: u32 =
+       ChannelState::LocalShutdownSent as u32 |
+       ChannelState::RemoteShutdownSent as u32;
+const MULTI_STATE_FLAGS: u32 =
+       BOTH_SIDES_SHUTDOWN_MASK |
+       ChannelState::PeerDisconnected as u32 |
+       ChannelState::MonitorUpdateInProgress as u32;
+const STATE_FLAGS: u32 =
+       MULTI_STATE_FLAGS |
+       ChannelState::TheirChannelReady as u32 |
+       ChannelState::OurChannelReady as u32 |
+       ChannelState::AwaitingRemoteRevoke as u32 |
+       ChannelState::WaitingForBatch as u32;
 
 pub const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
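
The `STATE_FLAGS` mask introduced above exists because `ChannelState` mixes ordered funding stages with pure flag bits, and the new `WaitingForBatch` flag (1 << 13) is numerically larger than every ordered stage. Any ordered comparison on `channel_state` therefore has to strip the flag bits first, which is why so many `>=`/`<` checks below gain `& !STATE_FLAGS`. A standalone sketch of the pitfall, using assumed stand-in values for the ordered stages:

    // Stand-in values: the two ordered stages below are assumptions for illustration;
    // only WAITING_FOR_BATCH (1 << 13) is taken from this patch.
    const FUNDING_SENT: u32 = 8;
    const CHANNEL_READY: u32 = 64;
    const WAITING_FOR_BATCH: u32 = 1 << 13;
    const STATE_FLAGS: u32 = WAITING_FOR_BATCH; // plus the other flag bits in the real code

    fn is_at_least_channel_ready(channel_state: u32) -> bool {
        // Mask out flag bits before doing an ordered comparison.
        channel_state & !STATE_FLAGS >= CHANNEL_READY
    }

    fn main() {
        let state = FUNDING_SENT | WAITING_FOR_BATCH;
        assert!(state >= CHANNEL_READY);            // raw comparison is misleading
        assert!(!is_at_least_channel_ready(state)); // masked comparison is correct
    }
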
 
@@ -527,12 +542,15 @@ pub(super) struct ReestablishResponses {
 
 /// The return type of `force_shutdown`
 ///
-/// Contains a (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
-/// followed by a list of HTLCs to fail back in the form of the (source, payment hash, and this
-/// channel's counterparty_node_id and channel_id).
+/// Contains a tuple with the following:
+/// - An optional (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
+/// - A list of HTLCs to fail back, each in the form of (source, payment hash, this channel's
+/// counterparty_node_id, channel_id).
+/// - An optional transaction id identifying a corresponding batch funding transaction.
 pub(crate) type ShutdownResult = (
        Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
-       Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>
+       Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>,
+       Option<Txid>
 );
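
As a crate-internal illustration (not part of this patch), a consumer of the widened tuple now unpacks three elements; this mirrors what `finish_close_channel` does later in this PR:

    // Illustrative only: how a caller inside the crate unpacks a ShutdownResult.
    fn consume_shutdown_result(shutdown_res: ShutdownResult) {
        let (monitor_update_opt, failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
        // 1) Apply the final ChannelMonitorUpdate, if any.
        // 2) Fail each listed HTLC backwards.
        // 3) If the channel belonged to a batch whose funding tx was never broadcast,
        //    close the sibling channels keyed by this txid as well.
        let _ = (monitor_update_opt, failed_htlcs, unbroadcasted_batch_funding_txid);
    }
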
 
 /// If the majority of the channels funds are to the fundee and the initiator holds only just
@@ -821,6 +839,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
 
        pub(crate) channel_transaction_parameters: ChannelTransactionParameters,
        funding_transaction: Option<Transaction>,
+       is_batch_funding: Option<()>,
 
        counterparty_cur_commitment_point: Option<PublicKey>,
        counterparty_prev_commitment_point: Option<PublicKey>,
@@ -945,7 +964,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
 
        /// Returns true if we've ever received a message from the remote end for this Channel
        pub fn have_received_message(&self) -> bool {
-               self.channel_state > (ChannelState::OurInitSent as u32)
+               self.channel_state & !STATE_FLAGS > (ChannelState::OurInitSent as u32)
        }
 
        /// Returns true if this channel is fully established and not known to be closing.
@@ -1161,7 +1180,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
 
        // Checks whether we should emit a `ChannelPending` event.
        pub(crate) fn should_emit_channel_pending_event(&mut self) -> bool {
-               self.is_funding_initiated() && !self.channel_pending_event_emitted
+               self.is_funding_broadcast() && !self.channel_pending_event_emitted
        }
 
        // Returns whether we already emitted a `ChannelPending` event.
@@ -1220,9 +1239,11 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
                did_channel_update
        }
 
-       /// Returns true if funding_created was sent/received.
-       pub fn is_funding_initiated(&self) -> bool {
-               self.channel_state >= ChannelState::FundingSent as u32
+       /// Returns true if funding_signed was sent/received and the
+       /// funding transaction has been broadcast if necessary.
+       pub fn is_funding_broadcast(&self) -> bool {
+               self.channel_state & !STATE_FLAGS >= ChannelState::FundingSent as u32 &&
+                       self.channel_state & ChannelState::WaitingForBatch as u32 == 0
        }
 
        /// Transaction nomenclature is somewhat confusing here as there are many different cases - a
@@ -1952,15 +1973,41 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
                res
        }
 
-       /// Returns transaction if there is pending funding transaction that is yet to broadcast
-       pub fn unbroadcasted_funding(&self) -> Option<Transaction> {
-               if self.channel_state & (ChannelState::FundingCreated as u32) != 0 {
-                       self.funding_transaction.clone()
+       fn if_unbroadcasted_funding<F, O>(&self, f: F) -> Option<O>
+               where F: Fn() -> Option<O> {
+               if self.channel_state & ChannelState::FundingCreated as u32 != 0 ||
+                  self.channel_state & ChannelState::WaitingForBatch as u32 != 0 {
+                       f()
                } else {
                        None
                }
        }
 
+       /// Returns the transaction if there is a pending funding transaction that is yet to be
+       /// broadcast.
+       pub fn unbroadcasted_funding(&self) -> Option<Transaction> {
+               self.if_unbroadcasted_funding(|| self.funding_transaction.clone())
+       }
+
+       /// Returns the transaction ID if there is a pending funding transaction that is yet to be
+       /// broadcast.
+       pub fn unbroadcasted_funding_txid(&self) -> Option<Txid> {
+               self.if_unbroadcasted_funding(||
+                       self.channel_transaction_parameters.funding_outpoint.map(|txo| txo.txid)
+               )
+       }
+
+       /// Returns whether the channel is funded in a batch.
+       pub fn is_batch_funding(&self) -> bool {
+               self.is_batch_funding.is_some()
+       }
+
+       /// Returns the transaction ID if there is a pending batch funding transaction that is yet to be
+       /// broadcast.
+       pub fn unbroadcasted_batch_funding_txid(&self) -> Option<Txid> {
+               self.unbroadcasted_funding_txid().filter(|_| self.is_batch_funding())
+       }
+
        /// Gets the latest commitment transaction and any dependent transactions for relay (forcing
        /// shutdown of this channel - no more calls into this Channel may be made afterwards except
        /// those explicitly stated to be allowed after shutdown completes, eg some simple getters).
@@ -2001,10 +2048,11 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider  {
                                }))
                        } else { None }
                } else { None };
+               let unbroadcasted_batch_funding_txid = self.unbroadcasted_batch_funding_txid();
 
                self.channel_state = ChannelState::ShutdownComplete as u32;
                self.update_time_counter += 1;
-               (monitor_update, dropped_outbound_htlcs)
+               (monitor_update, dropped_outbound_htlcs, unbroadcasted_batch_funding_txid)
        }
 }
 
@@ -2574,7 +2622,11 @@ impl<SP: Deref> Channel<SP> where
                        counterparty_initial_commitment_tx.to_countersignatory_value_sat(), logger);
 
                assert_eq!(self.context.channel_state & (ChannelState::MonitorUpdateInProgress as u32), 0); // We have not had any monitor(s) yet to fail update!
-               self.context.channel_state = ChannelState::FundingSent as u32;
+               if self.context.is_batch_funding() {
+                       self.context.channel_state = ChannelState::FundingSent as u32 | ChannelState::WaitingForBatch as u32;
+               } else {
+                       self.context.channel_state = ChannelState::FundingSent as u32;
+               }
                self.context.cur_holder_commitment_transaction_number -= 1;
                self.context.cur_counterparty_commitment_transaction_number -= 1;
 
@@ -2585,6 +2637,15 @@ impl<SP: Deref> Channel<SP> where
                Ok(channel_monitor)
        }
 
+       /// Updates the state of the channel to indicate that all channels in the batch have received
+       /// funding_signed and persisted their monitors.
+       /// The funding transaction is consequently allowed to be broadcast, and the channel can be
+       /// treated as a non-batch channel going forward.
+       pub fn set_batch_ready(&mut self) {
+               self.context.is_batch_funding = None;
+               self.context.channel_state &= !(ChannelState::WaitingForBatch as u32);
+       }
+
        /// Handles a channel_ready message from our peer. If we've already sent our channel_ready
        /// and the channel is now usable (and public), this may generate an announcement_signatures to
        /// reply with.
@@ -2612,7 +2673,13 @@ impl<SP: Deref> Channel<SP> where
 
                let non_shutdown_state = self.context.channel_state & (!MULTI_STATE_FLAGS);
 
-               if non_shutdown_state == ChannelState::FundingSent as u32 {
+               // Our channel_ready shouldn't have been sent if we are waiting for other channels in the
+               // batch, but we can receive channel_ready messages.
+               debug_assert!(
+                       non_shutdown_state & ChannelState::OurChannelReady as u32 == 0 ||
+                       non_shutdown_state & ChannelState::WaitingForBatch as u32 == 0
+               );
+               if non_shutdown_state & !(ChannelState::WaitingForBatch as u32) == ChannelState::FundingSent as u32 {
                        self.context.channel_state |= ChannelState::TheirChannelReady as u32;
                } else if non_shutdown_state == (ChannelState::FundingSent as u32 | ChannelState::OurChannelReady as u32) {
                        self.context.channel_state = ChannelState::ChannelReady as u32 | (self.context.channel_state & MULTI_STATE_FLAGS);
@@ -3111,7 +3178,7 @@ impl<SP: Deref> Channel<SP> where
        ) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>)
        where F::Target: FeeEstimator, L::Target: Logger
        {
-               if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
+               if self.context.channel_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32 &&
                   (self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
                        self.free_holding_cell_htlcs(fee_estimator, logger)
                } else { (None, Vec::new()) }
@@ -3585,17 +3652,17 @@ impl<SP: Deref> Channel<SP> where
        /// resent.
        /// No further message handling calls may be made until a channel_reestablish dance has
        /// completed.
-       pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L)  where L::Target: Logger {
+       /// May return `Err(())`, which implies [`ChannelContext::force_shutdown`] should be called immediately.
+       pub fn remove_uncommitted_htlcs_and_mark_paused<L: Deref>(&mut self, logger: &L) -> Result<(), ()> where L::Target: Logger {
                assert_eq!(self.context.channel_state & ChannelState::ShutdownComplete as u32, 0);
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
-                       self.context.channel_state = ChannelState::ShutdownComplete as u32;
-                       return;
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
+                       return Err(());
                }
 
                if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == (ChannelState::PeerDisconnected as u32) {
                        // While the below code should be idempotent, it's simpler to just return early, as
                        // redundant disconnect events can fire, though they should be rare.
-                       return;
+                       return Ok(());
                }
 
                if self.context.announcement_sigs_state == AnnouncementSigsState::MessageSent || self.context.announcement_sigs_state == AnnouncementSigsState::Committed {
@@ -3656,6 +3723,7 @@ impl<SP: Deref> Channel<SP> where
 
                self.context.channel_state |= ChannelState::PeerDisconnected as u32;
                log_trace!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, &self.context.channel_id());
+               Ok(())
        }
 
        /// Indicates that a ChannelMonitor update is in progress and has not yet been fully persisted.
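
On the `remove_uncommitted_htlcs_and_mark_paused` change above, a rough caller-side sketch of the new contract (the surrounding plumbing is illustrative, not the actual `ChannelManager` code): a pre-funding channel now reports `Err(())` and the caller performs the shutdown itself.

    // Illustrative caller, roughly what the peer-disconnect path does with the new return value.
    if chan.remove_uncommitted_htlcs_and_mark_paused(&logger).is_err() {
        // Pre-FundingSent channel: force-shut it down; the returned ShutdownResult
        // also carries any batch funding txid so sibling channels can be closed.
        let shutdown_res = chan.context.force_shutdown(false);
        // ... hand shutdown_res to finish_close_channel ...
    }
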
@@ -3701,12 +3769,12 @@ impl<SP: Deref> Channel<SP> where
                // (re-)broadcast the funding transaction as we may have declined to broadcast it when we
                // first received the funding_signed.
                let mut funding_broadcastable =
-                       if self.context.is_outbound() && self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::FundingSent as u32 {
+                       if self.context.is_outbound() && self.context.channel_state & !STATE_FLAGS >= ChannelState::FundingSent as u32 && self.context.channel_state & ChannelState::WaitingForBatch as u32 == 0 {
                                self.context.funding_transaction.take()
                        } else { None };
                // That said, if the funding transaction is already confirmed (ie we're active with a
                // minimum_depth over 0) don't bother re-broadcasting the confirmed funding tx.
-               if self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::ChannelReady as u32 && self.context.minimum_depth != Some(0) {
+               if self.context.channel_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32 && self.context.minimum_depth != Some(0) {
                        funding_broadcastable = None;
                }
 
@@ -4209,7 +4277,7 @@ impl<SP: Deref> Channel<SP> where
                if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
                        return Err(ChannelError::Close("Peer sent shutdown when we needed a channel_reestablish".to_owned()));
                }
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
                        // Spec says we should fail the connection, not the channel, but that's nonsense, there
                        // are plenty of reasons you may want to fail a channel pre-funding, and spec says you
                        // can do that via error message without getting a connection fail anyway...
@@ -4603,7 +4671,7 @@ impl<SP: Deref> Channel<SP> where
        pub fn is_awaiting_initial_mon_persist(&self) -> bool {
                if !self.is_awaiting_monitor_update() { return false; }
                if self.context.channel_state &
-                       !(ChannelState::TheirChannelReady as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)
+                       !(ChannelState::TheirChannelReady as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32 | ChannelState::WaitingForBatch as u32)
                                == ChannelState::FundingSent as u32 {
                        // If we're not a 0conf channel, we'll be waiting on a monitor update with only
                        // FundingSent set, though our peer could have sent their channel_ready.
@@ -4634,7 +4702,7 @@ impl<SP: Deref> Channel<SP> where
 
        /// Returns true if our channel_ready has been sent
        pub fn is_our_channel_ready(&self) -> bool {
-               (self.context.channel_state & ChannelState::OurChannelReady as u32) != 0 || self.context.channel_state >= ChannelState::ChannelReady as u32
+               (self.context.channel_state & ChannelState::OurChannelReady as u32) != 0 || self.context.channel_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32
        }
 
        /// Returns true if our peer has either initiated or agreed to shut down the channel.
@@ -4683,6 +4751,8 @@ impl<SP: Deref> Channel<SP> where
                        return None;
                }
 
+               // Note that we don't include ChannelState::WaitingForBatch as we don't want to send
+               // channel_ready until the entire batch is ready.
                let non_shutdown_state = self.context.channel_state & (!MULTI_STATE_FLAGS);
                let need_commitment_update = if non_shutdown_state == ChannelState::FundingSent as u32 {
                        self.context.channel_state |= ChannelState::OurChannelReady as u32;
@@ -4695,7 +4765,7 @@ impl<SP: Deref> Channel<SP> where
                        // We got a reorg but not enough to trigger a force close, just ignore.
                        false
                } else {
-                       if self.context.funding_tx_confirmation_height != 0 && self.context.channel_state < ChannelState::ChannelReady as u32 {
+                       if self.context.funding_tx_confirmation_height != 0 && self.context.channel_state & !STATE_FLAGS < ChannelState::ChannelReady as u32 {
                                // We should never see a funding transaction on-chain until we've received
                                // funding_signed (if we're an outbound channel), or seen funding_generated (if we're
                                // an inbound channel - before that we have no known funding TXID). The fuzzer,
@@ -4865,7 +4935,7 @@ impl<SP: Deref> Channel<SP> where
                }
 
                let non_shutdown_state = self.context.channel_state & (!MULTI_STATE_FLAGS);
-               if non_shutdown_state >= ChannelState::ChannelReady as u32 ||
+               if non_shutdown_state & !STATE_FLAGS >= ChannelState::ChannelReady as u32 ||
                   (non_shutdown_state & ChannelState::OurChannelReady as u32) == ChannelState::OurChannelReady as u32 {
                        let mut funding_tx_confirmations = height as i64 - self.context.funding_tx_confirmation_height as i64 + 1;
                        if self.context.funding_tx_confirmation_height == 0 {
@@ -4893,7 +4963,7 @@ impl<SP: Deref> Channel<SP> where
                                height >= self.context.channel_creation_height + FUNDING_CONF_DEADLINE_BLOCKS {
                        log_info!(logger, "Closing channel {} due to funding timeout", &self.context.channel_id);
                        // If funding_tx_confirmed_in is unset, the channel must not be active
-                       assert!(non_shutdown_state <= ChannelState::ChannelReady as u32);
+                       assert!(non_shutdown_state & !STATE_FLAGS <= ChannelState::ChannelReady as u32);
                        assert_eq!(non_shutdown_state & ChannelState::OurChannelReady as u32, 0);
                        return Err(ClosureReason::FundingTimedOut);
                }
@@ -5513,7 +5583,7 @@ impl<SP: Deref> Channel<SP> where
                // If we haven't funded the channel yet, we don't need to bother ensuring the shutdown
                // script is set, we just force-close and call it a day.
                let mut chan_closed = false;
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
                        chan_closed = true;
                }
 
@@ -5542,7 +5612,7 @@ impl<SP: Deref> Channel<SP> where
 
                // From here on out, we may not fail!
                self.context.target_closing_feerate_sats_per_kw = target_feerate_sats_per_kw;
-               if self.context.channel_state < ChannelState::FundingSent as u32 {
+               if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
                        self.context.channel_state = ChannelState::ShutdownComplete as u32;
                } else {
                        self.context.channel_state |= ChannelState::LocalShutdownSent as u32;
@@ -5765,6 +5835,7 @@ impl<SP: Deref> OutboundV1Channel<SP> where SP::Target: SignerProvider {
                                        channel_type_features: channel_type.clone()
                                },
                                funding_transaction: None,
+                               is_batch_funding: None,
 
                                counterparty_cur_commitment_point: None,
                                counterparty_prev_commitment_point: None,
@@ -5825,7 +5896,7 @@ impl<SP: Deref> OutboundV1Channel<SP> where SP::Target: SignerProvider {
        /// Note that channel_id changes during this call!
        /// Do NOT broadcast the funding transaction until after a successful funding_signed call!
        /// If an Err is returned, it is a ChannelError::Close.
-       pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, logger: &L)
+       pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, is_batch_funding: bool, logger: &L)
        -> Result<(Channel<SP>, msgs::FundingCreated), (Self, ChannelError)> where L::Target: Logger {
                if !self.context.is_outbound() {
                        panic!("Tried to create outbound funding_created message on an inbound channel!");
@@ -5867,6 +5938,7 @@ impl<SP: Deref> OutboundV1Channel<SP> where SP::Target: SignerProvider {
                }
 
                self.context.funding_transaction = Some(funding_transaction);
+               self.context.is_batch_funding = Some(()).filter(|_| is_batch_funding);
 
                let channel = Channel {
                        context: self.context,
@@ -6416,6 +6488,7 @@ impl<SP: Deref> InboundV1Channel<SP> where SP::Target: SignerProvider {
                                        channel_type_features: channel_type.clone()
                                },
                                funding_transaction: None,
+                               is_batch_funding: None,
 
                                counterparty_cur_commitment_point: Some(msg.first_per_commitment_point),
                                counterparty_prev_commitment_point: None,
@@ -7031,6 +7104,7 @@ impl<SP: Deref> Writeable for Channel<SP> where SP::Target: SignerProvider {
                        (31, channel_pending_event_emitted, option),
                        (35, pending_outbound_skimmed_fees, optional_vec),
                        (37, holding_cell_skimmed_fees, optional_vec),
+                       (38, self.context.is_batch_funding, option),
                });
 
                Ok(())
@@ -7253,7 +7327,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                };
 
                let mut channel_parameters: ChannelTransactionParameters = Readable::read(reader)?;
-               let funding_transaction = Readable::read(reader)?;
+               let funding_transaction: Option<Transaction> = Readable::read(reader)?;
 
                let counterparty_cur_commitment_point = Readable::read(reader)?;
 
@@ -7314,6 +7388,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                let mut pending_outbound_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
                let mut holding_cell_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
 
+               let mut is_batch_funding: Option<()> = None;
+
                read_tlv_fields!(reader, {
                        (0, announcement_sigs, option),
                        (1, minimum_depth, option),
@@ -7339,6 +7415,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                        (31, channel_pending_event_emitted, option),
                        (35, pending_outbound_skimmed_fees_opt, optional_vec),
                        (37, holding_cell_skimmed_fees_opt, optional_vec),
+                       (38, is_batch_funding, option),
                });
 
                let (channel_keys_id, holder_signer) = if let Some(channel_keys_id) = channel_keys_id {
@@ -7346,7 +7423,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                        // If we've gotten to the funding stage of the channel, populate the signer with its
                        // required channel parameters.
                        let non_shutdown_state = channel_state & (!MULTI_STATE_FLAGS);
-                       if non_shutdown_state >= (ChannelState::FundingCreated as u32) {
+                       if non_shutdown_state & !STATE_FLAGS >= (ChannelState::FundingCreated as u32) {
                                holder_signer.provide_channel_parameters(&channel_parameters);
                        }
                        (channel_keys_id, holder_signer)
@@ -7496,6 +7573,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
 
                                channel_transaction_parameters: channel_parameters,
                                funding_transaction,
+                               is_batch_funding,
 
                                counterparty_cur_commitment_point,
                                counterparty_prev_commitment_point,
@@ -7549,7 +7627,7 @@ mod tests {
        use crate::ln::PaymentHash;
        use crate::ln::channelmanager::{self, HTLCSource, PaymentId};
        use crate::ln::channel::InitFeatures;
-       use crate::ln::channel::{Channel, InboundHTLCOutput, OutboundV1Channel, InboundV1Channel, OutboundHTLCOutput, InboundHTLCState, OutboundHTLCState, HTLCCandidate, HTLCInitiator, commit_tx_fee_msat};
+       use crate::ln::channel::{Channel, ChannelState, InboundHTLCOutput, OutboundV1Channel, InboundV1Channel, OutboundHTLCOutput, InboundHTLCState, OutboundHTLCState, HTLCCandidate, HTLCInitiator, commit_tx_fee_msat};
        use crate::ln::channel::{MAX_FUNDING_SATOSHIS_NO_WUMBO, TOTAL_BITCOIN_SUPPLY_SATOSHIS, MIN_THEIR_CHAN_RESERVE_SATOSHIS};
        use crate::ln::features::ChannelTypeFeatures;
        use crate::ln::msgs::{ChannelUpdate, DecodeError, UnsignedChannelUpdate, MAX_VALUE_MSAT};
@@ -7728,7 +7806,7 @@ mod tests {
                        value: 10000000, script_pubkey: output_script.clone(),
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
-               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
                let (_, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();
 
                // Node B --> Node A: funding signed
@@ -7855,7 +7933,7 @@ mod tests {
                        value: 10000000, script_pubkey: output_script.clone(),
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
-               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
                let (mut node_b_chan, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();
 
                // Node B --> Node A: funding signed
@@ -7863,7 +7941,7 @@ mod tests {
 
                // Now disconnect the two nodes and check that the commitment point in
                // Node B's channel_reestablish message is sane.
-               node_b_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger);
+               assert!(node_b_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger).is_ok());
                let msg = node_b_chan.get_channel_reestablish(&&logger);
                assert_eq!(msg.next_local_commitment_number, 1); // now called next_commitment_number
                assert_eq!(msg.next_remote_commitment_number, 0); // now called next_revocation_number
@@ -7871,7 +7949,7 @@ mod tests {
 
                // Check that the commitment point in Node A's channel_reestablish message
                // is sane.
-               node_a_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger);
+               assert!(node_a_chan.remove_uncommitted_htlcs_and_mark_paused(&&logger).is_ok());
                let msg = node_a_chan.get_channel_reestablish(&&logger);
                assert_eq!(msg.next_local_commitment_number, 1); // now called next_commitment_number
                assert_eq!(msg.next_remote_commitment_number, 0); // now called next_revocation_number
@@ -8043,7 +8121,7 @@ mod tests {
                        value: 10000000, script_pubkey: output_script.clone(),
                }]};
                let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
-               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
                let (_, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();
 
                // Node B --> Node A: funding signed
@@ -9023,4 +9101,146 @@ mod tests {
                );
                assert!(res.is_err());
        }
+
+       #[test]
+       fn test_waiting_for_batch() {
+               let feeest = LowerBoundedFeeEstimator::new(&TestFeeEstimator{fee_est: 15000});
+               let logger = test_utils::TestLogger::new();
+               let secp_ctx = Secp256k1::new();
+               let seed = [42; 32];
+               let network = Network::Testnet;
+               let best_block = BestBlock::from_network(network);
+               let chain_hash = genesis_block(network).header.block_hash();
+               let keys_provider = test_utils::TestKeysInterface::new(&seed, network);
+
+               let mut config = UserConfig::default();
+               // Set trust_own_funding_0conf while ensuring we don't send channel_ready for a
+               // channel in a batch before all channels are ready.
+               config.channel_handshake_limits.trust_own_funding_0conf = true;
+
+               // Create a channel from node a to node b that will be part of batch funding.
+               let node_b_node_id = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
+               let mut node_a_chan = OutboundV1Channel::<&TestKeysInterface>::new(
+                       &feeest,
+                       &&keys_provider,
+                       &&keys_provider,
+                       node_b_node_id,
+                       &channelmanager::provided_init_features(&config),
+                       10000000,
+                       100000,
+                       42,
+                       &config,
+                       0,
+                       42,
+               ).unwrap();
+
+               let open_channel_msg = node_a_chan.get_open_channel(genesis_block(network).header.block_hash());
+               let node_b_node_id = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[7; 32]).unwrap());
+               let mut node_b_chan = InboundV1Channel::<&TestKeysInterface>::new(
+                       &feeest,
+                       &&keys_provider,
+                       &&keys_provider,
+                       node_b_node_id,
+                       &channelmanager::provided_channel_type_features(&config),
+                       &channelmanager::provided_init_features(&config),
+                       &open_channel_msg,
+                       7,
+                       &config,
+                       0,
+                       &&logger,
+                       true,  // Allow node b to send a 0conf channel_ready.
+               ).unwrap();
+
+               let accept_channel_msg = node_b_chan.accept_inbound_channel();
+               node_a_chan.accept_channel(
+                       &accept_channel_msg,
+                       &config.channel_handshake_limits,
+                       &channelmanager::provided_init_features(&config),
+               ).unwrap();
+
+               // Fund the channel with a batch funding transaction.
+               let output_script = node_a_chan.context.get_funding_redeemscript();
+               let tx = Transaction {
+                       version: 1,
+                       lock_time: PackedLockTime::ZERO,
+                       input: Vec::new(),
+                       output: vec![
+                               TxOut {
+                                       value: 10000000, script_pubkey: output_script.clone(),
+                               },
+                               TxOut {
+                                       value: 10000000, script_pubkey: Builder::new().into_script(),
+                               },
+                       ]};
+               let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
+               let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(
+                       tx.clone(),
+                       funding_outpoint,
+                       true,
+                       &&logger,
+               ).map_err(|_| ()).unwrap();
+               let (mut node_b_chan, funding_signed_msg, _) = node_b_chan.funding_created(
+                       &funding_created_msg,
+                       best_block,
+                       &&keys_provider,
+                       &&logger,
+               ).map_err(|_| ()).unwrap();
+               let node_b_updates = node_b_chan.monitor_updating_restored(
+                       &&logger,
+                       &&keys_provider,
+                       chain_hash,
+                       &config,
+                       0,
+               );
+
+               // Receive funding_signed, but the channel will be configured to hold sending channel_ready and
+               // broadcasting the funding transaction until the batch is ready.
+               let _ = node_a_chan.funding_signed(
+                       &funding_signed_msg,
+                       best_block,
+                       &&keys_provider,
+                       &&logger,
+               ).unwrap();
+               let node_a_updates = node_a_chan.monitor_updating_restored(
+                       &&logger,
+                       &&keys_provider,
+                       chain_hash,
+                       &config,
+                       0,
+               );
+               // Our channel_ready shouldn't be sent yet, even with trust_own_funding_0conf set,
+               // as broadcasting the funding transaction depends on all channels in the batch becoming ready.
+               assert!(node_a_updates.channel_ready.is_none());
+               assert!(node_a_updates.funding_broadcastable.is_none());
+               assert_eq!(
+                       node_a_chan.context.channel_state,
+                       ChannelState::FundingSent as u32 |
+                       ChannelState::WaitingForBatch as u32,
+               );
+
+               // It is possible to receive a 0conf channel_ready from the remote node.
+               node_a_chan.channel_ready(
+                       &node_b_updates.channel_ready.unwrap(),
+                       &&keys_provider,
+                       chain_hash,
+                       &config,
+                       &best_block,
+                       &&logger,
+               ).unwrap();
+               assert_eq!(
+                       node_a_chan.context.channel_state,
+                       ChannelState::FundingSent as u32 |
+                       ChannelState::WaitingForBatch as u32 |
+                       ChannelState::TheirChannelReady as u32,
+               );
+
+               // ChannelState::WaitingForBatch is only cleared when ChannelManager calls set_batch_ready.
+               node_a_chan.set_batch_ready();
+               assert_eq!(
+                       node_a_chan.context.channel_state,
+                       ChannelState::FundingSent as u32 |
+                       ChannelState::TheirChannelReady as u32,
+               );
+               assert!(node_a_chan.check_get_channel_ready(0).is_some());
+       }
 }
lightning/src/ln/channelmanager.rs
index 3ef57c5b87f024e8861ecabf7226c4f6eae19ad7..e0fc3ed2c83e2f3fae3f4958f1c31181e7f8e633 100644
@@ -64,7 +64,7 @@ use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, Maybe
 use crate::util::logger::{Level, Logger};
 use crate::util::errors::APIError;
 
-use alloc::collections::BTreeMap;
+use alloc::collections::{btree_map, BTreeMap};
 
 use crate::io;
 use crate::prelude::*;
@@ -1201,6 +1201,12 @@ where
        /// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
        /// Notifier the lock contains sends out a notification when the lock is released.
        total_consistency_lock: RwLock<()>,
+       /// Tracks the progress of channels going through batch funding by whether funding_signed was
+       /// received and the monitor has been persisted.
+       ///
+       /// This information does not need to be persisted as funding nodes can forget
+       /// unfunded channels upon disconnection.
+       funding_batch_states: Mutex<BTreeMap<Txid, Vec<(ChannelId, PublicKey, bool)>>>,
 
        background_events_processed_since_startup: AtomicBool,
 
@@ -1788,7 +1794,7 @@ macro_rules! handle_error {
                                let mut msg_events = Vec::with_capacity(2);
 
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
-                                       $self.finish_force_close_channel(shutdown_res);
+                                       $self.finish_close_channel(shutdown_res);
                                        if let Some(update) = update_option {
                                                msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
@@ -2025,9 +2031,54 @@ macro_rules! handle_monitor_update_completion {
                }
 
                let channel_id = $chan.context.channel_id();
+               let unbroadcasted_batch_funding_txid = $chan.context.unbroadcasted_batch_funding_txid();
                core::mem::drop($peer_state_lock);
                core::mem::drop($per_peer_state_lock);
 
+               // If the channel belongs to a batch funding transaction, the progress of the batch
+               // should be updated as we have received funding_signed and persisted the monitor.
+               if let Some(txid) = unbroadcasted_batch_funding_txid {
+                       let mut funding_batch_states = $self.funding_batch_states.lock().unwrap();
+                       let mut batch_completed = false;
+                       if let Some(batch_state) = funding_batch_states.get_mut(&txid) {
+                               let channel_state = batch_state.iter_mut().find(|(chan_id, pubkey, _)| (
+                                       *chan_id == channel_id &&
+                                       *pubkey == counterparty_node_id
+                               ));
+                               if let Some(channel_state) = channel_state {
+                                       channel_state.2 = true;
+                               } else {
+                                       debug_assert!(false, "Missing channel batch state for channel which completed initial monitor update");
+                               }
+                               batch_completed = batch_state.iter().all(|(_, _, completed)| *completed);
+                       } else {
+                               debug_assert!(false, "Missing batch state for channel which completed initial monitor update");
+                       }
+
+                       // When all channels in a batched funding transaction have become ready, it is not necessary
+                       // to track the progress of the batch anymore and the state of the channels can be updated.
+                       if batch_completed {
+                               let removed_batch_state = funding_batch_states.remove(&txid).into_iter().flatten();
+                               let per_peer_state = $self.per_peer_state.read().unwrap();
+                               let mut batch_funding_tx = None;
+                               for (channel_id, counterparty_node_id, _) in removed_batch_state {
+                                       if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                               let mut peer_state = peer_state_mutex.lock().unwrap();
+                                               if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get_mut(&channel_id) {
+                                                       batch_funding_tx = batch_funding_tx.or_else(|| chan.context.unbroadcasted_funding());
+                                                       chan.set_batch_ready();
+                                                       let mut pending_events = $self.pending_events.lock().unwrap();
+                                                       emit_channel_pending_event!(pending_events, chan);
+                                               }
+                                       }
+                               }
+                               if let Some(tx) = batch_funding_tx {
+                                       log_info!($self.logger, "Broadcasting batch funding transaction with txid {}", tx.txid());
+                                       $self.tx_broadcaster.broadcast_transactions(&[&tx]);
+                               }
+                       }
+               }
+
                $self.handle_monitor_update_completion_actions(update_actions);
 
                if let Some(forwards) = htlc_forwards {
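
For intuition, a standalone sketch of the bookkeeping the block above performs, using stand-in types in place of `Txid`, `ChannelId` and `PublicKey` (assumptions, not the crate's real types):

    use std::collections::BTreeMap;

    // Stand-ins for bitcoin::Txid, ChannelId and PublicKey.
    type FakeTxid = [u8; 32];
    type FakeChannelId = [u8; 32];
    type FakeNodeId = [u8; 33];

    /// One entry per batch funding txid; the bool records whether that channel has
    /// received funding_signed and persisted its initial monitor.
    struct FundingBatchStates(BTreeMap<FakeTxid, Vec<(FakeChannelId, FakeNodeId, bool)>>);

    impl FundingBatchStates {
        /// Marks one channel of the batch complete and reports whether the whole batch
        /// is now complete, i.e. the funding transaction may finally be broadcast.
        fn mark_complete(&mut self, txid: &FakeTxid, chan: &FakeChannelId, node: &FakeNodeId) -> bool {
            let batch = match self.0.get_mut(txid) { Some(b) => b, None => return false };
            if let Some(entry) = batch.iter_mut().find(|(c, n, _)| c == chan && n == node) {
                entry.2 = true;
            }
            batch.iter().all(|(_, _, done)| *done)
        }
    }
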
@@ -2230,9 +2281,9 @@ where
                        pending_background_events: Mutex::new(Vec::new()),
                        total_consistency_lock: RwLock::new(()),
                        background_events_processed_since_startup: AtomicBool::new(false),
-
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
+                       funding_batch_states: Mutex::new(BTreeMap::new()),
 
                        entropy_source,
                        node_signer,
@@ -2497,6 +2548,7 @@ where
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
                let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
+               let mut shutdown_result = None;
                loop {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
@@ -2511,6 +2563,7 @@ where
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
                                                let funding_txo_opt = chan.context.get_funding_txo();
                                                let their_features = &peer_state.latest_features;
+                                               let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                let (shutdown_msg, mut monitor_update_opt, htlcs) =
                                                        chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
                                                failed_htlcs = htlcs;
@@ -2541,6 +2594,7 @@ where
                                                                        });
                                                                }
                                                                self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
+                                                               shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                        }
                                                }
                                                break;
@@ -2562,6 +2616,10 @@ where
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
 
+               if let Some(shutdown_result) = shutdown_result {
+                       self.finish_close_channel(shutdown_result);
+               }
+
                Ok(())
        }
 
@@ -2626,14 +2684,14 @@ where
                self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
        }
 
-       fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
+       fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
                debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
                #[cfg(debug_assertions)]
                for (_, peer) in self.per_peer_state.read().unwrap().iter() {
                        debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
                }
 
-               let (monitor_update_option, mut failed_htlcs) = shutdown_res;
+               let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
                log_debug!(self.logger, "Finishing force-closure of channel with {} HTLCs to fail", failed_htlcs.len());
                for htlc_source in failed_htlcs.drain(..) {
                        let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
@@ -2648,6 +2706,31 @@ where
                        // ignore the result here.
                        let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
                }
+               let mut shutdown_results = Vec::new();
+               if let Some(txid) = unbroadcasted_batch_funding_txid {
+                       let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
+                       let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
+                       let per_peer_state = self.per_peer_state.read().unwrap();
+                       let mut has_uncompleted_channel = None;
+                       for (channel_id, counterparty_node_id, state) in affected_channels {
+                               if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
+                                       let mut peer_state = peer_state_mutex.lock().unwrap();
+                                       if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) {
+                                               update_maps_on_chan_removal!(self, &chan.context());
+                                               self.issue_channel_close_events(&chan.context(), ClosureReason::FundingBatchClosure);
+                                               shutdown_results.push(chan.context_mut().force_shutdown(false));
+                                       }
+                               }
+                               has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state));
+                       }
+                       debug_assert!(
+                               has_uncompleted_channel.unwrap_or(true),
+                               "Closing a batch where all channels have completed initial monitor update",
+                       );
+               }
+               for shutdown_result in shutdown_results.drain(..) {
+                       self.finish_close_channel(shutdown_result);
+               }
        }
 
        /// `peer_msg` should be set when we receive a message from a peer, but not set when the
@@ -2672,11 +2755,11 @@ where
                                mem::drop(per_peer_state);
                                match chan_phase {
                                        ChannelPhase::Funded(mut chan) => {
-                                               self.finish_force_close_channel(chan.context.force_shutdown(broadcast));
+                                               self.finish_close_channel(chan.context.force_shutdown(broadcast));
                                                (self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
                                        },
                                        ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => {
-                                               self.finish_force_close_channel(chan_phase.context_mut().force_shutdown(false));
+                                               self.finish_close_channel(chan_phase.context_mut().force_shutdown(false));
                                                // Unfunded channel has no update
                                                (None, chan_phase.context().get_counterparty_node_id())
                                        },
@@ -3537,7 +3620,7 @@ where
        ///
        /// See [`ChannelManager::send_preflight_probes`] for more information.
        pub fn send_spontaneous_preflight_probes(
-               &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32, 
+               &self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32,
                liquidity_limit_multiplier: Option<u64>,
        ) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
                let payment_params =
@@ -3644,8 +3727,9 @@ where
 
        /// Handles the generation of a funding transaction, optionally (for tests) with a function
        /// which checks the correctness of the funding transaction given the associated channel.
-       fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
-               &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
+       fn funding_transaction_generated_intern<FundingOutput: FnMut(&OutboundV1Channel<SP>, &Transaction) -> Result<OutPoint, APIError>>(
+               &self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batch_funding: bool,
+               mut find_funding_output: FundingOutput,
        ) -> Result<(), APIError> {
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
@@ -3657,7 +3741,7 @@ where
                        Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
                                let funding_txo = find_funding_output(&chan, &funding_transaction)?;
 
-                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
+                               let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batch_funding, &self.logger)
                                        .map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
                                                let channel_id = chan.context.channel_id();
                                                let user_id = chan.context.get_user_id();
@@ -3713,7 +3797,7 @@ where
 
        #[cfg(test)]
        pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
-               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| {
+               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| {
                        Ok(OutPoint { txid: tx.txid(), index: output_index })
                })
        }
@@ -3749,17 +3833,37 @@ where
        /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
        /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
        pub fn funding_transaction_generated(&self, temporary_channel_id: &ChannelId, counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
+               self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction)
+       }
+
+       /// Call this upon creation of a batch funding transaction for the given channels.
+       ///
+       /// Return values are identical to [`Self::funding_transaction_generated`], applied to
+       /// each individual channel and transaction output.
+       ///
+       /// Do NOT broadcast the funding transaction yourself. This batch funding transaction
+       /// will only be broadcast when we have safely received and persisted the counterparty's
+       /// signature for each channel.
+       ///
+       /// If there is an error, all channels in the batch are to be considered closed.
+       pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&ChannelId, &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+               let mut result = Ok(());
 
                if !funding_transaction.is_coin_base() {
                        for inp in funding_transaction.input.iter() {
                                if inp.witness.is_empty() {
-                                       return Err(APIError::APIMisuseError {
+                                       result = result.and(Err(APIError::APIMisuseError {
                                                err: "Funding transaction must be fully signed and spend Segwit outputs".to_owned()
-                                       });
+                                       }));
                                }
                        }
                }
+               if funding_transaction.output.len() > u16::max_value() as usize {
+                       result = result.and(Err(APIError::APIMisuseError {
+                               err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
+                       }));
+               }
                {
                        let height = self.best_block.read().unwrap().height();
                        // Transactions are evaluated as final by network mempools if their locktime is strictly
@@ -3767,37 +3871,93 @@ where
                        // node might not have perfect sync about their blockchain views. Thus, if the wallet
                        // module is ahead of LDK, only allow one more block of headroom.
                        if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 {
-                               return Err(APIError::APIMisuseError {
+                               result = result.and(Err(APIError::APIMisuseError {
                                        err: "Funding transaction absolute timelock is non-final".to_owned()
-                               });
+                               }));
                        }
                }
-               self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
-                       if tx.output.len() > u16::max_value() as usize {
-                               return Err(APIError::APIMisuseError {
-                                       err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
-                               });
-                       }
 
-                       let mut output_index = None;
-                       let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
-                       for (idx, outp) in tx.output.iter().enumerate() {
-                               if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
-                                       if output_index.is_some() {
+               let txid = funding_transaction.txid();
+               let is_batch_funding = temporary_channels.len() > 1;
+               let mut funding_batch_states = if is_batch_funding {
+                       Some(self.funding_batch_states.lock().unwrap())
+               } else {
+                       None
+               };
+               let mut funding_batch_state = funding_batch_states.as_mut().and_then(|states| {
+                       match states.entry(txid) {
+                               btree_map::Entry::Occupied(_) => {
+                                       result = result.clone().and(Err(APIError::APIMisuseError {
+                                               err: "Batch funding transaction with the same txid already exists".to_owned()
+                                       }));
+                                       None
+                               },
+                               btree_map::Entry::Vacant(vacant) => Some(vacant.insert(Vec::new())),
+                       }
+               });
+               for (channel_idx, &(temporary_channel_id, counterparty_node_id)) in temporary_channels.iter().enumerate() {
+                       result = result.and_then(|_| self.funding_transaction_generated_intern(
+                               temporary_channel_id,
+                               counterparty_node_id,
+                               funding_transaction.clone(),
+                               is_batch_funding,
+                               |chan, tx| {
+                                       let mut output_index = None;
+                                       let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
+                                       for (idx, outp) in tx.output.iter().enumerate() {
+                                               if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
+                                                       if output_index.is_some() {
+                                                               return Err(APIError::APIMisuseError {
+                                                                       err: "Multiple outputs matched the expected script and value".to_owned()
+                                                               });
+                                                       }
+                                                       output_index = Some(idx as u16);
+                                               }
+                                       }
+                                       if output_index.is_none() {
                                                return Err(APIError::APIMisuseError {
-                                                       err: "Multiple outputs matched the expected script and value".to_owned()
+                                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
                                                });
                                        }
-                                       output_index = Some(idx as u16);
+                                       let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
+                                       if let Some(funding_batch_state) = funding_batch_state.as_mut() {
+                                               funding_batch_state.push((outpoint.to_channel_id(), *counterparty_node_id, false));
+                                       }
+                                       Ok(outpoint)
+                               })
+                       );
+               }
+               if let Err(ref e) = result {
+                       // Remaining channels need to be removed on any error.
+                       let e = format!("Error in transaction funding: {:?}", e);
+                       let mut channels_to_remove = Vec::new();
+                       channels_to_remove.extend(funding_batch_states.as_mut()
+                               .and_then(|states| states.remove(&txid))
+                               .into_iter().flatten()
+                               .map(|(chan_id, node_id, _state)| (chan_id, node_id))
+                       );
+                       channels_to_remove.extend(temporary_channels.iter()
+                               .map(|(&chan_id, &node_id)| (chan_id, node_id))
+                       );
+                       let mut shutdown_results = Vec::new();
+                       {
+                               let per_peer_state = self.per_peer_state.read().unwrap();
+                               for (channel_id, counterparty_node_id) in channels_to_remove {
+                                       per_peer_state.get(&counterparty_node_id)
+                                               .map(|peer_state_mutex| peer_state_mutex.lock().unwrap())
+                                               .and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id))
+                                               .map(|mut chan| {
+                                                       update_maps_on_chan_removal!(self, &chan.context());
+                                                       self.issue_channel_close_events(&chan.context(), ClosureReason::ProcessingError { err: e.clone() });
+                                                       shutdown_results.push(chan.context_mut().force_shutdown(false));
+                                               });
                                }
                        }
-                       if output_index.is_none() {
-                               return Err(APIError::APIMisuseError {
-                                       err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
-                               });
+                       for shutdown_result in shutdown_results.drain(..) {
+                               self.finish_close_channel(shutdown_result);
                        }
-                       Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
-               })
+               }
+               result
        }
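
A minimal hedged sketch of how a caller might drive the new batch API, modeled on the test helper added later in this patch. Here `pending_channels`, `wallet_inputs`, `wallet_sign_transaction`, and `channel_manager` are hypothetical stand-ins for the caller's own state and wallet integration, not LDK APIs:

    // Build one funding transaction with an output per channel, collected from the
    // FundingGenerationReady events, then hand it to the ChannelManager in a single call.
    let mut outputs = Vec::new();
    let mut channels = Vec::new();
    for (temporary_channel_id, counterparty_node_id, value_sats, output_script) in pending_channels {
        outputs.push(TxOut { value: value_sats, script_pubkey: output_script });
        channels.push((temporary_channel_id, counterparty_node_id));
    }
    let unsigned_tx = Transaction {
        version: 2, lock_time: PackedLockTime::ZERO, input: wallet_inputs, output: outputs,
    };
    // Hypothetical wallet step: sign all inputs so the transaction is fully signed.
    let funding_tx = wallet_sign_transaction(unsigned_tx);
    // Do NOT broadcast funding_tx; LDK broadcasts it only once every channel in the batch
    // has received funding_signed and had its monitor persisted.
    if let Err(e) = channel_manager.batch_funding_transaction_generated(
        channels.iter().map(|(id, node)| (id, node)).collect::<Vec<_>>().as_slice(),
        funding_tx,
    ) {
        // Per the docs above, on any error every channel in the batch is considered closed.
        eprintln!("batch funding failed: {:?}", e);
    }
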
 
        /// Atomically applies partial updates to the [`ChannelConfig`] of the given channels.
@@ -4849,7 +5009,7 @@ where
                        }
 
                        for shutdown_res in shutdown_channels {
-                               self.finish_force_close_channel(shutdown_res);
+                               self.finish_close_channel(shutdown_res);
                        }
 
                        self.pending_outbound_payments.remove_stale_payments(&self.pending_events);
@@ -6075,13 +6235,15 @@ where
                        self.fail_htlc_backwards_internal(&htlc_source.0, &htlc_source.1, &reason, receiver);
                }
                if let Some(shutdown_res) = finish_shutdown {
-                       self.finish_force_close_channel(shutdown_res);
+                       self.finish_close_channel(shutdown_res);
                }
 
                Ok(())
        }
 
        fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
+               let mut shutdown_result = None;
+               let unbroadcasted_batch_funding_txid;
                let per_peer_state = self.per_peer_state.read().unwrap();
                let peer_state_mutex = per_peer_state.get(counterparty_node_id)
                        .ok_or_else(|| {
@@ -6094,6 +6256,7 @@ where
                        match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
                                hash_map::Entry::Occupied(mut chan_phase_entry) => {
                                        if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
+                                               unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
                                                if let Some(msg) = closing_signed {
                                                        peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
@@ -6130,6 +6293,11 @@ where
                                });
                        }
                        self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
+                       shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
+               }
+               mem::drop(per_peer_state);
+               if let Some(shutdown_result) = shutdown_result {
+                       self.finish_close_channel(shutdown_result);
                }
                Ok(())
        }
@@ -6734,7 +6902,7 @@ where
                }
 
                for failure in failed_channels.drain(..) {
-                       self.finish_force_close_channel(failure);
+                       self.finish_close_channel(failure);
                }
 
                has_pending_monitor_events
@@ -6804,6 +6972,8 @@ where
        fn maybe_generate_initial_closing_signed(&self) -> bool {
                let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
                let mut has_update = false;
+               let mut shutdown_result = None;
+               let mut unbroadcasted_batch_funding_txid = None;
                {
                        let per_peer_state = self.per_peer_state.read().unwrap();
 
@@ -6814,6 +6984,7 @@ where
                                peer_state.channel_by_id.retain(|channel_id, phase| {
                                        match phase {
                                                ChannelPhase::Funded(chan) => {
+                                                       unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
                                                        match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
                                                                Ok((msg_opt, tx_opt)) => {
                                                                        if let Some(msg) = msg_opt {
@@ -6836,6 +7007,7 @@ where
                                                                                log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
                                                                                self.tx_broadcaster.broadcast_transactions(&[&tx]);
                                                                                update_maps_on_chan_removal!(self, &chan.context);
+                                                                               shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
                                                                                false
                                                                        } else { true }
                                                                },
@@ -6857,6 +7029,10 @@ where
                        let _ = handle_error!(self, err, counterparty_node_id);
                }
 
+               if let Some(shutdown_result) = shutdown_result {
+                       self.finish_close_channel(shutdown_result);
+               }
+
                has_update
        }
 
@@ -6882,7 +7058,7 @@ where
                                                counterparty_node_id, funding_txo, update
                                        });
                        }
-                       self.finish_force_close_channel(failure);
+                       self.finish_close_channel(failure);
                }
        }
 
@@ -7822,7 +7998,6 @@ where
        fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
                let _persistence_guard = PersistenceNotifierGuard::optionally_notify(
                        self, || NotifyOption::SkipPersistHandleEvents);
-
                let mut failed_channels = Vec::new();
                let mut per_peer_state = self.per_peer_state.write().unwrap();
                let remove_peer = {
@@ -7835,24 +8010,24 @@ where
                                peer_state.channel_by_id.retain(|_, phase| {
                                        let context = match phase {
                                                ChannelPhase::Funded(chan) => {
-                                                       chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger);
-                                                       // We only retain funded channels that are not shutdown.
-                                                       if !chan.is_shutdown() {
+                                                       if chan.remove_uncommitted_htlcs_and_mark_paused(&self.logger).is_ok() {
+                                                               // We only retain funded channels that are not shutdown.
                                                                return true;
                                                        }
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                                // Unfunded channels will always be removed.
                                                ChannelPhase::UnfundedOutboundV1(chan) => {
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                                ChannelPhase::UnfundedInboundV1(chan) => {
-                                                       &chan.context
+                                                       &mut chan.context
                                                },
                                        };
                                        // Clean up for removal.
                                        update_maps_on_chan_removal!(self, &context);
                                        self.issue_channel_close_events(&context, ClosureReason::DisconnectedPeer);
+                                       failed_channels.push(context.force_shutdown(false));
                                        false
                                });
                                // Note that we don't bother generating any events for pre-accept channels -
@@ -7911,7 +8086,7 @@ where
                mem::drop(per_peer_state);
 
                for failure in failed_channels.drain(..) {
-                       self.finish_force_close_channel(failure);
+                       self.finish_close_channel(failure);
                }
        }
 
@@ -8656,7 +8831,7 @@ where
                                }
 
                                number_of_funded_channels += peer_state.channel_by_id.iter().filter(
-                                       |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_initiated() } else { false }
+                                       |(_, phase)| if let ChannelPhase::Funded(chan) = phase { chan.context.is_funding_broadcast() } else { false }
                                ).count();
                        }
 
@@ -8667,7 +8842,7 @@ where
                                let peer_state = &mut *peer_state_lock;
                                for channel in peer_state.channel_by_id.iter().filter_map(
                                        |(_, phase)| if let ChannelPhase::Funded(channel) = phase {
-                                               if channel.context.is_funding_initiated() { Some(channel) } else { None }
+                                               if channel.context.is_funding_broadcast() { Some(channel) } else { None }
                                        } else { None }
                                ) {
                                        channel.write(writer)?;
@@ -9091,7 +9266,10 @@ where
                                                log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
                                                        &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
                                        }
-                                       let (monitor_update, mut new_failed_htlcs) = channel.context.force_shutdown(true);
+                                       let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
+                                       if batch_funding_txid.is_some() {
+                                               return Err(DecodeError::InvalidValue);
+                                       }
                                        if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
                                                close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
                                                        counterparty_node_id, funding_txo, update
@@ -9131,7 +9309,7 @@ where
                                        if let Some(short_channel_id) = channel.context.get_short_channel_id() {
                                                short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
                                        }
-                                       if channel.context.is_funding_initiated() {
+                                       if channel.context.is_funding_broadcast() {
                                                id_to_peer.insert(channel.context.channel_id(), channel.context.get_counterparty_node_id());
                                        }
                                        match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) {
@@ -9845,6 +10023,8 @@ where
                        event_persist_notifier: Notifier::new(),
                        needs_persist_flag: AtomicBool::new(false),
 
+                       funding_batch_states: Mutex::new(BTreeMap::new()),
+
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        signer_provider: args.signer_provider,
index 0e19ceb65832e47db0e3fb7d5165361acf7282b1..207c0692031a7c89b9f2f6c75da9fc71997abcc3 100644 (file)
@@ -1473,12 +1473,12 @@ pub fn check_closed_event(node: &Node, events_count: usize, expected_reason: Clo
        let events = node.node.get_and_clear_pending_events();
        assert_eq!(events.len(), events_count, "{:?}", events);
        let mut issues_discard_funding = false;
-       for (idx, event) in events.into_iter().enumerate() {
+       for event in events {
                match event {
-                       Event::ChannelClosed { ref reason, counterparty_node_id, 
+                       Event::ChannelClosed { ref reason, counterparty_node_id,
                                channel_capacity_sats, .. } => {
                                assert_eq!(*reason, expected_reason);
-                               assert_eq!(counterparty_node_id.unwrap(), expected_counterparty_node_ids[idx]);
+                               assert!(expected_counterparty_node_ids.iter().any(|id| id == &counterparty_node_id.unwrap()));
                                assert_eq!(channel_capacity_sats.unwrap(), expected_channel_capacity);
                        },
                        Event::DiscardFunding { .. } => {
@@ -1499,7 +1499,7 @@ macro_rules! check_closed_event {
                check_closed_event!($node, $events, $reason, false, $counterparty_node_ids, $channel_capacity);
        };
        ($node: expr, $events: expr, $reason: expr, $is_check_discard_funding: expr, $counterparty_node_ids: expr, $channel_capacity: expr) => {
-               $crate::ln::functional_test_utils::check_closed_event(&$node, $events, $reason, 
+               $crate::ln::functional_test_utils::check_closed_event(&$node, $events, $reason,
                        $is_check_discard_funding, &$counterparty_node_ids, $channel_capacity);
        }
 }
@@ -3266,3 +3266,76 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
                }
        }
 }
+
+/// Initiates channel opening and creates a single batch funding transaction.
+/// This will go through the open_channel / accept_channel flow, and return the batch funding
+/// transaction with corresponding funding_created messages.
+pub fn create_batch_channel_funding<'a, 'b, 'c>(
+       funding_node: &Node<'a, 'b, 'c>,
+       params: &[(&Node<'a, 'b, 'c>, u64, u64, u128, Option<UserConfig>)],
+) -> (Transaction, Vec<msgs::FundingCreated>) {
+       let mut tx_outs = Vec::new();
+       let mut temp_chan_ids = Vec::new();
+       let mut funding_created_msgs = Vec::new();
+
+       for (other_node, channel_value_satoshis, push_msat, user_channel_id, override_config) in params {
+               // Initialize channel opening.
+               let temp_chan_id = funding_node.node.create_channel(
+                       other_node.node.get_our_node_id(), *channel_value_satoshis, *push_msat, *user_channel_id,
+                       *override_config,
+               ).unwrap();
+               let open_channel_msg = get_event_msg!(funding_node, MessageSendEvent::SendOpenChannel, other_node.node.get_our_node_id());
+               other_node.node.handle_open_channel(&funding_node.node.get_our_node_id(), &open_channel_msg);
+               let accept_channel_msg = get_event_msg!(other_node, MessageSendEvent::SendAcceptChannel, funding_node.node.get_our_node_id());
+               funding_node.node.handle_accept_channel(&other_node.node.get_our_node_id(), &accept_channel_msg);
+
+               // Create the corresponding funding output.
+               let events = funding_node.node.get_and_clear_pending_events();
+               assert_eq!(events.len(), 1);
+               match events[0] {
+                       Event::FundingGenerationReady {
+                               ref temporary_channel_id,
+                               ref counterparty_node_id,
+                               channel_value_satoshis: ref event_channel_value_satoshis,
+                               ref output_script,
+                               user_channel_id: ref event_user_channel_id
+                       } => {
+                               assert_eq!(temporary_channel_id, &temp_chan_id);
+                               assert_eq!(counterparty_node_id, &other_node.node.get_our_node_id());
+                               assert_eq!(channel_value_satoshis, event_channel_value_satoshis);
+                               assert_eq!(user_channel_id, event_user_channel_id);
+                               tx_outs.push(TxOut {
+                                       value: *channel_value_satoshis, script_pubkey: output_script.clone(),
+                               });
+                       },
+                       _ => panic!("Unexpected event"),
+               };
+               temp_chan_ids.push((temp_chan_id, other_node.node.get_our_node_id()));
+       }
+
+       // Compose the batch funding transaction and give it to the ChannelManager.
+       let tx = Transaction {
+               version: 2,
+               lock_time: PackedLockTime::ZERO,
+               input: Vec::new(),
+               output: tx_outs,
+       };
+       assert!(funding_node.node.batch_funding_transaction_generated(
+               temp_chan_ids.iter().map(|(a, b)| (a, b)).collect::<Vec<_>>().as_slice(),
+               tx.clone(),
+       ).is_ok());
+       check_added_monitors!(funding_node, 0);
+       let events = funding_node.node.get_and_clear_pending_msg_events();
+       assert_eq!(events.len(), params.len());
+       for (other_node, ..) in params {
+               let funding_created = events
+                       .iter()
+                       .find_map(|event| match event {
+                               MessageSendEvent::SendFundingCreated { node_id, msg } if node_id == &other_node.node.get_our_node_id() => Some(msg.clone()),
+                               _ => None,
+                       })
+                       .unwrap();
+               funding_created_msgs.push(funding_created);
+       }
+       return (tx, funding_created_msgs);
+}
index 1066362a4c4c101791a0c12a051e36c7986c2741..19ba81235f2df3bb8bad508ab39787aebb95b053 100644 (file)
@@ -15,7 +15,7 @@ use crate::chain;
 use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch};
 use crate::chain::chaininterface::LowerBoundedFeeEstimator;
 use crate::chain::channelmonitor;
-use crate::chain::channelmonitor::{CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
+use crate::chain::channelmonitor::{CLOSED_CHANNEL_UPDATE_ID, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
 use crate::chain::transaction::OutPoint;
 use crate::sign::{EcdsaChannelSigner, EntropySource, SignerProvider};
 use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, PaymentPurpose, ClosureReason, HTLCDestination, PaymentFailureReason};
@@ -3721,7 +3721,7 @@ fn test_peer_disconnected_before_funding_broadcasted() {
        nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
        nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
 
-       check_closed_event!(&nodes[0], 1, ClosureReason::DisconnectedPeer, false
+       check_closed_event!(&nodes[0], 2, ClosureReason::DisconnectedPeer, true
                , [nodes[1].node.get_our_node_id()], 1000000);
        check_closed_event!(&nodes[1], 1, ClosureReason::DisconnectedPeer, false
                , [nodes[0].node.get_our_node_id()], 1000000);
@@ -9038,7 +9038,7 @@ fn test_duplicate_chan_id() {
                match a_peer_state.channel_by_id.remove(&open_chan_2_msg.temporary_channel_id).unwrap() {
                        ChannelPhase::UnfundedOutboundV1(chan) => {
                                let logger = test_utils::TestLogger::new();
-                               chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap()
+                               chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap()
                        },
                        _ => panic!("Unexpected ChannelPhase variant"),
                }
@@ -9900,9 +9900,46 @@ fn test_non_final_funding_tx() {
                },
                _ => panic!()
        }
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 1);
+       match events[0] {
+               Event::ChannelClosed { channel_id, .. } => {
+                       assert_eq!(channel_id, temp_channel_id);
+               },
+               _ => panic!("Unexpected event"),
+       }
+}
+
+#[test]
+fn test_non_final_funding_tx_within_headroom() {
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       let temp_channel_id = nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 0, 42, None).unwrap();
+       let open_channel_message = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
+       nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_channel_message);
+       let accept_channel_message = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), &accept_channel_message);
+
+       let best_height = nodes[0].node.best_block.read().unwrap().height();
+
+       let chan_id = *nodes[0].network_chan_count.borrow();
+       let events = nodes[0].node.get_and_clear_pending_events();
+       let input = TxIn { previous_output: BitcoinOutPoint::null(), script_sig: bitcoin::Script::new(), sequence: Sequence(1), witness: Witness::from_vec(vec!(vec!(1))) };
+       assert_eq!(events.len(), 1);
+       let mut tx = match events[0] {
+               Event::FundingGenerationReady { ref channel_value_satoshis, ref output_script, .. } => {
+                       // Timelock the transaction within a +1 headroom from the best block.
+                       Transaction { version: chan_id as i32, lock_time: PackedLockTime(best_height + 1), input: vec![input], output: vec![TxOut {
+                               value: *channel_value_satoshis, script_pubkey: output_script.clone(),
+                       }]}
+               },
+               _ => panic!("Unexpected event"),
+       };
 
-       // However, transaction should be accepted if it's in a +1 headroom from best block.
-       tx.lock_time = PackedLockTime(tx.lock_time.0 - 1);
+       // Transaction should be accepted if it's in a +1 headroom from best block.
        assert!(nodes[0].node.funding_transaction_generated(&temp_channel_id, &nodes[1].node.get_our_node_id(), tx.clone()).is_ok());
        get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
 }
@@ -10361,3 +10398,230 @@ fn test_multi_post_event_actions() {
        do_test_multi_post_event_actions(true);
        do_test_multi_post_event_actions(false);
 }
+
+#[test]
+fn test_batch_channel_open() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
+
+       // Go through the funding_created and funding_signed flow with node 2.
+       nodes[2].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[1]);
+       check_added_monitors(&nodes[2], 1);
+       expect_channel_pending_event(&nodes[2], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+       nodes[0].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before persisting all monitors has been
+       // completed.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+       assert_eq!(nodes[0].node.get_and_clear_pending_events().len(), 0);
+
+       // Complete the persistence of the monitor.
+       nodes[0].chain_monitor.complete_sole_pending_chan_update(
+               &OutPoint { txid: tx.txid(), index: 1 }.to_channel_id()
+       );
+       let events = nodes[0].node.get_and_clear_pending_events();
+
+       // The transaction should only have been broadcast now.
+       let broadcasted_txs = nodes[0].tx_broadcaster.txn_broadcast();
+       assert_eq!(broadcasted_txs.len(), 1);
+       assert_eq!(broadcasted_txs[0], tx);
+
+       assert_eq!(events.len(), 2);
+       assert!(events.iter().any(|e| matches!(
+               *e,
+               crate::events::Event::ChannelPending {
+                       ref counterparty_node_id,
+                       ..
+               } if counterparty_node_id == &nodes[1].node.get_our_node_id(),
+       )));
+       assert!(events.iter().any(|e| matches!(
+               *e,
+               crate::events::Event::ChannelPending {
+                       ref counterparty_node_id,
+                       ..
+               } if counterparty_node_id == &nodes[2].node.get_our_node_id(),
+       )));
+}
+
+#[test]
+fn test_disconnect_in_funding_batch() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // The remaining peer in the batch disconnects.
+       nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+
+       // The channels in the batch will close immediately.
+       let channel_id_1 = OutPoint { txid: tx.txid(), index: 0 }.to_channel_id();
+       let channel_id_2 = OutPoint { txid: tx.txid(), index: 1 }.to_channel_id();
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 4);
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_1
+       )));
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_2
+       )));
+       assert_eq!(events.iter().filter(|e| matches!(
+               e,
+               Event::DiscardFunding { .. },
+       )).count(), 2);
+
+       // The monitor should become closed.
+       check_added_monitors(&nodes[0], 1);
+       {
+               let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
+               let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
+               assert_eq!(monitor_updates_1.len(), 1);
+               assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+       }
+
+       // The funding transaction should not have been broadcast, and therefore, we don't need
+       // to broadcast a force-close transaction for the closed monitor.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // Ensure the channels don't exist anymore.
+       assert!(nodes[0].node.list_channels().is_empty());
+}
+
+#[test]
+fn test_batch_funding_close_after_funding_signed() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // Go through the funding_created and funding_signed flow with node 2.
+       nodes[2].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[1]);
+       check_added_monitors(&nodes[2], 1);
+       expect_channel_pending_event(&nodes[2], &nodes[0].node.get_our_node_id());
+
+       let funding_signed_msg = get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+       nodes[0].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // Force-close the channel for which we've completed the initial monitor.
+       let channel_id_1 = OutPoint { txid: tx.txid(), index: 0 }.to_channel_id();
+       let channel_id_2 = OutPoint { txid: tx.txid(), index: 1 }.to_channel_id();
+       nodes[0].node.force_close_broadcasting_latest_txn(&channel_id_1, &nodes[1].node.get_our_node_id()).unwrap();
+       check_added_monitors(&nodes[0], 2);
+       {
+               let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
+               let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
+               assert_eq!(monitor_updates_1.len(), 1);
+               assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+               let monitor_updates_2 = monitor_updates.get(&channel_id_2).unwrap();
+               assert_eq!(monitor_updates_2.len(), 1);
+               assert_eq!(monitor_updates_2[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+       }
+       let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+       match msg_events[0] {
+               MessageSendEvent::HandleError { .. } => (),
+               _ => panic!("Unexpected message."),
+       }
+
+       // We broadcast the commitment transaction as part of the force-close.
+       {
+               let broadcasted_txs = nodes[0].tx_broadcaster.txn_broadcast();
+               assert_eq!(broadcasted_txs.len(), 1);
+               assert!(broadcasted_txs[0].txid() != tx.txid());
+               assert_eq!(broadcasted_txs[0].input.len(), 1);
+               assert_eq!(broadcasted_txs[0].input[0].previous_output.txid, tx.txid());
+       }
+
+       // All channels in the batch should close immediately.
+       let events = nodes[0].node.get_and_clear_pending_events();
+       assert_eq!(events.len(), 4);
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_1
+       )));
+       assert!(events.iter().any(|e| matches!(
+               e,
+               Event::ChannelClosed {
+                       channel_id,
+                       ..
+               } if channel_id == &channel_id_2
+       )));
+       assert_eq!(events.iter().filter(|e| matches!(
+               e,
+               Event::DiscardFunding { .. },
+       )).count(), 2);
+
+       // Ensure the channels don't exist anymore.
+       assert!(nodes[0].node.list_channels().is_empty());
+}
index cdf5627a7aac6013371cd2b55e3cf14224a8423a..025a197348ad18c616ffc0ea114ecaa6c3f116f5 100644 (file)
@@ -2084,11 +2084,6 @@ mod tests {
                let outbound_payments = OutboundPayments::new();
                let payment_id = PaymentId([0; 32]);
 
-               assert!(
-                       outbound_payments.add_new_awaiting_invoice(payment_id, Retry::Attempts(0), None).is_ok()
-               );
-               assert!(outbound_payments.has_pending_payments());
-
                let invoice = OfferBuilder::new("foo".into(), recipient_pubkey())
                        .amount_msats(1000)
                        .build().unwrap()
@@ -2099,6 +2094,12 @@ mod tests {
                        .build().unwrap()
                        .sign(recipient_sign).unwrap();
 
+               assert!(outbound_payments.add_new_awaiting_invoice(
+                               payment_id, Retry::Attempts(0), Some(invoice.amount_msats() / 100 + 50_000))
+                       .is_ok()
+               );
+               assert!(outbound_payments.has_pending_payments());
+
                router.expect_find_route(
                        RouteParameters::from_payment_params_and_value(
                                PaymentParameters::from_bolt12_invoice(&invoice),
@@ -2139,11 +2140,6 @@ mod tests {
                let outbound_payments = OutboundPayments::new();
                let payment_id = PaymentId([0; 32]);
 
-               assert!(
-                       outbound_payments.add_new_awaiting_invoice(payment_id, Retry::Attempts(0), None).is_ok()
-               );
-               assert!(outbound_payments.has_pending_payments());
-
                let invoice = OfferBuilder::new("foo".into(), recipient_pubkey())
                        .amount_msats(1000)
                        .build().unwrap()
@@ -2154,6 +2150,12 @@ mod tests {
                        .build().unwrap()
                        .sign(recipient_sign).unwrap();
 
+               assert!(outbound_payments.add_new_awaiting_invoice(
+                               payment_id, Retry::Attempts(0), Some(invoice.amount_msats() / 100 + 50_000))
+                       .is_ok()
+               );
+               assert!(outbound_payments.has_pending_payments());
+
                let route_params = RouteParameters::from_payment_params_and_value(
                        PaymentParameters::from_bolt12_invoice(&invoice),
                        invoice.amount_msats(),
index 21c4881ab1a08295cf7989ea99d3a47561f9656f..22d85ecc76c3af0a5bbb8d5df0a74fb30db6fe0e 100644 (file)
@@ -2421,7 +2421,8 @@ fn auto_retry_partial_failure() {
        let payment_params = PaymentParameters::from_node_id(nodes[1].node.get_our_node_id(), TEST_FINAL_CLTV)
                .with_expiry_time(payment_expiry_secs as u64)
                .with_bolt11_features(invoice_features).unwrap();
-       let route_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat);
+       let mut route_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat);
+       route_params.max_total_routing_fee_msat = None;
 
        // Configure the initial send, retry1 and retry2's paths.
        let send_route = Route {
@@ -2487,14 +2488,16 @@ fn auto_retry_partial_failure() {
        nodes[0].router.expect_find_route(route_params.clone(), Ok(send_route));
        let mut payment_params = route_params.payment_params.clone();
        payment_params.previously_failed_channels.push(chan_2_id);
-       nodes[0].router.expect_find_route(
-               RouteParameters::from_payment_params_and_value(payment_params, amt_msat / 2),
-               Ok(retry_1_route));
+
+       let mut retry_1_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat / 2);
+       retry_1_params.max_total_routing_fee_msat = None;
+       nodes[0].router.expect_find_route(retry_1_params, Ok(retry_1_route));
+
        let mut payment_params = route_params.payment_params.clone();
        payment_params.previously_failed_channels.push(chan_3_id);
-       nodes[0].router.expect_find_route(
-               RouteParameters::from_payment_params_and_value(payment_params, amt_msat / 4),
-               Ok(retry_2_route));
+       let mut retry_2_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat / 4);
+       retry_2_params.max_total_routing_fee_msat = None;
+       nodes[0].router.expect_find_route(retry_2_params, Ok(retry_2_route));
 
        // Send a payment that will partially fail on send, then partially fail on retry, then succeed.
        nodes[0].node.send_payment(payment_hash, RecipientOnionFields::secret_only(payment_secret),
@@ -2718,8 +2721,9 @@ fn retry_multi_path_single_failed_payment() {
        let payment_params = PaymentParameters::from_node_id(nodes[1].node.get_our_node_id(), TEST_FINAL_CLTV)
                .with_expiry_time(payment_expiry_secs as u64)
                .with_bolt11_features(invoice_features).unwrap();
-       let route_params = RouteParameters::from_payment_params_and_value(
+       let mut route_params = RouteParameters::from_payment_params_and_value(
                payment_params.clone(), amt_msat);
+       route_params.max_total_routing_fee_msat = None;
 
        let chans = nodes[0].node.list_usable_channels();
        let mut route = Route {
@@ -2751,11 +2755,12 @@ fn retry_multi_path_single_failed_payment() {
        route.paths[1].hops[0].fee_msat = 50_000_000;
        let mut pay_params = route.route_params.clone().unwrap().payment_params;
        pay_params.previously_failed_channels.push(chans[1].short_channel_id.unwrap());
-       nodes[0].router.expect_find_route(
-               // Note that the second request here requests the amount we originally failed to send,
-               // not the amount remaining on the full payment, which should be changed.
-               RouteParameters::from_payment_params_and_value(pay_params, 100_000_001),
-               Ok(route.clone()));
+
+       // Note that the second request here requests the amount we originally failed to send,
+       // not the amount remaining on the full payment, which should be changed.
+       let mut retry_params = RouteParameters::from_payment_params_and_value(pay_params, 100_000_001);
+       retry_params.max_total_routing_fee_msat = None;
+       nodes[0].router.expect_find_route(retry_params, Ok(route.clone()));
 
        {
                let scorer = chanmon_cfgs[0].scorer.read().unwrap();
@@ -2898,7 +2903,8 @@ fn no_extra_retries_on_back_to_back_fail() {
        let payment_params = PaymentParameters::from_node_id(nodes[1].node.get_our_node_id(), TEST_FINAL_CLTV)
                .with_expiry_time(payment_expiry_secs as u64)
                .with_bolt11_features(invoice_features).unwrap();
-       let route_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat);
+       let mut route_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat);
+       route_params.max_total_routing_fee_msat = None;
 
        let mut route = Route {
                paths: vec![
@@ -2941,15 +2947,16 @@ fn no_extra_retries_on_back_to_back_fail() {
                        PaymentParameters::from_node_id(nodes[2].node.get_our_node_id(), TEST_FINAL_CLTV),
                        100_000_000)),
        };
+       route.route_params.as_mut().unwrap().max_total_routing_fee_msat = None;
        nodes[0].router.expect_find_route(route_params.clone(), Ok(route.clone()));
        let mut second_payment_params = route_params.payment_params.clone();
        second_payment_params.previously_failed_channels = vec![chan_2_scid, chan_2_scid];
        // On retry, we'll only return one path
        route.paths.remove(1);
        route.paths[0].hops[1].fee_msat = amt_msat;
-       nodes[0].router.expect_find_route(
-               RouteParameters::from_payment_params_and_value(second_payment_params, amt_msat),
-               Ok(route.clone()));
+       let mut retry_params = RouteParameters::from_payment_params_and_value(second_payment_params, amt_msat);
+       retry_params.max_total_routing_fee_msat = None;
+       nodes[0].router.expect_find_route(retry_params, Ok(route.clone()));
 
        nodes[0].node.send_payment(payment_hash, RecipientOnionFields::secret_only(payment_secret),
                PaymentId(payment_hash.0), route_params, Retry::Attempts(1)).unwrap();
@@ -3102,7 +3109,8 @@ fn test_simple_partial_retry() {
        let payment_params = PaymentParameters::from_node_id(nodes[1].node.get_our_node_id(), TEST_FINAL_CLTV)
                .with_expiry_time(payment_expiry_secs as u64)
                .with_bolt11_features(invoice_features).unwrap();
-       let route_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat);
+       let mut route_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat);
+       route_params.max_total_routing_fee_msat = None;
 
        let mut route = Route {
                paths: vec![
@@ -3145,14 +3153,15 @@ fn test_simple_partial_retry() {
                        PaymentParameters::from_node_id(nodes[2].node.get_our_node_id(), TEST_FINAL_CLTV),
                        100_000_000)),
        };
+       route.route_params.as_mut().unwrap().max_total_routing_fee_msat = None;
        nodes[0].router.expect_find_route(route_params.clone(), Ok(route.clone()));
        let mut second_payment_params = route_params.payment_params.clone();
        second_payment_params.previously_failed_channels = vec![chan_2_scid];
        // On retry, we'll only be asked for one path (or 100k sats)
        route.paths.remove(0);
-       nodes[0].router.expect_find_route(
-               RouteParameters::from_payment_params_and_value(second_payment_params, amt_msat / 2),
-               Ok(route.clone()));
+       let mut retry_params = RouteParameters::from_payment_params_and_value(second_payment_params, amt_msat / 2);
+       retry_params.max_total_routing_fee_msat = None;
+       nodes[0].router.expect_find_route(retry_params, Ok(route.clone()));
 
        nodes[0].node.send_payment(payment_hash, RecipientOnionFields::secret_only(payment_secret),
                PaymentId(payment_hash.0), route_params, Retry::Attempts(1)).unwrap();
index fa08fba99fabda640739f714dd9eb6818d2ab374..151861411b997002de4792d5c031ed8079faee2e 100644 (file)
@@ -11,7 +11,7 @@
 
 use crate::chain::{ChannelMonitorUpdateStatus, Watch};
 use crate::chain::chaininterface::LowerBoundedFeeEstimator;
-use crate::chain::channelmonitor::ChannelMonitor;
+use crate::chain::channelmonitor::{CLOSED_CHANNEL_UPDATE_ID, ChannelMonitor};
 use crate::sign::EntropySource;
 use crate::chain::transaction::OutPoint;
 use crate::events::{ClosureReason, Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider};
@@ -25,6 +25,7 @@ use crate::util::ser::{Writeable, ReadableArgs};
 use crate::util::config::UserConfig;
 use crate::util::string::UntrustedString;
 
+use bitcoin::{PackedLockTime, Transaction, TxOut};
 use bitcoin::hash_types::BlockHash;
 
 use crate::prelude::*;
@@ -1114,3 +1115,65 @@ fn removed_payment_no_manager_persistence() {
 
        expect_payment_failed!(nodes[0], payment_hash, false);
 }
+
+#[test]
+fn test_reload_partial_funding_batch() {
+       let chanmon_cfgs = create_chanmon_cfgs(3);
+       let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+       let new_persister;
+       let new_chain_monitor;
+
+       let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+       let new_channel_manager;
+       let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+       // Initiate channel opening and create the batch channel funding transaction.
+       let (tx, funding_created_msgs) = create_batch_channel_funding(&nodes[0], &[
+               (&nodes[1], 100_000, 0, 42, None),
+               (&nodes[2], 200_000, 0, 43, None),
+       ]);
+
+       // Go through the funding_created and funding_signed flow with node 1.
+       nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msgs[0]);
+       check_added_monitors(&nodes[1], 1);
+       expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
+       // The monitor is persisted when receiving funding_signed.
+       let funding_signed_msg = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
+       nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed_msg);
+       check_added_monitors(&nodes[0], 1);
+
+       // The transaction should not have been broadcast before all channels are ready.
+       assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);
+
+       // Reload the node while a subset of the channels in the funding batch have persisted monitors.
+       let channel_id_1 = OutPoint { txid: tx.txid(), index: 0 }.to_channel_id();
+       let node_encoded = nodes[0].node.encode();
+       let channel_monitor_1_serialized = get_monitor!(nodes[0], channel_id_1).encode();
+       reload_node!(nodes[0], node_encoded, &[&channel_monitor_1_serialized], new_persister, new_chain_monitor, new_channel_manager);
+
+       // Process monitor events.
+       assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+
+       // The monitor should become closed.
+       check_added_monitors(&nodes[0], 1);
+       {
+               let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
+               let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
+               assert_eq!(monitor_updates_1.len(), 1);
+               assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
+       }
+
+       // The funding transaction should not have been broadcast, but we broadcast the force-close
+       // transaction as part of closing the monitor.
+       {
+               let broadcasted_txs = nodes[0].tx_broadcaster.txn_broadcast();
+               assert_eq!(broadcasted_txs.len(), 1);
+               assert!(broadcasted_txs[0].txid() != tx.txid());
+               assert_eq!(broadcasted_txs[0].input.len(), 1);
+               assert_eq!(broadcasted_txs[0].input[0].previous_output.txid, tx.txid());
+       }
+
+       // Ensure the channels don't exist anymore.
+       assert!(nodes[0].node.list_channels().is_empty());
+}
index c20ce2e97ee835ca0cf2c77b8c7526c08dba5855..92e5dadf979e391371d8dd05d4a5258c73238184 100644 (file)
@@ -341,7 +341,7 @@ impl Path {
 
 /// A route directs a payment from the sender (us) to the recipient. If the recipient supports MPP,
 /// it can take multiple paths. Each path is composed of one or more hops through the network.
-#[derive(Clone, Hash, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub struct Route {
        /// The list of [`Path`]s taken for a single (potentially-)multi-part payment. If no
        /// [`BlindedTail`]s are present, then the pubkey of the last [`RouteHop`] in each path must be
@@ -380,6 +380,12 @@ impl Route {
        }
 }
 
+impl fmt::Display for Route {
+       fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+               log_route!(self).fmt(f)
+       }
+}
+
 const SERIALIZATION_VERSION: u8 = 1;
 const MIN_SERIALIZATION_VERSION: u8 = 1;
 
@@ -475,14 +481,16 @@ pub struct RouteParameters {
        /// This limit also applies to the total fees that may arise while retrying failed payment
        /// paths.
        ///
-       /// Default value: `None`
+       /// Note that values below a few sats may result in some paths being spuriously ignored.
        pub max_total_routing_fee_msat: Option<u64>,
 }
 
 impl RouteParameters {
        /// Constructs [`RouteParameters`] from the given [`PaymentParameters`] and a payment amount.
+       ///
+       /// [`Self::max_total_routing_fee_msat`] defaults to 1% of the payment amount + 50 sats.
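+       ///
+       /// If that cap does not fit your use case, you can override it after construction; a rough
+       /// sketch (assuming `payment_params` and `amt_msat` are already in scope):
+       ///
+       /// ```ignore
+       /// let mut route_params = RouteParameters::from_payment_params_and_value(payment_params, amt_msat);
+       /// // Opt out of the cap entirely, restoring the previous default of no fee limit.
+       /// route_params.max_total_routing_fee_msat = None;
+       /// ```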
        pub fn from_payment_params_and_value(payment_params: PaymentParameters, final_value_msat: u64) -> Self {
-               Self { payment_params, final_value_msat, max_total_routing_fee_msat: None }
+               Self { payment_params, final_value_msat, max_total_routing_fee_msat: Some(final_value_msat / 100 + 50_000) }
        }
 }
 
@@ -3092,8 +3100,9 @@ mod tests {
                        excess_data: Vec::new()
                });
 
-               let route_params = RouteParameters::from_payment_params_and_value(
+               let mut route_params = RouteParameters::from_payment_params_and_value(
                        payment_params.clone(), 60_000);
+               route_params.max_total_routing_fee_msat = Some(15_000);
                let route = get_route(&our_id, &route_params, &network_graph.read_only(), None,
                        Arc::clone(&logger), &scorer, &Default::default(), &random_seed_bytes).unwrap();
                // Overpay fees to hit htlc_minimum_msat.
@@ -5751,8 +5760,9 @@ mod tests {
                {
                        // Now, attempt to route 90 sats, which is exactly 90 sats at the last hop, plus the
                        // 200% fee charged channel 13 in the 1-to-2 direction.
-                       let route_params = RouteParameters::from_payment_params_and_value(
+                       let mut route_params = RouteParameters::from_payment_params_and_value(
                                payment_params, 90_000);
+                       route_params.max_total_routing_fee_msat = Some(90_000*2);
                        let route = get_route(&our_id, &route_params, &network_graph.read_only(), None,
                                Arc::clone(&logger), &scorer, &Default::default(), &random_seed_bytes).unwrap();
                        assert_eq!(route.paths.len(), 1);
@@ -5820,8 +5830,9 @@ mod tests {
                        // Now, attempt to route 90 sats, hitting the htlc_minimum on channel 4, but
                        // overshooting the htlc_maximum on channel 2. Thus, we should pick the (absurdly
                        // expensive) channels 12-13 path.
-                       let route_params = RouteParameters::from_payment_params_and_value(
+                       let mut route_params = RouteParameters::from_payment_params_and_value(
                                payment_params, 90_000);
+                       route_params.max_total_routing_fee_msat = Some(90_000*2);
                        let route = get_route(&our_id, &route_params, &network_graph.read_only(), None,
                                Arc::clone(&logger), &scorer, &Default::default(), &random_seed_bytes).unwrap();
                        assert_eq!(route.paths.len(), 1);
@@ -6526,8 +6537,9 @@ mod tests {
 
                // Make sure we'll error if our route hints don't have enough liquidity according to their
                // htlc_maximum_msat.
-               let route_params = RouteParameters::from_payment_params_and_value(
+               let mut route_params = RouteParameters::from_payment_params_and_value(
                        payment_params, max_htlc_msat + 1);
+               route_params.max_total_routing_fee_msat = None;
                if let Err(LightningError{err, action: ErrorAction::IgnoreError}) = get_route(&our_id,
                        &route_params, &netgraph, None, Arc::clone(&logger), &scorer, &Default::default(),
                        &random_seed_bytes)
@@ -6541,8 +6553,9 @@ mod tests {
                let payment_params = PaymentParameters::from_node_id(dest_node_id, 42)
                        .with_route_hints(vec![route_hint_1, route_hint_2]).unwrap()
                        .with_bolt11_features(channelmanager::provided_invoice_features(&config)).unwrap();
-               let route_params = RouteParameters::from_payment_params_and_value(
+               let mut route_params = RouteParameters::from_payment_params_and_value(
                        payment_params, max_htlc_msat + 1);
+               route_params.max_total_routing_fee_msat = Some(max_htlc_msat * 2);
                let route = get_route(&our_id, &route_params, &netgraph, None, Arc::clone(&logger),
                        &scorer, &Default::default(), &random_seed_bytes).unwrap();
                assert_eq!(route.paths.len(), 2);
@@ -6977,7 +6990,8 @@ mod tests {
                let payment_params = PaymentParameters::blinded(blinded_hints.clone())
                        .with_bolt12_features(bolt12_features.clone()).unwrap();
 
-               let route_params = RouteParameters::from_payment_params_and_value(payment_params, 100_000);
+               let mut route_params = RouteParameters::from_payment_params_and_value(payment_params, 100_000);
+               route_params.max_total_routing_fee_msat = Some(100_000);
                let route = get_route(&our_id, &route_params, &network_graph, None, Arc::clone(&logger),
                        &scorer, &Default::default(), &random_seed_bytes).unwrap();
                assert_eq!(route.paths.len(), 2);
index f25c947644838f1dfaec843c74c26c8b4f935f72..5b42796a94f9586fe2dd42fd7210fada4be7297f 100644 (file)
@@ -68,7 +68,7 @@ pub struct KeyMaterial(pub [u8; 32]);
 /// Information about a spendable output to a P2WSH script.
 ///
 /// See [`SpendableOutputDescriptor::DelayedPaymentOutput`] for more details on how to spend this.
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub struct DelayedPaymentOutputDescriptor {
        /// The outpoint which is spendable.
        pub outpoint: OutPoint,
@@ -110,7 +110,7 @@ impl_writeable_tlv_based!(DelayedPaymentOutputDescriptor, {
 /// Information about a spendable output to our "payment key".
 ///
 /// See [`SpendableOutputDescriptor::StaticPaymentOutput`] for more details on how to spend this.
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub struct StaticPaymentOutputDescriptor {
        /// The outpoint which is spendable.
        pub outpoint: OutPoint,
@@ -146,7 +146,7 @@ impl_writeable_tlv_based!(StaticPaymentOutputDescriptor, {
 /// at that `txid`/`index`, and any keys or other information required to sign.
 ///
 /// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub enum SpendableOutputDescriptor {
        /// An output to a script which was provided via [`SignerProvider`] directly, either from
        /// [`get_destination_script`] or [`get_shutdown_scriptpubkey`], thus you should already
index 431c62c9fb83de88830319e6b87b2f7370ac7f90..dbe3ee8161ef4498904709a0c7ad5080d1bcb7ac 100644 (file)
@@ -8,25 +8,28 @@
 //! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`],
 //! and [`ChannelMonitor`] all in one place.
 
+use core::cmp;
+use core::convert::{TryFrom, TryInto};
 use core::ops::Deref;
 use bitcoin::hashes::hex::{FromHex, ToHex};
 use bitcoin::{BlockHash, Txid};
 
-use crate::io;
+use crate::{io, log_error};
+use crate::alloc::string::ToString;
 use crate::prelude::{Vec, String};
-use crate::routing::scoring::WriteableScore;
 
 use crate::chain;
 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use crate::chain::chainmonitor::{Persist, MonitorUpdateId};
 use crate::sign::{EntropySource, NodeSigner, WriteableEcdsaChannelSigner, SignerProvider};
 use crate::chain::transaction::OutPoint;
-use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
 use crate::ln::channelmanager::ChannelManager;
 use crate::routing::router::Router;
 use crate::routing::gossip::NetworkGraph;
+use crate::routing::scoring::WriteableScore;
 use crate::util::logger::Logger;
-use crate::util::ser::{ReadableArgs, Writeable};
+use crate::util::ser::{Readable, ReadableArgs, Writeable};
 
 /// The alphabet of characters allowed for namespaces and keys.
 pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
@@ -45,6 +48,8 @@ pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
 pub const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE: &str = "monitors";
 /// The sub-namespace under which [`ChannelMonitor`]s will be persisted.
 pub const CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE: &str = "";
+/// The namespace under which [`ChannelMonitorUpdate`]s will be persisted.
+pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE: &str = "monitor_updates";
 
 /// The namespace under which the [`NetworkGraph`] will be persisted.
 pub const NETWORK_GRAPH_PERSISTENCE_NAMESPACE: &str = "";
@@ -60,6 +65,12 @@ pub const SCORER_PERSISTENCE_SUB_NAMESPACE: &str = "";
 /// The key under which the [`WriteableScore`] will be persisted.
 pub const SCORER_PERSISTENCE_KEY: &str = "scorer";
 
+/// A sentinel value to be prepended to monitors persisted by the [`MonitorUpdatingPersister`].
+///
+/// This serves to prevent someone from accidentally loading such monitors (which may need
+/// updates applied to be current) with another implementation.
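+///
+/// To downgrade to a persister that does not understand this sentinel (after ensuring all stored
+/// updates have been applied to the monitor), a rough sketch of stripping it from a stored value
+/// would be:
+///
+/// ```ignore
+/// let stored: Vec<u8> = kv_store.read(namespace, sub_namespace, key)?;
+/// let monitor_bytes = if stored.starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
+///     &stored[MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len()..]
+/// } else {
+///     &stored[..]
+/// };
+/// ```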
+pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
+
 /// Provides an interface that allows storage and retrieval of persisted values that are associated
 /// with given keys.
 ///
@@ -205,7 +216,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSign
 /// Read previously persisted [`ChannelMonitor`]s from the store.
 pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
        kv_store: K, entropy_source: ES, signer_provider: SP,
-) -> io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
+) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>, io::Error>
 where
        K::Target: KVStore,
        ES::Target: EntropySource + Sized,
@@ -249,10 +260,939 @@ where
                        Err(_) => {
                                return Err(io::Error::new(
                                        io::ErrorKind::InvalidData,
-                                       "Failed to deserialize ChannelMonitor"
+                                       "Failed to read ChannelMonitor"
                                ))
                        }
                }
        }
        Ok(res)
 }
+
+/// Implements [`Persist`] in a way that writes and reads both [`ChannelMonitor`]s and
+/// [`ChannelMonitorUpdate`]s.
+///
+/// # Overview
+///
+/// The main benefit this provides over the [`KVStore`]'s [`Persist`] implementation is decreased
+/// I/O bandwidth and storage churn, at the expense of more IOPS (including listing, reading, and
+/// deleting) and complexity. This is because it writes channel monitor differential updates,
+/// whereas the other (default) implementation rewrites the entire monitor on each update. For
+/// routing nodes, updates can happen many times per second to a channel, and monitors can be tens
+/// of megabytes (or more). Updates can be as small as a few hundred bytes.
+///
+/// Note that monitors written with `MonitorUpdatingPersister` are _not_ backward-compatible with
+/// the default [`KVStore`]'s [`Persist`] implementation. They have a prepended byte sequence,
+/// [`MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL`], applied to prevent deserialization with other
+/// persisters. This is because monitors written by this struct _may_ have unapplied updates. In
+/// order to downgrade, you must ensure that all updates are applied to the monitor, and remove the
+/// sentinel bytes.
+///
+/// # Storing monitors
+///
+/// Monitors are stored by implementing the [`Persist`] trait, which has two functions:
+///
+///   - [`Persist::persist_new_channel`], which persists whole [`ChannelMonitor`]s.
+///   - [`Persist::update_persisted_channel`], which persists only a [`ChannelMonitorUpdate`].
+///
+/// Whole [`ChannelMonitor`]s are stored in the [`CHANNEL_MONITOR_PERSISTENCE_NAMESPACE`], using the
+/// familiar encoding of an [`OutPoint`] (for example, `[SOME-64-CHAR-HEX-STRING]_1`).
+///
+/// Each [`ChannelMonitorUpdate`] is stored in a dynamic sub-namespace, as follows:
+///
+///   - namespace: [`CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE`]
+///   - sub-namespace: [the monitor's encoded outpoint name]
+///
+/// Under that sub-namespace, each update is stored with a number string, like `21`, which
+/// represents its `update_id` value.
+///
+/// For example, consider this channel, named for its transaction ID and index, or [`OutPoint`]:
+///
+///   - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef`
+///   - Index: `1`
+///
+/// Full channel monitors would be stored at a single key:
+///
+/// `[CHANNEL_MONITOR_PERSISTENCE_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1`
+///
+/// Updates would be stored as follows (with `/` delimiting namespace/sub-namespace/key):
+///
+/// ```text
+/// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/1
+/// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/2
+/// [CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE]/deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1/3
+/// ```
+/// ... and so on.
+///
+/// # Reading channel state from storage
+///
+/// Channel state can be reconstructed by calling
+/// [`MonitorUpdatingPersister::read_all_channel_monitors_with_updates`]. Alternatively, users can
+/// list channel monitors themselves and load channels individually using
+/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
+/// 
+/// ## EXTREMELY IMPORTANT
+/// 
+/// It is extremely important that your [`KVStore::read`] implementation uses the
+/// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
+/// that circumstance (not when there is, for example, a permissions error). This is because
+/// neither channel monitor reading function lists updates. Instead, each of them reads the
+/// monitor and, using its stored `update_id`, synthesizes update storage keys and tries them in
+/// sequence until one is not found. All _other_ errors will be bubbled up in the function's
+/// [`Result`].
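+///
+/// For example, an in-memory [`KVStore`] might map a missing key as follows (a minimal sketch;
+/// the `entries` map is illustrative and not part of this crate's API):
+///
+/// ```ignore
+/// fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>> {
+///     let lookup_key = (namespace.to_string(), sub_namespace.to_string(), key.to_string());
+///     match self.entries.get(&lookup_key) {
+///         Some(bytes) => Ok(bytes.clone()),
+///         // A missing key MUST be reported as NotFound; reserve every other error kind for
+///         // real failures such as permission or I/O problems.
+///         None => Err(io::Error::new(io::ErrorKind::NotFound, "key not found")),
+///     }
+/// }
+/// ```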
+///
+/// # Pruning stale channel updates
+///
+/// Stale updates are pruned when a full monitor is written. The old monitor is first read, and if
+/// that succeeds, updates in the range between the old and new monitors are deleted. The `lazy`
+/// flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
+/// will complete. However, stale updates are not a problem for data integrity, since only
+/// updates with an `update_id` higher than the stored [`ChannelMonitor`]'s are ever read.
+///
+/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
+/// would like to get rid of them, consider using the
+/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
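+///
+/// # Example
+///
+/// A rough sketch of wiring this up at startup; the `kv_store`, `logger`, `keys_manager`,
+/// `broadcaster`, and `fee_estimator` values are assumptions standing in for your own setup:
+///
+/// ```ignore
+/// // Consolidate into a full monitor write after at most 100 pending updates per channel.
+/// let persister = MonitorUpdatingPersister::new(
+///     &kv_store, &logger, 100, &keys_manager, &keys_manager);
+/// // On startup, reconstruct every channel's state from its stored monitor plus any stored
+/// // updates, then hand the monitors to your ChainMonitor as usual.
+/// let channel_monitors = persister
+///     .read_all_channel_monitors_with_updates(&broadcaster, &fee_estimator)?;
+/// ```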
+pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref>
+where
+       K::Target: KVStore,
+       L::Target: Logger,
+       ES::Target: EntropySource + Sized,
+       SP::Target: SignerProvider + Sized,
+{
+       kv_store: K,
+       logger: L,
+       maximum_pending_updates: u64,
+       entropy_source: ES,
+       signer_provider: SP,
+}
+
+#[allow(dead_code)]
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref>
+       MonitorUpdatingPersister<K, L, ES, SP>
+where
+       K::Target: KVStore,
+       L::Target: Logger,
+       ES::Target: EntropySource + Sized,
+       SP::Target: SignerProvider + Sized,
+{
+       /// Constructs a new [`MonitorUpdatingPersister`].
+       ///
+       /// The `maximum_pending_updates` parameter controls how many updates may be stored before a
+       /// [`MonitorUpdatingPersister`] consolidates updates by writing a full monitor. Note that
+       /// consolidation will frequently occur with fewer updates than what you set here; this number
+       /// is merely the maximum that may be stored. When setting this value, consider that for higher
+       /// values of `maximum_pending_updates`:
+       /// 
+       ///   - [`MonitorUpdatingPersister`] will tend to write more [`ChannelMonitorUpdate`]s than
+       /// [`ChannelMonitor`]s, approaching one [`ChannelMonitor`] write for every
+       /// `maximum_pending_updates` [`ChannelMonitorUpdate`]s.
+       ///   - [`MonitorUpdatingPersister`] will issue deletes differently. Lazy deletes will come in
+       /// "waves" for each [`ChannelMonitor`] write. A larger `maximum_pending_updates` means bigger,
+       /// less frequent "waves."
+       ///   - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run
+       /// [`MonitorUpdatingPersister::cleanup_stale_updates`].
+       pub fn new(
+               kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
+               signer_provider: SP,
+       ) -> Self
+       where
+               ES::Target: EntropySource + Sized,
+               SP::Target: SignerProvider + Sized,
+       {
+               MonitorUpdatingPersister {
+                       kv_store,
+                       logger,
+                       maximum_pending_updates,
+                       entropy_source,
+                       signer_provider,
+               }
+       }
+
+       /// Reads all stored channel monitors, along with any stored updates for them.
+       ///
+       /// It is extremely important that your [`KVStore::read`] implementation uses the
+       /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
+       /// documentation for [`MonitorUpdatingPersister`].
+       pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref + Clone>(
+               &self, broadcaster: B, fee_estimator: F,
+       ) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>, io::Error>
+       where
+               ES::Target: EntropySource + Sized,
+               SP::Target: SignerProvider + Sized,
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+       {
+               let monitor_list = self.kv_store.list(
+                       CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+                       CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+               )?;
+               let mut res = Vec::with_capacity(monitor_list.len());
+               for monitor_key in monitor_list {
+                       res.push(self.read_channel_monitor_with_updates(
+                               &broadcaster,
+                               fee_estimator.clone(),
+                               monitor_key,
+                       )?)
+               }
+               Ok(res)
+       }
+
+       /// Read a single channel monitor, along with any stored updates for it.
+       ///
+       /// It is extremely important that your [`KVStore::read`] implementation uses the
+       /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
+       /// documentation for [`MonitorUpdatingPersister`].
+       ///
+       /// For `monitor_key`, channel storage keys are the channel's transaction ID and index, or
+       /// [`OutPoint`], with an underscore `_` between them. For example, given:
+       ///
+       ///   - Transaction ID: `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef`
+       ///   - Index: `1`
+       ///
+       /// The correct `monitor_key` would be:
+       /// `deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1`
+       /// 
+       /// Loading a large number of monitors will be faster if done in parallel. You can use this
+       /// function to accomplish this. Take care to limit the number of parallel readers.
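+       ///
+       /// A rough per-channel sketch (the surrounding `persister`, `broadcaster`, and
+       /// `fee_estimator` values are assumptions from your own setup):
+       ///
+       /// ```ignore
+       /// let key = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1".to_string();
+       /// let (best_block_hash, monitor) =
+       ///     persister.read_channel_monitor_with_updates(&broadcaster, &fee_estimator, key)?;
+       /// ```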
+       pub fn read_channel_monitor_with_updates<B: Deref, F: Deref + Clone>(
+               &self, broadcaster: &B, fee_estimator: F, monitor_key: String,
+       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>), io::Error>
+       where
+               ES::Target: EntropySource + Sized,
+               SP::Target: SignerProvider + Sized,
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+       {
+               let monitor_name = MonitorName::new(monitor_key)?;
+               let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
+               let mut current_update_id = monitor.get_latest_update_id();
+               loop {
+                       current_update_id = match current_update_id.checked_add(1) {
+                               Some(next_update_id) => next_update_id,
+                               None => break,
+                       };
+                       let update_name = UpdateName::from(current_update_id);
+                       let update = match self.read_monitor_update(&monitor_name, &update_name) {
+                               Ok(update) => update,
+                               Err(err) if err.kind() == io::ErrorKind::NotFound => {
+                                       // We can't find any more updates, so we are done.
+                                       break;
+                               }
+                               Err(err) => return Err(err),
+                       };
+
+                       monitor.update_monitor(&update, broadcaster, fee_estimator.clone(), &self.logger)
+                               .map_err(|e| {
+                                       log_error!(
+                                               self.logger,
+                                               "Monitor update failed. monitor: {} update: {} reason: {:?}",
+                                               monitor_name.as_str(),
+                                               update_name.as_str(),
+                                               e
+                                       );
+                                       io::Error::new(io::ErrorKind::Other, "Monitor update failed")
+                               })?;
+               }
+               Ok((block_hash, monitor))
+       }
+
+       /// Read a channel monitor.
+       fn read_monitor(
+               &self, monitor_name: &MonitorName,
+       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>), io::Error> {
+               let outpoint: OutPoint = monitor_name.try_into()?;
+               let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
+                       CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+                       CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+                       monitor_name.as_str(),
+               )?);
+               // Discard the sentinel bytes if found.
+               if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
+                       monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);
+               }
+               match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
+                       &mut monitor_cursor,
+                       (&*self.entropy_source, &*self.signer_provider),
+               ) {
+                       Ok((blockhash, channel_monitor)) => {
+                               if channel_monitor.get_funding_txo().0.txid != outpoint.txid
+                                       || channel_monitor.get_funding_txo().0.index != outpoint.index
+                               {
+                                       log_error!(
+                                               self.logger,
+                                               "ChannelMonitor {} was stored under the wrong key!",
+                                               monitor_name.as_str()
+                                       );
+                                       Err(io::Error::new(
+                                               io::ErrorKind::InvalidData,
+                                               "ChannelMonitor was stored under the wrong key",
+                                       ))
+                               } else {
+                                       Ok((blockhash, channel_monitor))
+                               }
+                       }
+                       Err(e) => {
+                               log_error!(
+                                       self.logger,
+                                       "Failed to read ChannelMonitor {}, reason: {}",
+                                       monitor_name.as_str(),
+                                       e,
+                               );
+                               Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
+                       }
+               }
+       }
+
+       /// Read a channel monitor update.
+       fn read_monitor_update(
+               &self, monitor_name: &MonitorName, update_name: &UpdateName,
+       ) -> Result<ChannelMonitorUpdate, io::Error> {
+               let update_bytes = self.kv_store.read(
+                       CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                       monitor_name.as_str(),
+                       update_name.as_str(),
+               )?;
+               ChannelMonitorUpdate::read(&mut io::Cursor::new(update_bytes)).map_err(|e| {
+                       log_error!(
+                               self.logger,
+                               "Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}",
+                               CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                               monitor_name.as_str(),
+                               update_name.as_str(),
+                               e,
+                       );
+                       io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitorUpdate")
+               })
+       }
+
+       /// Cleans up stale updates for all monitors.
+       ///
+       /// This function works by first listing all monitors, and then for each of them, listing all
+       /// updates. Updates with an `update_id` less than or equal to the stored monitor's `update_id`
+       /// are deleted. The deletion can be either lazy or non-lazy based on the `lazy` flag; this will
+       /// be passed to [`KVStore::remove`].
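+       ///
+       /// For example, to eagerly delete any stale updates left over from a crash with pending
+       /// lazy deletes (a sketch; `persister` is assumed to be a [`MonitorUpdatingPersister`]):
+       ///
+       /// ```ignore
+       /// persister.cleanup_stale_updates(false)?;
+       /// ```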
+       pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
+               let monitor_keys = self.kv_store.list(
+                       CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+                       CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+               )?;
+               for monitor_key in monitor_keys {
+                       let monitor_name = MonitorName::new(monitor_key)?;
+                       let (_, current_monitor) = self.read_monitor(&monitor_name)?;
+                       let updates = self
+                               .kv_store
+                               .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, monitor_name.as_str())?;
+                       for update in updates {
+                               let update_name = UpdateName::new(update)?;
+                               // if the update_id is lower than the stored monitor, delete
+                               if update_name.0 <= current_monitor.get_latest_update_id() {
+                                       self.kv_store.remove(
+                                               CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                                               monitor_name.as_str(),
+                                               update_name.as_str(),
+                                               lazy,
+                                       )?;
+                               }
+                       }
+               }
+               Ok(())
+       }
+}
+
+impl<ChannelSigner: WriteableEcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref> 
+       Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP>
+where
+       K::Target: KVStore,
+       L::Target: Logger,
+       ES::Target: EntropySource + Sized,
+       SP::Target: SignerProvider + Sized,
+{
+       /// Persists a new channel. This means writing the entire monitor to the
+       /// parameterized [`KVStore`].
+       fn persist_new_channel(
+               &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
+               _monitor_update_call_id: MonitorUpdateId,
+       ) -> chain::ChannelMonitorUpdateStatus {
+               // Determine the proper key for this monitor
+               let monitor_name = MonitorName::from(funding_txo);
+               let maybe_old_monitor = self.read_monitor(&monitor_name);
+               match maybe_old_monitor {
+                       Ok((_, ref old_monitor)) => {
+                               // Check that this key isn't already storing a monitor with a higher update_id
+                               // (collision)
+                               if old_monitor.get_latest_update_id() > monitor.get_latest_update_id() {
+                                       log_error!(
+                                               self.logger,
+                                               "Tried to write a monitor at the same outpoint {} with a higher update_id!",
+                                               monitor_name.as_str()
+                                       );
+                                       return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
+                               }
+                       }
+                       // This means the channel monitor is new.
+                       Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
+                       _ => return chain::ChannelMonitorUpdateStatus::UnrecoverableError,
+               }
+               // Serialize and write the new monitor
+               let mut monitor_bytes = Vec::with_capacity(
+                       MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
+               );
+               monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
+               monitor.write(&mut monitor_bytes).unwrap();
+               match self.kv_store.write(
+                       CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+                       CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+                       monitor_name.as_str(),
+                       &monitor_bytes,
+               ) {
+                       Ok(_) => {
+                               // Assess cleanup. Typically, we'll clean up only between the last two known full
+                               // monitors.
+                               if let Ok((_, old_monitor)) = maybe_old_monitor {
+                                       let start = old_monitor.get_latest_update_id();
+                                       let end = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+                                               // We don't want to clean the rest of u64, so just do possible pending
+                                               // updates. Note that we never write updates at
+                                               // `CLOSED_CHANNEL_UPDATE_ID`.
+                                               cmp::min(
+                                                       start.saturating_add(self.maximum_pending_updates),
+                                                       CLOSED_CHANNEL_UPDATE_ID - 1,
+                                               )
+                                       } else {
+                                               monitor.get_latest_update_id().saturating_sub(1)
+                                       };
+                                       // We should bother cleaning up only if there's at least one update
+                                       // expected.
+                                       for update_id in start..=end {
+                                               let update_name = UpdateName::from(update_id);
+                                               #[cfg(debug_assertions)]
+                                               {
+                                                       if let Ok(update) =
+                                                               self.read_monitor_update(&monitor_name, &update_name)
+                                                       {
+                                                               // Assert that we are reading what we think we are.
+                                                               debug_assert_eq!(update.update_id, update_name.0);
+                                                       } else if update_id != start && monitor.get_latest_update_id() != CLOSED_CHANNEL_UPDATE_ID
+                                                       {
+                                                               // We're deleting something we should know doesn't exist.
+                                                               panic!(
+                                                                       "failed to read monitor update {}",
+                                                                       update_name.as_str()
+                                                               );
+                                                       }
+                                                       // On closed channels, we will unavoidably try to read
+                                                       // non-existent updates since we have to guess at the range of
+                                                       // stale updates, so do nothing.
+                                               }
+                                               if let Err(e) = self.kv_store.remove(
+                                                       CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                                                       monitor_name.as_str(),
+                                                       update_name.as_str(),
+                                                       true,
+                                               ) {
+                                                       log_error!(
+                                                               self.logger,
+                                                               "error cleaning up channel monitor updates for monitor {}, reason: {}",
+                                                               monitor_name.as_str(),
+                                                               e
+                                                       );
+                                               };
+                                       }
+                               };
+                               chain::ChannelMonitorUpdateStatus::Completed
+                       }
+                       Err(e) => {
+                               log_error!(
+                                       self.logger,
+                                       "error writing channel monitor {}/{}/{} reason: {}",
+                                       CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+                                       CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+                                       monitor_name.as_str(),
+                                       e
+                               );
+                               chain::ChannelMonitorUpdateStatus::UnrecoverableError
+                       }
+               }
+       }
+
+       /// Persists a channel update, writing only the update to the parameterized [`KVStore`] if possible.
+       ///
+       /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]:
+       ///
+       ///   - No full monitor is found in [`KVStore`]
+       ///   - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`]
+       ///   - LDK commands re-persisting the entire monitor through this function, specifically when
+       ///     `update` is `None`.
+       ///   - The update is at [`CLOSED_CHANNEL_UPDATE_ID`]
+       fn update_persisted_channel(
+               &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
+               monitor: &ChannelMonitor<ChannelSigner>, monitor_update_call_id: MonitorUpdateId,
+       ) -> chain::ChannelMonitorUpdateStatus {
+               // IMPORTANT: monitor_update_call_id: MonitorUpdateId is not to be confused with
+               // ChannelMonitorUpdate's update_id.
+               if let Some(update) = update {
+                       if update.update_id != CLOSED_CHANNEL_UPDATE_ID
+                               && update.update_id % self.maximum_pending_updates != 0
+                       {
+                               let monitor_name = MonitorName::from(funding_txo);
+                               let update_name = UpdateName::from(update.update_id);
+                               match self.kv_store.write(
+                                       CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                                       monitor_name.as_str(),
+                                       update_name.as_str(),
+                                       &update.encode(),
+                               ) {
+                                       Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
+                                       Err(e) => {
+                                               log_error!(
+                                                       self.logger,
+                                                       "error writing channel monitor update {}/{}/{} reason: {}",
+                                                       CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                                                       monitor_name.as_str(),
+                                                       update_name.as_str(),
+                                                       e
+                                               );
+                                               chain::ChannelMonitorUpdateStatus::UnrecoverableError
+                                       }
+                               }
+                       } else {
+                               // We could write this update, but it meets criteria of our design that call for a full monitor write.
+                               self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
+                       }
+               } else {
+                       // There is no update given, so we must persist a new monitor.
+                       self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
+               }
+       }
+}
+
+/// A struct representing a name for a monitor.
+#[derive(Debug)]
+struct MonitorName(String);
+
+impl MonitorName {
+       /// Constructs a [`MonitorName`], after verifying that an [`OutPoint`] can
+       /// be formed from the given `name`.
+       pub fn new(name: String) -> Result<Self, io::Error> {
+               MonitorName::do_try_into_outpoint(&name)?;
+               Ok(Self(name))
+       }
+       /// Convert this monitor name to a str.
+       pub fn as_str(&self) -> &str {
+               &self.0
+       }
+       /// Attempt to form a valid [`OutPoint`] from a given name string.
+       fn do_try_into_outpoint(name: &str) -> Result<OutPoint, io::Error> {
+               let mut parts = name.splitn(2, '_');
+               let txid = if let Some(part) = parts.next() {
+                       Txid::from_hex(part).map_err(|_| {
+                               io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
+                       })?
+               } else {
+                       return Err(io::Error::new(
+                               io::ErrorKind::InvalidData,
+                               "Stored monitor key is not a splittable string",
+                       ));
+               };
+               let index = if let Some(part) = parts.next() {
+                       part.parse().map_err(|_| {
+                               io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
+                       })?
+               } else {
+                       return Err(io::Error::new(
+                               io::ErrorKind::InvalidData,
+                               "No tx index value found after underscore in stored key",
+                       ));
+               };
+               Ok(OutPoint { txid, index })
+       }
+}
+
+impl TryFrom<&MonitorName> for OutPoint {
+       type Error = io::Error;
+
+       fn try_from(value: &MonitorName) -> Result<Self, io::Error> {
+               MonitorName::do_try_into_outpoint(&value.0)
+       }
+}
+
+impl From<OutPoint> for MonitorName {
+       fn from(value: OutPoint) -> Self {
+               MonitorName(format!("{}_{}", value.txid.to_hex(), value.index))
+       }
+}
+
+/// A struct representing a name for an update.
+#[derive(Debug)]
+struct UpdateName(u64, String);
+
+impl UpdateName {
+       /// Constructs an [`UpdateName`], after verifying that an update sequence ID
+       /// can be derived from the given `name`.
+       pub fn new(name: String) -> Result<Self, io::Error> {
+               match name.parse::<u64>() {
+                       Ok(u) => Ok(u.into()),
+                       Err(_) => {
+                               Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
+                       }
+               }
+       }
+
+       /// Convert this monitor update name to a &str
+       pub fn as_str(&self) -> &str {
+               &self.1
+       }
+}
+
+impl From<u64> for UpdateName {
+       fn from(value: u64) -> Self {
+               Self(value, value.to_string())
+       }
+}
+
+#[cfg(test)]
+mod tests {
+       use super::*;
+       use crate::chain::chainmonitor::Persist;
+       use crate::chain::ChannelMonitorUpdateStatus;
+       use crate::events::{ClosureReason, MessageSendEventsProvider};
+       use crate::ln::functional_test_utils::*;
+       use crate::util::test_utils::{self, TestLogger, TestStore};
+       use crate::{check_added_monitors, check_closed_broadcast};
+
+       const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
+
+       #[test]
+       fn converts_u64_to_update_name() {
+               assert_eq!(UpdateName::from(0).as_str(), "0");
+               assert_eq!(UpdateName::from(21).as_str(), "21");
+               assert_eq!(UpdateName::from(u64::MAX).as_str(), "18446744073709551615");
+       }
+
+       #[test]
+       fn bad_update_name_fails() {
+               assert!(UpdateName::new("deadbeef".to_string()).is_err());
+               assert!(UpdateName::new("-1".to_string()).is_err());
+       }
+
+       #[test]
+       fn monitor_from_outpoint_works() {
+               let monitor_name1 = MonitorName::from(OutPoint {
+                       txid: Txid::from_hex("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
+                       index: 1,
+               });
+               assert_eq!(monitor_name1.as_str(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1");
+
+               let monitor_name2 = MonitorName::from(OutPoint {
+                       txid: Txid::from_hex("f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef").unwrap(),
+                       index: u16::MAX,
+               });
+               assert_eq!(monitor_name2.as_str(), "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535");
+       }
+
+       #[test]
+       fn bad_monitor_string_fails() {
+               assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()).is_err());
+               assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()).is_err());
+               assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()).is_err());
+       }
+
+       // Exercise the `MonitorUpdatingPersister` with real channels and payments.
+       #[test]
+       fn persister_with_real_monitors() {
+               // This value is used later to limit how many iterations we perform.
+               let test_max_pending_updates = 7;
+               let chanmon_cfgs = create_chanmon_cfgs(4);
+               let persister_0 = MonitorUpdatingPersister {
+                       kv_store: &TestStore::new(false),
+                       logger: &TestLogger::new(),
+                       maximum_pending_updates: test_max_pending_updates,
+                       entropy_source: &chanmon_cfgs[0].keys_manager,
+                       signer_provider: &chanmon_cfgs[0].keys_manager,
+               };
+               let persister_1 = MonitorUpdatingPersister {
+                       kv_store: &TestStore::new(false),
+                       logger: &TestLogger::new(),
+                       // Intentionally set this to a smaller value to test a different alignment.
+                       maximum_pending_updates: 3,
+                       entropy_source: &chanmon_cfgs[1].keys_manager,
+                       signer_provider: &chanmon_cfgs[1].keys_manager,
+               };
+               let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let chain_mon_0 = test_utils::TestChainMonitor::new(
+                       Some(&chanmon_cfgs[0].chain_source),
+                       &chanmon_cfgs[0].tx_broadcaster,
+                       &chanmon_cfgs[0].logger,
+                       &chanmon_cfgs[0].fee_estimator,
+                       &persister_0,
+                       &chanmon_cfgs[0].keys_manager,
+               );
+               let chain_mon_1 = test_utils::TestChainMonitor::new(
+                       Some(&chanmon_cfgs[1].chain_source),
+                       &chanmon_cfgs[1].tx_broadcaster,
+                       &chanmon_cfgs[1].logger,
+                       &chanmon_cfgs[1].fee_estimator,
+                       &persister_1,
+                       &chanmon_cfgs[1].keys_manager,
+               );
+               node_cfgs[0].chain_monitor = chain_mon_0;
+               node_cfgs[1].chain_monitor = chain_mon_1;
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+               let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
+               let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
+
+               // Check that the persisted channel data is empty before any channels are
+               // open.
+               let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
+                       broadcaster_0, &chanmon_cfgs[0].fee_estimator).unwrap();
+               assert_eq!(persisted_chan_data_0.len(), 0);
+               let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
+                       broadcaster_1, &chanmon_cfgs[1].fee_estimator).unwrap();
+               assert_eq!(persisted_chan_data_1.len(), 0);
+
+               // Helper to make sure the channel is on the expected update ID.
+               macro_rules! check_persisted_data {
+                       ($expected_update_id: expr) => {
+                               persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
+                                       broadcaster_0, &chanmon_cfgs[0].fee_estimator).unwrap();
+                               // check that we stored only one monitor
+                               assert_eq!(persisted_chan_data_0.len(), 1);
+                               for (_, mon) in persisted_chan_data_0.iter() {
+                                       // check that when we read it, we got the right update id
+                                       assert_eq!(mon.get_latest_update_id(), $expected_update_id);
+                                       // if the CM is at the correct update id without updates, ensure no updates are stored
+                                       let monitor_name = MonitorName::from(mon.get_funding_txo().0);
+                                       let (_, cm_0) = persister_0.read_monitor(&monitor_name).unwrap();
+                                       if cm_0.get_latest_update_id() == $expected_update_id {
+                                               assert_eq!(
+                                                       persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                                                               monitor_name.as_str()).unwrap().len(),
+                                                       0,
+                                                       "updates stored when they shouldn't be in persister 0"
+                                               );
+                                       }
+                               }
+                               persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
+                                       broadcaster_1, &chanmon_cfgs[1].fee_estimator).unwrap();
+                               assert_eq!(persisted_chan_data_1.len(), 1);
+                               for (_, mon) in persisted_chan_data_1.iter() {
+                                       assert_eq!(mon.get_latest_update_id(), $expected_update_id);
+                                       let monitor_name = MonitorName::from(mon.get_funding_txo().0);
+                                       let (_, cm_1) = persister_1.read_monitor(&monitor_name).unwrap();
+                                       if cm_1.get_latest_update_id() == $expected_update_id {
+                                               assert_eq!(
+                                                       persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE,
+                                                               monitor_name.as_str()).unwrap().len(),
+                                                       0,
+                                                       "updates stored when they shouldn't be in persister 1"
+                                               );
+                                       }
+                               }
+                       };
+               }
+
+               // Create some initial channel and check that a channel was persisted.
+               let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
+               check_persisted_data!(0);
+
+               // Send a few payments and make sure the monitors are updated to the latest.
+               send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
+               check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
+               send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
+               check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
+
+               // Send a few more payments to try all the alignments of max pending updates with
+               // updates for a payment sent and received.
+               let mut sender = 0;
+               for i in 3..=test_max_pending_updates * 2 {
+                       let receiver;
+                       if sender == 0 {
+                               sender = 1;
+                               receiver = 0;
+                       } else {
+                               sender = 0;
+                               receiver = 1;
+                       }
+                       send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
+                       check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
+               }
+
+               // Force close because cooperative close doesn't result in any persisted
+               // updates.
+               nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
+
+               check_closed_event(&nodes[0], 1, ClosureReason::HolderForceClosed, false, &[nodes[1].node.get_our_node_id()], 100000);
+               check_closed_broadcast!(nodes[0], true);
+               check_added_monitors!(nodes[0], 1);
+
+               let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
+               assert_eq!(node_txn.len(), 1);
+
+               connect_block(&nodes[1], &create_dummy_block(nodes[0].best_block_hash(), 42, vec![node_txn[0].clone(), node_txn[0].clone()]));
+
+               check_closed_broadcast!(nodes[1], true);
+               check_closed_event(&nodes[1], 1, ClosureReason::CommitmentTxConfirmed, false, &[nodes[0].node.get_our_node_id()], 100000);
+               check_added_monitors!(nodes[1], 1);
+
+               // Make sure everything is persisted as expected after close.
+               check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);
+
+               // Make sure the expected number of stale updates is present.
+               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(broadcaster_0, &chanmon_cfgs[0].fee_estimator).unwrap();
+               let (_, monitor) = &persisted_chan_data[0];
+               let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
+               // The channel should have 0 updates, as it wrote a full monitor and consolidated.
+               assert_eq!(persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
+               assert_eq!(persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
+       }
+
+       // Test that if the `MonitorUpdatingPersister`'s `KVStore` can't actually write, trying to
+       // persist a monitor or update with it results in the persister returning an
+       // `UnrecoverableError` status.
+       #[test]
+       fn unrecoverable_error_on_write_failure() {
+               // Set up a dummy channel and force close. This will produce a monitor
+               // that we can then use to test persistence.
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+               let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+               nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
+               check_closed_event(&nodes[1], 1, ClosureReason::HolderForceClosed, false, &[nodes[0].node.get_our_node_id()], 100000);
+               {
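+                       // Grab the monitor created by the force close, its latest update ID, and one of
+                       // its monitor updates, plus a dummy funding outpoint, to feed to a persister
+                       // whose backing store rejects all writes.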
+                       let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
+                       let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
+                       let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
+                       let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
+                       let cmu = &cmu_map.get(&added_monitors[0].0.to_channel_id()).unwrap()[0];
+                       let test_txo = OutPoint { txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 };
+
+                       let ro_persister = MonitorUpdatingPersister {
+                               kv_store: &TestStore::new(true),
+                               logger: &TestLogger::new(),
+                               maximum_pending_updates: 11,
+                               entropy_source: node_cfgs[0].keys_manager,
+                               signer_provider: node_cfgs[0].keys_manager,
+                       };
+                       match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
+                               ChannelMonitorUpdateStatus::UnrecoverableError => {
+                                       // correct result
+                               }
+                               ChannelMonitorUpdateStatus::Completed => {
+                                       panic!("Completed persisting new channel when it shouldn't have")
+                               }
+                               ChannelMonitorUpdateStatus::InProgress => {
+                                       panic!("Returned InProgress when it shouldn't have")
+                               }
+                       }
+                       match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1, update_id.2) {
+                               ChannelMonitorUpdateStatus::UnrecoverableError => {
+                                       // correct result
+                               }
+                               ChannelMonitorUpdateStatus::Completed => {
+                                       panic!("Completed updating the persisted channel when it shouldn't have")
+                               }
+                               ChannelMonitorUpdateStatus::InProgress => {
+                                       panic!("Returned InProgress when it shouldn't have")
+                               }
+                       }
+                       added_monitors.clear();
+               }
+               nodes[1].node.get_and_clear_pending_msg_events();
+       }
+
+       // Confirm that the `cleanup_stale_updates` function finds and deletes stale updates.
+       #[test]
+       fn clean_stale_updates_works() {
+               let test_max_pending_updates = 7;
+               let chanmon_cfgs = create_chanmon_cfgs(3);
+               let persister_0 = MonitorUpdatingPersister {
+                       kv_store: &TestStore::new(false),
+                       logger: &TestLogger::new(),
+                       maximum_pending_updates: test_max_pending_updates,
+                       entropy_source: &chanmon_cfgs[0].keys_manager,
+                       signer_provider: &chanmon_cfgs[0].keys_manager,
+               };
+               let persister_1 = MonitorUpdatingPersister {
+                       kv_store: &TestStore::new(false),
+                       logger: &TestLogger::new(),
+                       maximum_pending_updates: test_max_pending_updates,
+                       entropy_source: &chanmon_cfgs[1].keys_manager,
+                       signer_provider: &chanmon_cfgs[1].keys_manager,
+               };
+               let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let chain_mon_0 = test_utils::TestChainMonitor::new(
+                       Some(&chanmon_cfgs[0].chain_source),
+                       &chanmon_cfgs[0].tx_broadcaster,
+                       &chanmon_cfgs[0].logger,
+                       &chanmon_cfgs[0].fee_estimator,
+                       &persister_0,
+                       &chanmon_cfgs[0].keys_manager,
+               );
+               let chain_mon_1 = test_utils::TestChainMonitor::new(
+                       Some(&chanmon_cfgs[1].chain_source),
+                       &chanmon_cfgs[1].tx_broadcaster,
+                       &chanmon_cfgs[1].logger,
+                       &chanmon_cfgs[1].fee_estimator,
+                       &persister_1,
+                       &chanmon_cfgs[1].keys_manager,
+               );
+               node_cfgs[0].chain_monitor = chain_mon_0;
+               node_cfgs[1].chain_monitor = chain_mon_1;
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+               let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
+
+               // Check that the persisted channel data is empty before any channels are
+               // open.
+               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(broadcaster_0, &chanmon_cfgs[0].fee_estimator).unwrap();
+               assert_eq!(persisted_chan_data.len(), 0);
+
+               // Create an initial channel.
+               let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               // Send a few payments to advance the updates a bit
+               send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
+               send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
+
+               // Get the monitor and write a fake stale update at update_id=1 (the lowest update_id possible).
+               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(broadcaster_0, &chanmon_cfgs[0].fee_estimator).unwrap();
+               let (_, monitor) = &persisted_chan_data[0];
+               let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
+               persister_0
+                       .kv_store
+                       .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str(), &[0u8; 1])
+                       .unwrap();
+
+               // Do the stale update cleanup
+               persister_0.cleanup_stale_updates(false).unwrap();
+
+               // Confirm the stale update is unreadable/gone
+               assert!(persister_0
+                       .kv_store
+                       .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str())
+                       .is_err());
+
+               // Force close.
+               nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
+               check_closed_event(&nodes[0], 1, ClosureReason::HolderForceClosed, false, &[nodes[1].node.get_our_node_id()], 100000);
+               check_closed_broadcast!(nodes[0], true);
+               check_added_monitors!(nodes[0], 1);
+
+               // Write an update near u64::MAX
+               persister_0
+                       .kv_store
+                       .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str(), &[0u8; 1])
+                       .unwrap();
+
+               // Do the stale update cleanup
+               persister_0.cleanup_stale_updates(false).unwrap();
+
+               // Confirm the stale update is unreadable/gone
+               assert!(persister_0
+                       .kv_store
+                       .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str())
+                       .is_err());
+       }
+}
diff --git a/pending_changelog/monitorupdatingpersister.txt b/pending_changelog/monitorupdatingpersister.txt
new file mode 100644 (file)
index 0000000..24d63ff
--- /dev/null
@@ -0,0 +1,5 @@
+## Backwards Compatibility
+
+* The `MonitorUpdatingPersister` can read monitors stored conventionally, such as with the `KVStorePersister` from previous LDK versions. You can use this to migrate _to_ the `MonitorUpdatingPersister`: simply point it at existing, fully updated `ChannelMonitor`s and it will read them and work from there. Downgrading, however, is more involved. Monitors stored with `MonitorUpdatingPersister` have a prepended sentinel value that prevents them from being deserialized by previous `Persist` implementations. This ensures they are not accidentally read and used while pending updates are stored but not yet applied, which could result in penalty transactions. Users who wish to downgrade should perform the following steps:
+  * Make a backup copy of all channel state.
+  * Ensure all updates are applied to the monitors. This may be done by loading all the existing data with the `MonitorUpdatingPersister::read_all_channel_monitors_with_updates` function. You can then write the resulting `ChannelMonitor`s using your previous `Persist` implementation, as in the sketch below.
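+
+A minimal sketch of that last step, assuming a `MonitorUpdatingPersister` named `new_persister` plus a `broadcaster` and `fee_estimator` you already hold; `write_full_monitor_for_old_persister` is a hypothetical helper (not part of LDK) standing in for however your previous `Persist` implementation stores full monitors:
+
+```rust
+use lightning::util::ser::Writeable;
+
+// Read every `ChannelMonitor` with all of its stored updates applied.
+let monitors = new_persister
+    .read_all_channel_monitors_with_updates(&broadcaster, &fee_estimator)
+    .expect("failed to read consolidated channel monitors");
+for (_best_block_hash, monitor) in monitors.iter() {
+    // `encode()` yields the plain monitor serialization, without the sentinel
+    // value that `MonitorUpdatingPersister` prepends to the monitors it stores.
+    let monitor_bytes = monitor.encode();
+    // Hypothetical helper: store `monitor_bytes` wherever your previous
+    // `Persist` implementation expects to find full `ChannelMonitor`s.
+    write_full_monitor_for_old_persister(&monitor.get_funding_txo().0, &monitor_bytes);
+}
+```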
\ No newline at end of file