From 0b38b3981d4f89fe3dc302c3156e79897da50081 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 7 Mar 2024 23:03:25 -0800 Subject: [PATCH] Track pending update_add_htlcs in ChannelManager for later processing We plan to decode the onions of these `update_add_htlc`s as part of the HTLC forwarding flow (i.e., `process_pending_htlc_forwards`), so we'll need to track them per-channel at the `ChannelManager` level. --- lightning/src/ln/channel.rs | 9 +++- lightning/src/ln/channelmanager.rs | 72 ++++++++++++++++++++++++------ 2 files changed, 66 insertions(+), 15 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index cadb48b49..31b2dab38 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -1072,6 +1072,7 @@ pub(super) struct MonitorRestoreUpdates { pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>, pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, pub finalized_claimed_htlcs: Vec, + pub pending_update_adds: Vec, pub funding_broadcastable: Option, pub channel_ready: Option, pub announcement_sigs: Option, @@ -5251,13 +5252,16 @@ impl Channel where mem::swap(&mut failed_htlcs, &mut self.context.monitor_pending_failures); let mut finalized_claimed_htlcs = Vec::new(); mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills); + let mut pending_update_adds = Vec::new(); + mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds); if self.context.channel_state.is_peer_disconnected() { self.context.monitor_pending_revoke_and_ack = false; self.context.monitor_pending_commitment_signed = false; return MonitorRestoreUpdates { raa: None, commitment_update: None, order: RAACommitmentOrder::RevokeAndACKFirst, - accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs + accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, pending_update_adds, + funding_broadcastable, channel_ready, announcement_sigs }; } @@ -5279,7 +5283,8 @@ impl Channel where if commitment_update.is_some() { "a" } else { "no" }, if raa.is_some() { "an" } else { "no" }, match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"}); MonitorRestoreUpdates { - raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, channel_ready, announcement_sigs + raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, + pending_update_adds, funding_broadcastable, channel_ready, announcement_sigs } } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 505337865..3afc61095 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1181,6 +1181,8 @@ where // | | // | |__`pending_intercepted_htlcs` // | +// |__`decode_update_add_htlcs` +// | // |__`per_peer_state` // | // |__`pending_inbound_payments` @@ -1271,6 +1273,18 @@ where /// See `ChannelManager` struct-level documentation for lock order requirements. pending_intercepted_htlcs: Mutex>, + /// SCID/SCID Alias -> pending `update_add_htlc`s to decode. + /// + /// Note that because we may have an SCID Alias as the key we can have two entries per channel, + /// though in practice we probably won't be receiving HTLCs for a channel both via the alias + /// and via the classic SCID. + /// + /// Note that no consistency guarantees are made about the existence of a channel with the + /// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`! + /// + /// See `ChannelManager` struct-level documentation for lock order requirements. + decode_update_add_htlcs: Mutex>>, + /// The sets of payments which are claimable or currently being claimed. See /// [`ClaimablePayments`]' individual field docs for more info. /// @@ -2238,9 +2252,9 @@ macro_rules! handle_monitor_update_completion { let update_actions = $peer_state.monitor_update_blocked_actions .remove(&$chan.context.channel_id()).unwrap_or(Vec::new()); - let htlc_forwards = $self.handle_channel_resumption( + let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption( &mut $peer_state.pending_msg_events, $chan, updates.raa, - updates.commitment_update, updates.order, updates.accepted_htlcs, + updates.commitment_update, updates.order, updates.accepted_htlcs, updates.pending_update_adds, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); if let Some(upd) = channel_update { @@ -2301,6 +2315,9 @@ macro_rules! handle_monitor_update_completion { if let Some(forwards) = htlc_forwards { $self.forward_htlcs(&mut [forwards][..]); } + if let Some(decode) = decode_update_add_htlcs { + $self.push_decode_update_add_htlcs(decode); + } $self.finalize_claims(updates.finalized_claimed_htlcs); for failure in updates.failed_htlcs.drain(..) { let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; @@ -2477,6 +2494,7 @@ where pending_inbound_payments: Mutex::new(new_hash_map()), pending_outbound_payments: OutboundPayments::new(), forward_htlcs: Mutex::new(new_hash_map()), + decode_update_add_htlcs: Mutex::new(new_hash_map()), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }), pending_intercepted_htlcs: Mutex::new(new_hash_map()), outpoint_to_peer: Mutex::new(new_hash_map()), @@ -5929,24 +5947,31 @@ where fn handle_channel_resumption(&self, pending_msg_events: &mut Vec, channel: &mut Channel, raa: Option, commitment_update: Option, order: RAACommitmentOrder, - pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option, + pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec, + funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option) - -> Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> { + -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec)>) { let logger = WithChannelContext::from(&self.logger, &channel.context); - log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement", + log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement", &channel.context.channel_id(), if raa.is_some() { "an" } else { "no" }, - if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(), + if commitment_update.is_some() { "a" } else { "no" }, + pending_forwards.len(), pending_update_adds.len(), if funding_broadcastable.is_some() { "" } else { "not " }, if channel_ready.is_some() { "sending" } else { "without" }, if announcement_sigs.is_some() { "sending" } else { "without" }); - let mut htlc_forwards = None; - let counterparty_node_id = channel.context.get_counterparty_node_id(); + let short_channel_id = channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()); + + let mut htlc_forwards = None; if !pending_forwards.is_empty() { - htlc_forwards = Some((channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()), - channel.context.get_funding_txo().unwrap(), channel.context.channel_id(), channel.context.get_user_id(), pending_forwards)); + htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(), + channel.context.channel_id(), channel.context.get_user_id(), pending_forwards)); + } + let mut decode_update_add_htlcs = None; + if !pending_update_adds.is_empty() { + decode_update_add_htlcs = Some((short_channel_id, pending_update_adds)); } if let Some(msg) = channel_ready { @@ -5997,7 +6022,7 @@ where emit_channel_ready_event!(pending_events, channel); } - htlc_forwards + (htlc_forwards, decode_update_add_htlcs) } fn channel_monitor_updated(&self, funding_txo: &OutPoint, channel_id: &ChannelId, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) { @@ -6971,6 +6996,15 @@ where } } + fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec)) { + let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); + let scid = update_add_htlcs.0; + match decode_update_add_htlcs.entry(scid) { + hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); }, + hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); }, + } + } + #[inline] fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) { for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards { @@ -7307,10 +7341,11 @@ where } } let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take(); - let htlc_forwards = self.handle_channel_resumption( + let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption( &mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order, - Vec::new(), None, responses.channel_ready, responses.announcement_sigs); + Vec::new(), Vec::new(), None, responses.channel_ready, responses.announcement_sigs); debug_assert!(htlc_forwards.is_none()); + debug_assert!(decode_update_add_htlcs.is_none()); if let Some(upd) = channel_update { peer_state.pending_msg_events.push(upd); } @@ -10192,6 +10227,12 @@ where } } + let mut decode_update_add_htlcs_opt = None; + let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); + if !decode_update_add_htlcs.is_empty() { + decode_update_add_htlcs_opt = Some(decode_update_add_htlcs); + } + let per_peer_state = self.per_peer_state.write().unwrap(); let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap(); @@ -10343,6 +10384,7 @@ where (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), + (14, decode_update_add_htlcs_opt, option), }); Ok(()) @@ -10808,6 +10850,7 @@ where let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; let mut in_flight_monitor_updates: Option>> = None; + let mut decode_update_add_htlcs: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -10821,7 +10864,9 @@ where (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), + (14, decode_update_add_htlcs, option), }); + let decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map()); if fake_scid_rand_bytes.is_none() { fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes()); } @@ -11356,6 +11401,7 @@ where pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()), forward_htlcs: Mutex::new(forward_htlcs), + decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }), outbound_scid_aliases: Mutex::new(outbound_scid_aliases), outpoint_to_peer: Mutex::new(outpoint_to_peer), -- 2.39.5