Decay historical liquidity tracking when no new data is added
author Matt Corallo <git@bluematt.me>
Mon, 22 Aug 2022 22:41:31 +0000 (22:41 +0000)
committer Matt Corallo <git@bluematt.me>
Thu, 6 Oct 2022 21:10:23 +0000 (21:10 +0000)
To avoid scoring based on incredibly old historical liquidity data,
we add a new half-life here which is used to (very slowly) decay
historical liquidity tracking buckets.
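
For intuition before the diff: the rule being added halves every historical
bucket once per elapsed half-life by right-shifting it. A minimal standalone
sketch (hypothetical helper, not the patch itself) of why the docs can promise
that 16 or more half-lives clear all data from the u16 buckets:

    // Hypothetical helper mirroring the new `time_decay_data`: each elapsed
    // half-life right-shifts a bucket by one bit, so a u16 bucket is
    // guaranteed to hit zero after 16 half-lives.
    fn decay_bucket(value: u16, half_lives: u32) -> u16 {
        // checked_shr yields None once the shift width reaches the bit width
        // (16 for u16), which we treat as fully decayed.
        value.checked_shr(half_lives).unwrap_or(0)
    }

    fn main() {
        assert_eq!(decay_bucket(32, 0), 32); // no elapsed half-life: unchanged
        assert_eq!(decay_bucket(32, 1), 16); // one half-life: halved
        assert_eq!(decay_bucket(u16::max_value(), 16), 0); // 16+ half-lives: all gone
    }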

lightning/src/routing/scoring.rs

index c8cf3583cd4b6533fd3aae281f37c8fc624fd818..73bbbe460e159378e00d12b6f41a3685bbefc1bb 100644
@@ -62,7 +62,7 @@ use util::logger::Logger;
 use util::time::Time;
 
 use prelude::*;
-use core::fmt;
+use core::{cmp, fmt};
 use core::cell::{RefCell, RefMut};
 use core::convert::TryInto;
 use core::ops::{Deref, DerefMut};
@@ -436,6 +436,16 @@ pub struct ProbabilisticScoringParameters {
        /// [`liquidity_penalty_amount_multiplier_msat`]: Self::liquidity_penalty_amount_multiplier_msat
        pub historical_liquidity_penalty_amount_multiplier_msat: u64,
 
+       /// If we aren't learning any new datapoints for a channel, the historical liquidity bounds
+       /// tracking can simply live on with increasingly stale data. Instead, when a channel has not
+       /// seen a liquidity estimate update for this amount of time, the historical datapoints are
+       /// decayed by half.
+       ///
+       /// Note that after 16 or more half-lives all historical data will be completely gone.
+       ///
+       /// Default value: 14 days
+       pub historical_no_updates_half_life: Duration,
+
        /// Manual penalties used for the given nodes. Allows to set a particular penalty for a given
        /// node. Note that a manual penalty of `u64::max_value()` means the node would not ever be
        /// considered during path finding.
@@ -509,10 +519,89 @@ impl HistoricalBucketRangeTracker {
                        self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
                }
        }
+       /// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
+       /// datapoints as we receive newer information.
+       fn time_decay_data(&mut self, half_lives: u32) {
+               for e in self.buckets.iter_mut() {
+                       *e = e.checked_shr(half_lives).unwrap_or(0);
+               }
+       }
 }
 
 impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
 
+struct HistoricalMinMaxBuckets<'a> {
+       min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+       max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+}
+
+impl HistoricalMinMaxBuckets<'_> {
+       #[inline]
+       fn calculate_success_probability_times_billion(&self, required_decays: u32, payment_amt_64th_bucket: u8) -> Option<u64> {
+               // If historical penalties are enabled, calculate the penalty by walking the set of
+               // historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
+               // each, calculate the probability of success given our payment amount, then total the
+               // weighted average probability of success.
+               //
+               // We use a sliding scale to decide which point within a given bucket will be compared to
+               // the amount being sent - for lower-bounds, the amount being sent is compared to the lower
+               // edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
+               // bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
+               // comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
+               // of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
+               // penalties to channels at the edges.
+               //
+               // If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
+               // such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
+               // for a 1 BTC channel!).
+               //
+               // If we used the middle of each bucket we'd never assign any penalty at all when sending
+               // less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
+               let mut total_valid_points_tracked = 0;
+
+               // Rather than actually decaying the individual buckets, which would lose precision, we
+               // simply track whether all buckets would be decayed to zero, in which case we treat it as
+               // if we had no data.
+               let mut is_fully_decayed = true;
+               let mut check_track_bucket_contains_undecayed_points =
+                       |bucket_val: u16| if bucket_val.checked_shr(required_decays).unwrap_or(0) > 0 { is_fully_decayed = false; };
+
+               for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+                       check_track_bucket_contains_undecayed_points(*min_bucket);
+                       for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
+                               total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
+                               check_track_bucket_contains_undecayed_points(*max_bucket);
+                       }
+               }
+               // If the total valid points is smaller than 1.0 (i.e. 32*32 in our fixed-point scheme,
+               // since each datapoint is tracked with weight 32 and min/max buckets are multiplied
+               // pairwise), treat it as if we were fully decayed.
+               if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 || is_fully_decayed {
+                       return None;
+               }
+
+               let mut cumulative_success_prob_times_billion = 0;
+               for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+                       for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
+                               let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
+                                       * 1024 * 1024 / total_valid_points_tracked;
+                               let min_64th_bucket = min_idx as u8 * 9;
+                               let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
+                               if payment_amt_64th_bucket > max_64th_bucket {
+                                       // Success probability 0, the payment amount is above the max liquidity
+                               } else if payment_amt_64th_bucket <= min_64th_bucket {
+                                       cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
+                               } else {
+                                       cumulative_success_prob_times_billion += bucket_prob_times_million *
+                                               ((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
+                                               ((max_64th_bucket - min_64th_bucket) as u64);
+                               }
+                       }
+               }
+
+               Some(cumulative_success_prob_times_billion)
+       }
+}
+
 /// Accounting for channel liquidity balance uncertainty.
 ///
 /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
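
To make the sliding-scale comment in the new
`calculate_success_probability_times_billion` concrete, here is a standalone
sketch (plain Rust, not the crate's API) of the comparison point each bucket
index maps to on the 64ths scale, plus the linear interpolation the inner loop
applies between a (min, max) pair:

    fn main() {
        // Lower bounds are compared at idx * 9 / 64ths (0, 9, ..., 63); upper
        // bounds at (7 - idx) * 9 + 1 (64, 55, ..., 1), matching the offsets
        // in the diff above.
        for idx in 0u8..8 {
            let min_64th = idx * 9;
            let max_64th = (7 - idx) * 9 + 1;
            println!("bucket {}: min point {}/64, max point {}/64", idx, min_64th, max_64th);
        }
        // The interpolation between the two points, as in the inner loop: a
        // payment of 16/64ths of capacity against (min 0/64, max 64/64) gets
        // success probability (64 - 16) / (64 - 0) = 0.75.
        let (min_pt, max_pt, amt) = (0u64, 64u64, 16u64);
        assert_eq!((max_pt - amt) * 100 / (max_pt - min_pt), 75);
    }
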
@@ -645,6 +734,7 @@ impl ProbabilisticScoringParameters {
                        liquidity_penalty_amount_multiplier_msat: 0,
                        historical_liquidity_penalty_multiplier_msat: 0,
                        historical_liquidity_penalty_amount_multiplier_msat: 0,
+                       historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14),
                        manual_node_penalties: HashMap::new(),
                        anti_probing_penalty_msat: 0,
                        considered_impossible_penalty_msat: 0,
@@ -670,6 +760,7 @@ impl Default for ProbabilisticScoringParameters {
                        liquidity_penalty_amount_multiplier_msat: 192,
                        historical_liquidity_penalty_multiplier_msat: 10_000,
                        historical_liquidity_penalty_amount_multiplier_msat: 64,
+                       historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 14),
                        manual_node_penalties: HashMap::new(),
                        anti_probing_penalty_msat: 250,
                        considered_impossible_penalty_msat: 1_0000_0000_000,
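
As a usage sketch (assuming this crate revision; the helper name is
hypothetical), the new knob is set like any other field on
`ProbabilisticScoringParameters`, here shortening the half-life to one week:

    use std::time::Duration;

    use lightning::routing::scoring::ProbabilisticScoringParameters;

    // Hypothetical helper: halve stale history weekly instead of the 14-day
    // default; every other parameter keeps the defaults shown above.
    fn params_with_weekly_half_life() -> ProbabilisticScoringParameters {
        ProbabilisticScoringParameters {
            historical_no_updates_half_life: Duration::from_secs(60 * 60 * 24 * 7),
            ..ProbabilisticScoringParameters::default()
        }
    }
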
@@ -791,35 +882,27 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 
                if params.historical_liquidity_penalty_multiplier_msat != 0 ||
                   params.historical_liquidity_penalty_amount_multiplier_msat != 0 {
-                       // If historical penalties are enabled, calculate the penalty by walking the set of
-                       // historical liquidity bucket (min, max) combinations (where min_idx < max_idx)
-                       // and, for each, calculate the probability of success given our payment amount, then
-                       // total the weighted average probability of success.
-                       //
-                       // We use a sliding scale to decide which point within a given bucket will be compared
-                       // to the amount being sent - for lower-bounds, the amount being sent is compared to
-                       // the lower edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of
-                       // the last bucket (i.e. 9 times the index, or 63), with each bucket in between
-                       // increasing the comparison point by 1/64th. For upper-bounds, the same applies,
-                       // however with an offset of 1/64th (i.e. starting at one and ending at 64). This
-                       // avoids failing to assign penalties to channels at the edges.
-                       //
-                       // If we used the bottom edge of buckets, we'd end up never assigning any penalty at
-                       // all to such a channel when sending less than ~0.19% of the channel's capacity (e.g.
-                       // ~200k sats for a 1 BTC channel!).
-                       //
-                       // If we used the middle of each bucket we'd never assign any penalty at all when
-                       // sending less than 1/16th of a channel's capacity, or 1/8th if we used the top of the
-                       // bucket.
-                       let mut total_valid_points_tracked = 0;
-                       for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-                               for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
-                                       total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
-                               }
-                       }
-                       if total_valid_points_tracked == 0 {
-                               // If we don't have any valid points, redo the non-historical calculation with no
-                               // liquidity bounds tracked and the historical penalty multipliers.
+                       let required_decays = self.now.duration_since(*self.last_updated).as_secs()
+                               .checked_div(params.historical_no_updates_half_life.as_secs())
+                               .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
+                       let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat;
+                       debug_assert!(payment_amt_64th_bucket <= 64);
+                       if payment_amt_64th_bucket > 64 { return res; }
+
+                       let buckets = HistoricalMinMaxBuckets {
+                               min_liquidity_offset_history: &self.min_liquidity_offset_history,
+                               max_liquidity_offset_history: &self.max_liquidity_offset_history,
+                       };
+                       if let Some(cumulative_success_prob_times_billion) = buckets
+                                       .calculate_success_probability_times_billion(required_decays, payment_amt_64th_bucket as u8) {
+                               let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024);
+                               res = res.saturating_add(Self::combined_penalty_msat(amount_msat,
+                                       historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat,
+                                       params.historical_liquidity_penalty_amount_multiplier_msat));
+                       } else {
+                               // If we don't have any valid points (or, once decayed, we have less than a full
+                               // point), redo the non-historical calculation with no liquidity bounds tracked and
+                               // the historical penalty multipliers.
                                let max_capacity = self.capacity_msat.saturating_sub(amount_msat).saturating_add(1);
                                let negative_log10_times_2048 =
                                        approx::negative_log10_times_2048(max_capacity, self.capacity_msat.saturating_add(1));
@@ -828,33 +911,6 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
                                        params.historical_liquidity_penalty_amount_multiplier_msat));
                                return res;
                        }
-
-                       let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat;
-                       debug_assert!(payment_amt_64th_bucket <= 64);
-                       if payment_amt_64th_bucket > 64 { return res; }
-
-                       let mut cumulative_success_prob_times_billion = 0;
-                       for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-                               for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
-                                       let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
-                                               * 1024 * 1024 / total_valid_points_tracked;
-                                       let min_64th_bucket = min_idx as u64 * 9;
-                                       let max_64th_bucket = (7 - max_idx as u64) * 9 + 1;
-                                       if payment_amt_64th_bucket > max_64th_bucket {
-                                               // Success probability 0, the payment amount is above the max liquidity
-                                       } else if payment_amt_64th_bucket <= min_64th_bucket {
-                                               cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
-                                       } else {
-                                               cumulative_success_prob_times_billion += bucket_prob_times_million *
-                                                       (max_64th_bucket - payment_amt_64th_bucket) * 1024 /
-                                                       (max_64th_bucket - min_64th_bucket);
-                                       }
-                               }
-                       }
-                       let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024);
-                       res = res.saturating_add(Self::combined_penalty_msat(amount_msat,
-                               historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat,
-                               params.historical_liquidity_penalty_amount_multiplier_msat));
                }
 
                res
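
The `required_decays` expression above packs two edge cases into one line:
division by a zero half-life and elapsed times long enough to overflow a u32.
A standalone sketch of just that arithmetic (the function wrapper is
hypothetical; the body mirrors the diff):

    use core::cmp;

    // A zero half-life divides to None, which we treat as "infinitely many
    // decays"; very long idle periods saturate at u32::MAX instead of
    // overflowing.
    fn required_decays(elapsed_secs: u64, half_life_secs: u64) -> u32 {
        elapsed_secs
            .checked_div(half_life_secs)
            .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32)
    }

    fn main() {
        assert_eq!(required_decays(30, 10), 3);
        assert_eq!(required_decays(5, 10), 0); // less than one full half-life
        assert_eq!(required_decays(1, 0), u32::max_value()); // zero half-life
    }
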
@@ -927,6 +983,12 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
        }
 
        fn update_history_buckets(&mut self) {
+               let half_lives = self.now.duration_since(*self.last_updated).as_secs()
+                       .checked_div(self.params.historical_no_updates_half_life.as_secs())
+                       .map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
+               self.min_liquidity_offset_history.time_decay_data(half_lives);
+               self.max_liquidity_offset_history.time_decay_data(half_lives);
+
                debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat);
                self.min_liquidity_offset_history.track_datapoint(
                        // Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
@@ -949,8 +1011,8 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
                } else {
                        self.decayed_offset_msat(*self.max_liquidity_offset_msat)
                };
-               *self.last_updated = self.now;
                self.update_history_buckets();
+               *self.last_updated = self.now;
        }
 
        /// Adjusts the upper bound of the channel liquidity balance in this direction.
@@ -961,8 +1023,8 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
                } else {
                        self.decayed_offset_msat(*self.min_liquidity_offset_msat)
                };
-               *self.last_updated = self.now;
                self.update_history_buckets();
+               *self.last_updated = self.now;
        }
 }
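
The two hunks above reorder `*self.last_updated = self.now;` to run after
`update_history_buckets()`: the decay added to that function divides the time
since `last_updated` by the half-life, so bumping the timestamp first would
always see zero elapsed half-lives and the buckets would never decay. A
minimal sketch of the dependency (illustrative types, not the crate's;
assumes a non-zero half-life):

    use std::time::{Duration, Instant};

    struct Tracker {
        last_updated: Instant,
        bucket: u16,
    }

    impl Tracker {
        fn update(&mut self, now: Instant, half_life: Duration) {
            // Decay must read the *old* last_updated: writing `now` into it
            // first would make the elapsed time zero and skip the decay.
            let half_lives = (now.duration_since(self.last_updated).as_secs()
                / half_life.as_secs()) as u32;
            self.bucket = self.bucket.checked_shr(half_lives).unwrap_or(0);
            self.last_updated = now; // bump the timestamp only after decaying
        }
    }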
 
@@ -2479,6 +2541,7 @@ mod tests {
                let params = ProbabilisticScoringParameters {
                        historical_liquidity_penalty_multiplier_msat: 1024,
                        historical_liquidity_penalty_amount_multiplier_msat: 1024,
+                       historical_no_updates_half_life: Duration::from_secs(10),
                        ..ProbabilisticScoringParameters::zero_penalty()
                };
                let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger);
@@ -2500,6 +2563,11 @@ mod tests {
                // still remember that there was some failure in the past, and assign a non-0 penalty.
                scorer.payment_path_failed(&payment_path_for_amount(1000).iter().collect::<Vec<_>>(), 43);
                assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 198);
+
+               // Advance the time forward 16 half-lives (which the docs claim will ensure all data is
+               // gone), and check that we're back to where we started.
+               SinceEpoch::advance(Duration::from_secs(10 * 16));
+               assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 47);
        }
 
        #[test]