Track pending update_add_htlcs in ChannelManager for later processing
authorWilmer Paulino <wilmer@wilmerpaulino.com>
Fri, 8 Mar 2024 07:03:25 +0000 (23:03 -0800)
committerWilmer Paulino <wilmer@wilmerpaulino.com>
Wed, 27 Mar 2024 21:28:01 +0000 (14:28 -0700)
We plan to decode the onions of these `update_add_htlc`s as part of the
HTLC forwarding flow (i.e., `process_pending_htlc_forwards`), so we'll
need to track them per-channel at the `ChannelManager` level.

lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs

index cadb48b4955c39f546785d9f38936b9f15f91a9d..31b2dab38d1299191305380ab25d6b3c8fc21e55 100644 (file)
@@ -1072,6 +1072,7 @@ pub(super) struct MonitorRestoreUpdates {
        pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
        pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
        pub finalized_claimed_htlcs: Vec<HTLCSource>,
+       pub pending_update_adds: Vec<msgs::UpdateAddHTLC>,
        pub funding_broadcastable: Option<Transaction>,
        pub channel_ready: Option<msgs::ChannelReady>,
        pub announcement_sigs: Option<msgs::AnnouncementSignatures>,
@@ -5251,13 +5252,16 @@ impl<SP: Deref> Channel<SP> where
                mem::swap(&mut failed_htlcs, &mut self.context.monitor_pending_failures);
                let mut finalized_claimed_htlcs = Vec::new();
                mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills);
+               let mut pending_update_adds = Vec::new();
+               mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds);
 
                if self.context.channel_state.is_peer_disconnected() {
                        self.context.monitor_pending_revoke_and_ack = false;
                        self.context.monitor_pending_commitment_signed = false;
                        return MonitorRestoreUpdates {
                                raa: None, commitment_update: None, order: RAACommitmentOrder::RevokeAndACKFirst,
-                               accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs
+                               accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, pending_update_adds,
+                               funding_broadcastable, channel_ready, announcement_sigs
                        };
                }
 
@@ -5279,7 +5283,8 @@ impl<SP: Deref> Channel<SP> where
                        if commitment_update.is_some() { "a" } else { "no" }, if raa.is_some() { "an" } else { "no" },
                        match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"});
                MonitorRestoreUpdates {
-                       raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs
+                       raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs,
+                       pending_update_adds, funding_broadcastable, channel_ready, announcement_sigs
                }
        }
 
index 5053378652a68f3465b17877d312d378e3b257ce..3afc61095c309a6a81d339f1083de7e5842f5aab 100644 (file)
@@ -1181,6 +1181,8 @@ where
 //  |   |
 //  |   |__`pending_intercepted_htlcs`
 //  |
+//  |__`decode_update_add_htlcs`
+//  |
 //  |__`per_peer_state`
 //      |
 //      |__`pending_inbound_payments`
@@ -1271,6 +1273,18 @@ where
        /// See `ChannelManager` struct-level documentation for lock order requirements.
        pending_intercepted_htlcs: Mutex<HashMap<InterceptId, PendingAddHTLCInfo>>,
 
+       /// SCID/SCID Alias -> pending `update_add_htlc`s to decode.
+       ///
+       /// Note that because we may have an SCID Alias as the key we can have two entries per channel,
+       /// though in practice we probably won't be receiving HTLCs for a channel both via the alias
+       /// and via the classic SCID.
+       ///
+       /// Note that no consistency guarantees are made about the existence of a channel with the
+       /// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`!
+       ///
+       /// See `ChannelManager` struct-level documentation for lock order requirements.
+       decode_update_add_htlcs: Mutex<HashMap<u64, Vec<msgs::UpdateAddHTLC>>>,
+
        /// The sets of payments which are claimable or currently being claimed. See
        /// [`ClaimablePayments`]' individual field docs for more info.
        ///
@@ -2238,9 +2252,9 @@ macro_rules! handle_monitor_update_completion {
                let update_actions = $peer_state.monitor_update_blocked_actions
                        .remove(&$chan.context.channel_id()).unwrap_or(Vec::new());
 
-               let htlc_forwards = $self.handle_channel_resumption(
+               let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption(
                        &mut $peer_state.pending_msg_events, $chan, updates.raa,
-                       updates.commitment_update, updates.order, updates.accepted_htlcs,
+                       updates.commitment_update, updates.order, updates.accepted_htlcs, updates.pending_update_adds,
                        updates.funding_broadcastable, updates.channel_ready,
                        updates.announcement_sigs);
                if let Some(upd) = channel_update {
@@ -2301,6 +2315,9 @@ macro_rules! handle_monitor_update_completion {
                if let Some(forwards) = htlc_forwards {
                        $self.forward_htlcs(&mut [forwards][..]);
                }
+               if let Some(decode) = decode_update_add_htlcs {
+                       $self.push_decode_update_add_htlcs(decode);
+               }
                $self.finalize_claims(updates.finalized_claimed_htlcs);
                for failure in updates.failed_htlcs.drain(..) {
                        let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
@@ -2477,6 +2494,7 @@ where
                        pending_inbound_payments: Mutex::new(new_hash_map()),
                        pending_outbound_payments: OutboundPayments::new(),
                        forward_htlcs: Mutex::new(new_hash_map()),
+                       decode_update_add_htlcs: Mutex::new(new_hash_map()),
                        claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }),
                        pending_intercepted_htlcs: Mutex::new(new_hash_map()),
                        outpoint_to_peer: Mutex::new(new_hash_map()),
@@ -5929,24 +5947,31 @@ where
        fn handle_channel_resumption(&self, pending_msg_events: &mut Vec<MessageSendEvent>,
                channel: &mut Channel<SP>, raa: Option<msgs::RevokeAndACK>,
                commitment_update: Option<msgs::CommitmentUpdate>, order: RAACommitmentOrder,
-               pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option<Transaction>,
+               pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
+               funding_broadcastable: Option<Transaction>,
                channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
-       -> Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> {
+       -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
                let logger = WithChannelContext::from(&self.logger, &channel.context);
-               log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement",
+               log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
                        &channel.context.channel_id(),
                        if raa.is_some() { "an" } else { "no" },
-                       if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(),
+                       if commitment_update.is_some() { "a" } else { "no" },
+                       pending_forwards.len(), pending_update_adds.len(),
                        if funding_broadcastable.is_some() { "" } else { "not " },
                        if channel_ready.is_some() { "sending" } else { "without" },
                        if announcement_sigs.is_some() { "sending" } else { "without" });
 
-               let mut htlc_forwards = None;
-
                let counterparty_node_id = channel.context.get_counterparty_node_id();
+               let short_channel_id = channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias());
+
+               let mut htlc_forwards = None;
                if !pending_forwards.is_empty() {
-                       htlc_forwards = Some((channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()),
-                               channel.context.get_funding_txo().unwrap(), channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
+                       htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
+                               channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
+               }
+               let mut decode_update_add_htlcs = None;
+               if !pending_update_adds.is_empty() {
+                       decode_update_add_htlcs = Some((short_channel_id, pending_update_adds));
                }
 
                if let Some(msg) = channel_ready {
@@ -5997,7 +6022,7 @@ where
                        emit_channel_ready_event!(pending_events, channel);
                }
 
-               htlc_forwards
+               (htlc_forwards, decode_update_add_htlcs)
        }
 
        fn channel_monitor_updated(&self, funding_txo: &OutPoint, channel_id: &ChannelId, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
@@ -6971,6 +6996,15 @@ where
                }
        }
 
+       fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec<msgs::UpdateAddHTLC>)) {
+               let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+               let scid = update_add_htlcs.0;
+               match decode_update_add_htlcs.entry(scid) {
+                       hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); },
+                       hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); },
+               }
+       }
+
        #[inline]
        fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
                for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
@@ -7307,10 +7341,11 @@ where
                                                        }
                                                }
                                                let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take();
-                                               let htlc_forwards = self.handle_channel_resumption(
+                                               let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption(
                                                        &mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order,
-                                                       Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
+                                                       Vec::new(), Vec::new(), None, responses.channel_ready, responses.announcement_sigs);
                                                debug_assert!(htlc_forwards.is_none());
+                                               debug_assert!(decode_update_add_htlcs.is_none());
                                                if let Some(upd) = channel_update {
                                                        peer_state.pending_msg_events.push(upd);
                                                }
@@ -10192,6 +10227,12 @@ where
                        }
                }
 
+               let mut decode_update_add_htlcs_opt = None;
+               let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+               if !decode_update_add_htlcs.is_empty() {
+                       decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
+               }
+
                let per_peer_state = self.per_peer_state.write().unwrap();
 
                let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap();
@@ -10343,6 +10384,7 @@ where
                        (10, in_flight_monitor_updates, option),
                        (11, self.probing_cookie_secret, required),
                        (13, htlc_onion_fields, optional_vec),
+                       (14, decode_update_add_htlcs_opt, option),
                });
 
                Ok(())
@@ -10808,6 +10850,7 @@ where
                let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
                let mut events_override = None;
                let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
+               let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
                read_tlv_fields!(reader, {
                        (1, pending_outbound_payments_no_retry, option),
                        (2, pending_intercepted_htlcs, option),
@@ -10821,7 +10864,9 @@ where
                        (10, in_flight_monitor_updates, option),
                        (11, probing_cookie_secret, option),
                        (13, claimable_htlc_onion_fields, optional_vec),
+                       (14, decode_update_add_htlcs, option),
                });
+               let decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
                if fake_scid_rand_bytes.is_none() {
                        fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
                }
@@ -11356,6 +11401,7 @@ where
                        pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
 
                        forward_htlcs: Mutex::new(forward_htlcs),
+                       decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
                        claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }),
                        outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
                        outpoint_to_peer: Mutex::new(outpoint_to_peer),