Merge pull request #2384 from jkczyz/2023-06-message-router
[rust-lightning] / lightning / src / ln / channel.rs
index bf1ac638c44645473402c848088fbd3727d389f0..db38588230b77e153f967aba6781296fe459967d 100644 (file)
@@ -224,6 +224,7 @@ struct OutboundHTLCOutput {
        payment_hash: PaymentHash,
        state: OutboundHTLCState,
        source: HTLCSource,
+       skimmed_fee_msat: Option<u64>,
 }
 
 /// See AwaitingRemoteRevoke ChannelState for more info
@@ -235,6 +236,8 @@ enum HTLCUpdateAwaitingACK {
                payment_hash: PaymentHash,
                source: HTLCSource,
                onion_routing_packet: msgs::OnionPacket,
+               // The extra fee we're skimming off the top of this HTLC.
+               skimmed_fee_msat: Option<u64>,
        },
        ClaimHTLC {
                payment_preimage: PaymentPreimage,
@@ -485,13 +488,13 @@ enum UpdateFulfillFetch {
 }
 
 /// The return type of get_update_fulfill_htlc_and_commit.
-pub enum UpdateFulfillCommitFetch<'a> {
+pub enum UpdateFulfillCommitFetch {
        /// Indicates the HTLC fulfill is new, and either generated an update_fulfill message, placed
        /// it in the holding cell, or re-generated the update_fulfill message after the same claim was
        /// previously placed in the holding cell (and has since been removed).
        NewClaim {
                /// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
-               monitor_update: &'a ChannelMonitorUpdate,
+               monitor_update: ChannelMonitorUpdate,
                /// The value of the HTLC which was claimed, in msat.
                htlc_value_msat: u64,
        },
@@ -585,17 +588,10 @@ pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2;
 
 struct PendingChannelMonitorUpdate {
        update: ChannelMonitorUpdate,
-       /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
-       /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
-       /// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
-       ///
-       /// [`ChannelManager`]: super::channelmanager::ChannelManager
-       blocked: bool,
 }
 
 impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
        (0, update, required),
-       (2, blocked, required),
 });
 
 /// Contains everything about the channel including state, and various flags.
@@ -866,11 +862,9 @@ pub(super) struct ChannelContext<Signer: ChannelSigner> {
        /// [`SignerProvider::derive_channel_signer`].
        channel_keys_id: [u8; 32],
 
-       /// When we generate [`ChannelMonitorUpdate`]s to persist, they may not be persisted immediately.
-       /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
-       /// completes we still need to be able to complete the persistence. Thus, we have to keep a
-       /// copy of the [`ChannelMonitorUpdate`] here until it is complete.
-       pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
+       /// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we
+       /// store it here and only release it to the `ChannelManager` once it asks for it.
+       blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,
 }
 
 impl<Signer: ChannelSigner> ChannelContext<Signer> {
@@ -2256,7 +2250,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        }
 
        pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
-               let release_cs_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+               let release_cs_monitor = self.context.blocked_monitor_updates.is_empty();
                match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
                        UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
                                // Even if we aren't supposed to let new monitor updates with commitment state
@@ -2264,43 +2258,30 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                // matter what. Sadly, to push a new monitor update which flies before others
                                // already queued, we have to insert it into the pending queue and update the
                                // update_ids of all the following monitors.
-                               let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+                               if release_cs_monitor && msg.is_some() {
                                        let mut additional_update = self.build_commitment_no_status_check(logger);
                                        // build_commitment_no_status_check may bump latest_monitor_id but we want them
                                        // to be strictly increasing by one, so decrement it here.
                                        self.context.latest_monitor_update_id = monitor_update.update_id;
                                        monitor_update.updates.append(&mut additional_update.updates);
-                                       self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-                                               update: monitor_update, blocked: false,
-                                       });
-                                       self.context.pending_monitor_updates.len() - 1
                                } else {
-                                       let insert_pos = self.context.pending_monitor_updates.iter().position(|upd| upd.blocked)
-                                               .unwrap_or(self.context.pending_monitor_updates.len());
-                                       let new_mon_id = self.context.pending_monitor_updates.get(insert_pos)
+                                       let new_mon_id = self.context.blocked_monitor_updates.get(0)
                                                .map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
                                        monitor_update.update_id = new_mon_id;
-                                       self.context.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
-                                               update: monitor_update, blocked: false,
-                                       });
-                                       for held_update in self.context.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+                                       for held_update in self.context.blocked_monitor_updates.iter_mut() {
                                                held_update.update.update_id += 1;
                                        }
                                        if msg.is_some() {
                                                debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
                                                let update = self.build_commitment_no_status_check(logger);
-                                               self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-                                                       update, blocked: true,
+                                               self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
+                                                       update,
                                                });
                                        }
-                                       insert_pos
-                               };
-                               self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
-                               UpdateFulfillCommitFetch::NewClaim {
-                                       monitor_update: &self.context.pending_monitor_updates.get(unblocked_update_pos)
-                                               .expect("We just pushed the monitor update").update,
-                                       htlc_value_msat,
                                }
+
+                               self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
+                               UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, }
                        },
                        UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
                }
@@ -2790,7 +2771,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                Ok(())
        }
 
-       pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
+       pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
                where L::Target: Logger
        {
                if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3014,7 +2995,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        /// Public version of the below, checking relevant preconditions first.
        /// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
        /// returns `(None, Vec::new())`.
-       pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
+       pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
                if self.context.channel_state >= ChannelState::ChannelReady as u32 &&
                   (self.context.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
                        self.free_holding_cell_htlcs(logger)
@@ -3023,7 +3004,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
        /// Frees any pending commitment updates in the holding cell, generating the relevant messages
        /// for our counterparty.
-       fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
+       fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
                assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
                if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() {
                        log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(),
@@ -3047,8 +3028,13 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                // handling this case better and maybe fulfilling some of the HTLCs while attempting
                                // to rebalance channels.
                                match &htlc_update {
-                                       &HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet, ..} => {
-                                               match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, logger) {
+                                       &HTLCUpdateAwaitingACK::AddHTLC {
+                                               amount_msat, cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet,
+                                               skimmed_fee_msat, ..
+                                       } => {
+                                               match self.send_htlc(amount_msat, *payment_hash, cltv_expiry, source.clone(),
+                                                       onion_routing_packet.clone(), false, skimmed_fee_msat, logger)
+                                               {
                                                        Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()),
                                                        Err(e) => {
                                                                match e {
@@ -3134,7 +3120,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
        /// generating an appropriate error *after* the channel state has been updated based on the
        /// revoke_and_ack message.
-       pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
+       pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<ChannelMonitorUpdate>), ChannelError>
                where L::Target: Logger,
        {
                if (self.context.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3336,8 +3322,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                }
 
                match self.free_holding_cell_htlcs(logger) {
-                       (Some(_), htlcs_to_fail) => {
-                               let mut additional_update = self.context.pending_monitor_updates.pop().unwrap().update;
+                       (Some(mut additional_update), htlcs_to_fail) => {
                                // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
                                // strictly increasing by one, so decrement it here.
                                self.context.latest_monitor_update_id = monitor_update.update_id;
@@ -3553,12 +3538,6 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        {
                assert_eq!(self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
                self.context.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
-               let mut found_blocked = false;
-               self.context.pending_monitor_updates.retain(|upd| {
-                       if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
-                       if upd.blocked { found_blocked = true; }
-                       upd.blocked
-               });
 
                // If we're past (or at) the FundingSent stage on an outbound channel, try to
                // (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -3690,6 +3669,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                        payment_hash: htlc.payment_hash,
                                        cltv_expiry: htlc.cltv_expiry,
                                        onion_routing_packet: (**onion_packet).clone(),
+                                       skimmed_fee_msat: htlc.skimmed_fee_msat,
                                });
                        }
                }
@@ -4061,7 +4041,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 
        pub fn shutdown<SP: Deref>(
                &mut self, signer_provider: &SP, their_features: &InitFeatures, msg: &msgs::Shutdown
-       ) -> Result<(Option<msgs::Shutdown>, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
+       ) -> Result<(Option<msgs::Shutdown>, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), ChannelError>
        where SP::Target: SignerProvider
        {
                if self.context.channel_state & (ChannelState::PeerDisconnected as u32) == ChannelState::PeerDisconnected as u32 {
@@ -4127,9 +4107,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                }],
                        };
                        self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
-                       if self.push_blockable_mon_update(monitor_update) {
-                               self.context.pending_monitor_updates.last().map(|upd| &upd.update)
-                       } else { None }
+                       self.push_ret_blockable_mon_update(monitor_update)
                } else { None };
                let shutdown = if send_shutdown {
                        Some(msgs::Shutdown {
@@ -4419,64 +4397,37 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                (self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
        }
 
-       pub fn get_latest_complete_monitor_update_id(&self) -> u64 {
-               if self.context.pending_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
-               self.context.pending_monitor_updates[0].update.update_id - 1
+       /// Gets the latest [`ChannelMonitorUpdate`] ID which has been released and is in-flight.
+       pub fn get_latest_unblocked_monitor_update_id(&self) -> u64 {
+               if self.context.blocked_monitor_updates.is_empty() { return self.context.get_latest_monitor_update_id(); }
+               self.context.blocked_monitor_updates[0].update.update_id - 1
        }
 
        /// Returns the next blocked monitor update, if one exists, and a bool which indicates a
        /// further blocked monitor update exists after the next.
-       pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
-               for i in 0..self.context.pending_monitor_updates.len() {
-                       if self.context.pending_monitor_updates[i].blocked {
-                               self.context.pending_monitor_updates[i].blocked = false;
-                               return Some((&self.context.pending_monitor_updates[i].update,
-                                       self.context.pending_monitor_updates.len() > i + 1));
-                       }
-               }
-               None
-       }
-
-       /// Pushes a new monitor update into our monitor update queue, returning whether it should be
-       /// immediately given to the user for persisting or if it should be held as blocked.
-       fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
-               let release_monitor = self.context.pending_monitor_updates.iter().all(|upd| !upd.blocked);
-               self.context.pending_monitor_updates.push(PendingChannelMonitorUpdate {
-                       update, blocked: !release_monitor
-               });
-               release_monitor
+       pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
+               if self.context.blocked_monitor_updates.is_empty() { return None; }
+               Some((self.context.blocked_monitor_updates.remove(0).update,
+                       !self.context.blocked_monitor_updates.is_empty()))
        }
 
-       /// Pushes a new monitor update into our monitor update queue, returning a reference to it if
-       /// it should be immediately given to the user for persisting or `None` if it should be held as
-       /// blocked.
+       /// Pushes a new monitor update into our monitor update queue, returning it if it should be
+       /// immediately given to the user for persisting or `None` if it should be held as blocked.
        fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
-       -> Option<&ChannelMonitorUpdate> {
-               let release_monitor = self.push_blockable_mon_update(update);
-               if release_monitor { self.context.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
-       }
-
-       pub fn no_monitor_updates_pending(&self) -> bool {
-               self.context.pending_monitor_updates.is_empty()
-       }
-
-       pub fn complete_all_mon_updates_through(&mut self, update_id: u64) {
-               self.context.pending_monitor_updates.retain(|upd| {
-                       if upd.update.update_id <= update_id {
-                               assert!(!upd.blocked, "Completed update must have flown");
-                               false
-                       } else { true }
-               });
-       }
-
-       pub fn complete_one_mon_update(&mut self, update_id: u64) {
-               self.context.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
+       -> Option<ChannelMonitorUpdate> {
+               let release_monitor = self.context.blocked_monitor_updates.is_empty();
+               if !release_monitor {
+                       self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
+                               update,
+                       });
+                       None
+               } else {
+                       Some(update)
+               }
        }
 
-       /// Returns an iterator over all unblocked monitor updates which have not yet completed.
-       pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator<Item=&ChannelMonitorUpdate> {
-               self.context.pending_monitor_updates.iter()
-                       .filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) })
+       pub fn blocked_monitor_updates_pending(&self) -> usize {
+               self.context.blocked_monitor_updates.len()
        }
 
        /// Returns true if the channel is awaiting the persistence of the initial ChannelMonitor.
@@ -5037,11 +4988,13 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        /// commitment update.
        ///
        /// `Err`s will only be [`ChannelError::Ignore`].
-       pub fn queue_add_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
-               onion_routing_packet: msgs::OnionPacket, logger: &L)
-       -> Result<(), ChannelError> where L::Target: Logger {
+       pub fn queue_add_htlc<L: Deref>(
+               &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
+               onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>, logger: &L
+       ) -> Result<(), ChannelError> where L::Target: Logger {
                self
-                       .send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true, logger)
+                       .send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, true,
+                               skimmed_fee_msat, logger)
                        .map(|msg_opt| assert!(msg_opt.is_none(), "We forced holding cell?"))
                        .map_err(|err| {
                                if let ChannelError::Ignore(_) = err { /* fine */ }
@@ -5066,9 +5019,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        /// on this [`Channel`] if `force_holding_cell` is false.
        ///
        /// `Err`s will only be [`ChannelError::Ignore`].
-       fn send_htlc<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
-               onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool, logger: &L)
-       -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
+       fn send_htlc<L: Deref>(
+               &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
+               onion_routing_packet: msgs::OnionPacket, mut force_holding_cell: bool,
+               skimmed_fee_msat: Option<u64>, logger: &L
+       ) -> Result<Option<msgs::UpdateAddHTLC>, ChannelError> where L::Target: Logger {
                if (self.context.channel_state & (ChannelState::ChannelReady as u32 | BOTH_SIDES_SHUTDOWN_MASK)) != (ChannelState::ChannelReady as u32) {
                        return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned()));
                }
@@ -5120,6 +5075,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                cltv_expiry,
                                source,
                                onion_routing_packet,
+                               skimmed_fee_msat,
                        });
                        return Ok(None);
                }
@@ -5131,6 +5087,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        cltv_expiry,
                        state: OutboundHTLCState::LocalAnnounced(Box::new(onion_routing_packet.clone())),
                        source,
+                       skimmed_fee_msat,
                });
 
                let res = msgs::UpdateAddHTLC {
@@ -5140,6 +5097,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                        payment_hash,
                        cltv_expiry,
                        onion_routing_packet,
+                       skimmed_fee_msat,
                };
                self.context.next_holder_htlc_id += 1;
 
@@ -5278,8 +5236,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        ///
        /// Shorthand for calling [`Self::send_htlc`] followed by a commitment update, see docs on
        /// [`Self::send_htlc`] and [`Self::build_commitment_no_state_update`] for more info.
-       pub fn send_htlc_and_commit<L: Deref>(&mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource, onion_routing_packet: msgs::OnionPacket, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
-               let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source, onion_routing_packet, false, logger);
+       pub fn send_htlc_and_commit<L: Deref>(
+               &mut self, amount_msat: u64, payment_hash: PaymentHash, cltv_expiry: u32, source: HTLCSource,
+               onion_routing_packet: msgs::OnionPacket, skimmed_fee_msat: Option<u64>, logger: &L
+       ) -> Result<Option<ChannelMonitorUpdate>, ChannelError> where L::Target: Logger {
+               let send_res = self.send_htlc(amount_msat, payment_hash, cltv_expiry, source,
+                       onion_routing_packet, false, skimmed_fee_msat, logger);
                if let Err(e) = &send_res { if let ChannelError::Ignore(_) = e {} else { debug_assert!(false, "Sending cannot trigger channel failure"); } }
                match send_res? {
                        Some(_) => {
@@ -5311,7 +5273,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
        /// [`ChannelMonitorUpdate`] will be returned).
        pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
                target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>)
-       -> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+       -> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
        where SP::Target: SignerProvider {
                for htlc in self.context.pending_outbound_htlcs.iter() {
                        if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5382,9 +5344,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
                                }],
                        };
                        self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
-                       if self.push_blockable_mon_update(monitor_update) {
-                               self.context.pending_monitor_updates.last().map(|upd| &upd.update)
-                       } else { None }
+                       self.push_ret_blockable_mon_update(monitor_update)
                } else { None };
                let shutdown = msgs::Shutdown {
                        channel_id: self.context.channel_id,
@@ -5622,7 +5582,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
                                channel_type,
                                channel_keys_id,
 
-                               pending_monitor_updates: Vec::new(),
+                               blocked_monitor_updates: Vec::new(),
                        }
                })
        }
@@ -5707,12 +5667,9 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
                // Optionally, if the user would like to negotiate the `anchors_zero_fee_htlc_tx` option, we
                // set it now. If they don't understand it, we'll fall back to our default of
                // `only_static_remotekey`.
-               #[cfg(anchors)]
-               { // Attributes are not allowed on if expressions on our current MSRV of 1.41.
-                       if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx &&
-                               their_features.supports_anchors_zero_fee_htlc_tx() {
-                               ret.set_anchors_zero_fee_htlc_tx_required();
-                       }
+               if config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx &&
+                       their_features.supports_anchors_zero_fee_htlc_tx() {
+                       ret.set_anchors_zero_fee_htlc_tx_required();
                }
 
                ret
@@ -6251,7 +6208,7 @@ impl<Signer: WriteableEcdsaChannelSigner> InboundV1Channel<Signer> {
                                channel_type,
                                channel_keys_id,
 
-                               pending_monitor_updates: Vec::new(),
+                               blocked_monitor_updates: Vec::new(),
                        }
                };
 
@@ -6602,9 +6559,10 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
                }
 
                let mut preimages: Vec<&Option<PaymentPreimage>> = vec![];
+               let mut pending_outbound_skimmed_fees: Vec<Option<u64>> = Vec::new();
 
                (self.context.pending_outbound_htlcs.len() as u64).write(writer)?;
-               for htlc in self.context.pending_outbound_htlcs.iter() {
+               for (idx, htlc) in self.context.pending_outbound_htlcs.iter().enumerate() {
                        htlc.htlc_id.write(writer)?;
                        htlc.amount_msat.write(writer)?;
                        htlc.cltv_expiry.write(writer)?;
@@ -6640,18 +6598,37 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
                                        reason.write(writer)?;
                                }
                        }
+                       if let Some(skimmed_fee) = htlc.skimmed_fee_msat {
+                               if pending_outbound_skimmed_fees.is_empty() {
+                                       for _ in 0..idx { pending_outbound_skimmed_fees.push(None); }
+                               }
+                               pending_outbound_skimmed_fees.push(Some(skimmed_fee));
+                       } else if !pending_outbound_skimmed_fees.is_empty() {
+                               pending_outbound_skimmed_fees.push(None);
+                       }
                }
 
+               let mut holding_cell_skimmed_fees: Vec<Option<u64>> = Vec::new();
                (self.context.holding_cell_htlc_updates.len() as u64).write(writer)?;
-               for update in self.context.holding_cell_htlc_updates.iter() {
+               for (idx, update) in self.context.holding_cell_htlc_updates.iter().enumerate() {
                        match update {
-                               &HTLCUpdateAwaitingACK::AddHTLC { ref amount_msat, ref cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet } => {
+                               &HTLCUpdateAwaitingACK::AddHTLC {
+                                       ref amount_msat, ref cltv_expiry, ref payment_hash, ref source, ref onion_routing_packet,
+                                       skimmed_fee_msat,
+                               } => {
                                        0u8.write(writer)?;
                                        amount_msat.write(writer)?;
                                        cltv_expiry.write(writer)?;
                                        payment_hash.write(writer)?;
                                        source.write(writer)?;
                                        onion_routing_packet.write(writer)?;
+
+                                       if let Some(skimmed_fee) = skimmed_fee_msat {
+                                               if holding_cell_skimmed_fees.is_empty() {
+                                                       for _ in 0..idx { holding_cell_skimmed_fees.push(None); }
+                                               }
+                                               holding_cell_skimmed_fees.push(Some(skimmed_fee));
+                                       } else if !holding_cell_skimmed_fees.is_empty() { holding_cell_skimmed_fees.push(None); }
                                },
                                &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, ref htlc_id } => {
                                        1u8.write(writer)?;
@@ -6817,7 +6794,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
                        (28, holder_max_accepted_htlcs, option),
                        (29, self.context.temporary_channel_id, option),
                        (31, channel_pending_event_emitted, option),
-                       (33, self.context.pending_monitor_updates, vec_type),
+                       (33, self.context.blocked_monitor_updates, vec_type),
+                       (35, pending_outbound_skimmed_fees, optional_vec),
+                       (37, holding_cell_skimmed_fees, optional_vec),
                });
 
                Ok(())
@@ -6928,6 +6907,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                                        },
                                        _ => return Err(DecodeError::InvalidValue),
                                },
+                               skimmed_fee_msat: None,
                        });
                }
 
@@ -6941,6 +6921,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                                        payment_hash: Readable::read(reader)?,
                                        source: Readable::read(reader)?,
                                        onion_routing_packet: Readable::read(reader)?,
+                                       skimmed_fee_msat: None,
                                },
                                1 => HTLCUpdateAwaitingACK::ClaimHTLC {
                                        payment_preimage: Readable::read(reader)?,
@@ -7094,7 +7075,10 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                let mut temporary_channel_id: Option<[u8; 32]> = None;
                let mut holder_max_accepted_htlcs: Option<u16> = None;
 
-               let mut pending_monitor_updates = Some(Vec::new());
+               let mut blocked_monitor_updates = Some(Vec::new());
+
+               let mut pending_outbound_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
+               let mut holding_cell_skimmed_fees_opt: Option<Vec<Option<u64>>> = None;
 
                read_tlv_fields!(reader, {
                        (0, announcement_sigs, option),
@@ -7118,7 +7102,9 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                        (28, holder_max_accepted_htlcs, option),
                        (29, temporary_channel_id, option),
                        (31, channel_pending_event_emitted, option),
-                       (33, pending_monitor_updates, vec_type),
+                       (33, blocked_monitor_updates, vec_type),
+                       (35, pending_outbound_skimmed_fees_opt, optional_vec),
+                       (37, holding_cell_skimmed_fees_opt, optional_vec),
                });
 
                let (channel_keys_id, holder_signer) = if let Some(channel_keys_id) = channel_keys_id {
@@ -7177,6 +7163,25 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
 
                let holder_max_accepted_htlcs = holder_max_accepted_htlcs.unwrap_or(DEFAULT_MAX_HTLCS);
 
+               if let Some(skimmed_fees) = pending_outbound_skimmed_fees_opt {
+                       let mut iter = skimmed_fees.into_iter();
+                       for htlc in pending_outbound_htlcs.iter_mut() {
+                               htlc.skimmed_fee_msat = iter.next().ok_or(DecodeError::InvalidValue)?;
+                       }
+                       // We expect all skimmed fees to be consumed above
+                       if iter.next().is_some() { return Err(DecodeError::InvalidValue) }
+               }
+               if let Some(skimmed_fees) = holding_cell_skimmed_fees_opt {
+                       let mut iter = skimmed_fees.into_iter();
+                       for htlc in holding_cell_htlc_updates.iter_mut() {
+                               if let HTLCUpdateAwaitingACK::AddHTLC { ref mut skimmed_fee_msat, .. } = htlc {
+                                       *skimmed_fee_msat = iter.next().ok_or(DecodeError::InvalidValue)?;
+                               }
+                       }
+                       // We expect all skimmed fees to be consumed above
+                       if iter.next().is_some() { return Err(DecodeError::InvalidValue) }
+               }
+
                Ok(Channel {
                        context: ChannelContext {
                                user_id,
@@ -7294,7 +7299,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
                                channel_type: channel_type.unwrap(),
                                channel_keys_id,
 
-                               pending_monitor_updates: pending_monitor_updates.unwrap(),
+                               blocked_monitor_updates: blocked_monitor_updates.unwrap(),
                        }
                })
        }
@@ -7311,7 +7316,6 @@ mod tests {
        use hex;
        use crate::ln::PaymentHash;
        use crate::ln::channelmanager::{self, HTLCSource, PaymentId};
-       #[cfg(anchors)]
        use crate::ln::channel::InitFeatures;
        use crate::ln::channel::{Channel, InboundHTLCOutput, OutboundV1Channel, InboundV1Channel, OutboundHTLCOutput, InboundHTLCState, OutboundHTLCState, HTLCCandidate, HTLCInitiator, commit_tx_fee_msat};
        use crate::ln::channel::{MAX_FUNDING_SATOSHIS_NO_WUMBO, TOTAL_BITCOIN_SUPPLY_SATOSHIS, MIN_THEIR_CHAN_RESERVE_SATOSHIS};
@@ -7401,7 +7405,7 @@ mod tests {
                }
        }
 
-       #[cfg(not(feature = "grind_signatures"))]
+       #[cfg(all(feature = "_test_vectors", not(feature = "grind_signatures")))]
        fn public_from_secret_hex(secp_ctx: &Secp256k1<bitcoin::secp256k1::All>, hex: &str) -> PublicKey {
                PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&hex::decode(hex).unwrap()[..]).unwrap())
        }
@@ -7519,7 +7523,8 @@ mod tests {
                                session_priv: SecretKey::from_slice(&hex::decode("0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap()[..]).unwrap(),
                                first_hop_htlc_msat: 548,
                                payment_id: PaymentId([42; 32]),
-                       }
+                       },
+                       skimmed_fee_msat: None,
                });
 
                // Make sure when Node A calculates their local commitment transaction, none of the HTLCs pass
@@ -8076,6 +8081,7 @@ mod tests {
                                payment_hash: PaymentHash([0; 32]),
                                state: OutboundHTLCState::Committed,
                                source: HTLCSource::dummy(),
+                               skimmed_fee_msat: None,
                        };
                        out.payment_hash.0 = Sha256::hash(&hex::decode("0202020202020202020202020202020202020202020202020202020202020202").unwrap()).into_inner();
                        out
@@ -8088,6 +8094,7 @@ mod tests {
                                payment_hash: PaymentHash([0; 32]),
                                state: OutboundHTLCState::Committed,
                                source: HTLCSource::dummy(),
+                               skimmed_fee_msat: None,
                        };
                        out.payment_hash.0 = Sha256::hash(&hex::decode("0303030303030303030303030303030303030303030303030303030303030303").unwrap()).into_inner();
                        out
@@ -8498,6 +8505,7 @@ mod tests {
                                payment_hash: PaymentHash([0; 32]),
                                state: OutboundHTLCState::Committed,
                                source: HTLCSource::dummy(),
+                               skimmed_fee_msat: None,
                        };
                        out.payment_hash.0 = Sha256::hash(&hex::decode("0505050505050505050505050505050505050505050505050505050505050505").unwrap()).into_inner();
                        out
@@ -8510,6 +8518,7 @@ mod tests {
                                payment_hash: PaymentHash([0; 32]),
                                state: OutboundHTLCState::Committed,
                                source: HTLCSource::dummy(),
+                               skimmed_fee_msat: None,
                        };
                        out.payment_hash.0 = Sha256::hash(&hex::decode("0505050505050505050505050505050505050505050505050505050505050505").unwrap()).into_inner();
                        out
@@ -8630,7 +8639,6 @@ mod tests {
                assert!(res.is_ok());
        }
 
-       #[cfg(anchors)]
        #[test]
        fn test_supports_anchors_zero_htlc_tx_fee() {
                // Tests that if both sides support and negotiate `anchors_zero_fee_htlc_tx`, it is the
@@ -8676,7 +8684,6 @@ mod tests {
                assert_eq!(channel_b.context.channel_type, expected_channel_type);
        }
 
-       #[cfg(anchors)]
        #[test]
        fn test_rejects_implicit_simple_anchors() {
                // Tests that if `option_anchors` is being negotiated implicitly through the intersection of
@@ -8717,7 +8724,6 @@ mod tests {
                assert!(channel_b.is_err());
        }
 
-       #[cfg(anchors)]
        #[test]
        fn test_rejects_simple_anchors_channel_type() {
                // Tests that if `option_anchors` is being negotiated through the `channel_type` feature,