Maintain order of yielded claim events
authorWilmer Paulino <wilmer@wilmerpaulino.com>
Tue, 14 Feb 2023 23:49:37 +0000 (15:49 -0800)
committerWilmer Paulino <wilmer@wilmerpaulino.com>
Mon, 20 Mar 2023 18:32:14 +0000 (11:32 -0700)
Since the claim events are stored internally within a HashMap, they will
be yielded in a random order once dispatched. Claim events may be
invalidated if a conflicting claim has confirmed on-chain and we need to
generate a new claim event; the randomized order could result in the
new claim event being handled prior to the previous. To maintain the
order in which the claim events are generated, we track them in a Vec
instead and ensure only one instance of a PackageId only ever exists
within it.

This would have certain performance implications, but since we're
bounded by the total number of HTLCs in a commitment anyway, we're
comfortable with taking the cost.

lightning/src/chain/onchaintx.rs

index faf3fe12f60b1787494d803b77ce6abbe31b6017..2c570f580bd32153bb0488fd454a8345368f1436 100644 (file)
@@ -248,8 +248,19 @@ pub struct OnchainTxHandler<ChannelSigner: WriteableEcdsaChannelSigner> {
        pub(crate) pending_claim_requests: HashMap<PackageID, PackageTemplate>,
        #[cfg(not(test))]
        pending_claim_requests: HashMap<PackageID, PackageTemplate>,
+
+       // Used to track external events that need to be forwarded to the `ChainMonitor`. This `Vec`
+       // essentially acts as an insertion-ordered `HashMap` – there should only ever be one occurrence
+       // of a `PackageID`, which tracks its latest `ClaimEvent`, i.e., if a pending claim exists, and
+       // a new block has been connected, resulting in a new claim, the previous will be replaced with
+       // the new.
+       //
+       // These external events may be generated in the following cases:
+       //      - A channel has been force closed by broadcasting the holder's latest commitment transaction
+       //      - A block being connected/disconnected
+       //      - Learning the preimage for an HTLC we can claim onchain
        #[cfg(anchors)]
-       pending_claim_events: HashMap<PackageID, ClaimEvent>,
+       pending_claim_events: Vec<(PackageID, ClaimEvent)>,
 
        // Used to link outpoints claimed in a connected block to a pending claim request. The keys
        // represent the outpoints that our `ChannelMonitor` has detected we have keys/scripts to
@@ -426,7 +437,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
                        pending_claim_requests,
                        onchain_events_awaiting_threshold_conf,
                        #[cfg(anchors)]
-                       pending_claim_events: HashMap::new(),
+                       pending_claim_events: Vec::new(),
                        secp_ctx,
                })
        }
@@ -447,8 +458,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                        locktimed_packages: BTreeMap::new(),
                        onchain_events_awaiting_threshold_conf: Vec::new(),
                        #[cfg(anchors)]
-                       pending_claim_events: HashMap::new(),
-
+                       pending_claim_events: Vec::new(),
                        secp_ctx,
                }
        }
@@ -463,9 +473,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
 
        #[cfg(anchors)]
        pub(crate) fn get_and_clear_pending_claim_events(&mut self) -> Vec<ClaimEvent> {
-               let mut ret = HashMap::new();
-               swap(&mut ret, &mut self.pending_claim_events);
-               ret.into_iter().map(|(_, event)| event).collect::<Vec<_>>()
+               let mut events = Vec::new();
+               swap(&mut events, &mut self.pending_claim_events);
+               events.into_iter().map(|(_, event)| event).collect()
        }
 
        /// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty
@@ -709,7 +719,8 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                                                                package_id
                                                        },
                                                };
-                                               self.pending_claim_events.insert(package_id, claim_event);
+                                               debug_assert_eq!(self.pending_claim_events.iter().filter(|entry| entry.0 == package_id).count(), 0);
+                                               self.pending_claim_events.push((package_id, claim_event));
                                                package_id
                                        },
                                };
@@ -794,6 +805,20 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                                                        //TODO: recompute soonest_timelock to avoid wasting a bit on fees
                                                        if at_least_one_drop {
                                                                bump_candidates.insert(*package_id, request.clone());
+                                                               // If we have any pending claim events for the request being updated
+                                                               // that have yet to be consumed, we'll remove them since they will
+                                                               // end up producing an invalid transaction by double spending
+                                                               // input(s) that already have a confirmed spend. If such spend is
+                                                               // reorged out of the chain, then we'll attempt to re-spend the
+                                                               // inputs once we see it.
+                                                               #[cfg(anchors)] {
+                                                                       #[cfg(debug_assertions)] {
+                                                                               let existing = self.pending_claim_events.iter()
+                                                                                       .filter(|entry| entry.0 == *package_id).count();
+                                                                               assert!(existing == 0 || existing == 1);
+                                                                       }
+                                                                       self.pending_claim_events.retain(|entry| entry.0 != *package_id);
+                                                               }
                                                        }
                                                }
                                                break; //No need to iterate further, either tx is our or their
@@ -829,8 +854,14 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                                                                log_debug!(logger, "Removing claim tracking for {} due to maturation of claim package {}.",
                                                                        outpoint, log_bytes!(package_id));
                                                                self.claimable_outpoints.remove(outpoint);
-                                                               #[cfg(anchors)]
-                                                               self.pending_claim_events.remove(&package_id);
+                                                       }
+                                                       #[cfg(anchors)] {
+                                                               #[cfg(debug_assertions)] {
+                                                                       let num_existing = self.pending_claim_events.iter()
+                                                                               .filter(|entry| entry.0 == package_id).count();
+                                                                       assert!(num_existing == 0 || num_existing == 1);
+                                                               }
+                                                               self.pending_claim_events.retain(|(id, _)| *id != package_id);
                                                        }
                                                }
                                        },
@@ -866,7 +897,13 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                                        #[cfg(anchors)]
                                        OnchainClaim::Event(claim_event) => {
                                                log_info!(logger, "Yielding RBF-bumped onchain event to spend inputs {:?}", request.outpoints());
-                                               self.pending_claim_events.insert(*package_id, claim_event);
+                                               #[cfg(debug_assertions)] {
+                                                       let num_existing = self.pending_claim_events.iter().
+                                                               filter(|entry| entry.0 == *package_id).count();
+                                                       assert!(num_existing == 0 || num_existing == 1);
+                                               }
+                                               self.pending_claim_events.retain(|event| event.0 != *package_id);
+                                               self.pending_claim_events.push((*package_id, claim_event));
                                        },
                                }
                                if let Some(request) = self.pending_claim_requests.get_mut(package_id) {
@@ -930,7 +967,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                                self.onchain_events_awaiting_threshold_conf.push(entry);
                        }
                }
-               for ((_package_id, _), request) in bump_candidates.iter_mut() {
+               for ((_package_id, _), ref mut request) in bump_candidates.iter_mut() {
                        if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(height, &request, fee_estimator, &&*logger) {
                                request.set_timer(new_timer);
                                request.set_feerate(new_feerate);
@@ -942,7 +979,13 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                                        #[cfg(anchors)]
                                        OnchainClaim::Event(claim_event) => {
                                                log_info!(logger, "Yielding onchain event after reorg to spend inputs {:?}", request.outpoints());
-                                               self.pending_claim_events.insert(_package_id, claim_event);
+                                               #[cfg(debug_assertions)] {
+                                                       let num_existing = self.pending_claim_events.iter()
+                                                               .filter(|entry| entry.0 == *_package_id).count();
+                                                       assert!(num_existing == 0 || num_existing == 1);
+                                               }
+                                               self.pending_claim_events.retain(|event| event.0 != *_package_id);
+                                               self.pending_claim_events.push((*_package_id, claim_event));
                                        },
                                }
                        }