Move the bucketed history tracking logic into a scoring submodule
authorMatt Corallo <git@bluematt.me>
Sat, 20 May 2023 23:31:57 +0000 (23:31 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 23 Aug 2023 21:15:11 +0000 (21:15 +0000)
lightning/src/routing/scoring.rs

index 804e08f278dca86b0b13e3ed3fdcbefa25f790e9..9c337f9b9edbe9ef98ac4c44016233d975d15e24 100644 (file)
@@ -649,161 +649,6 @@ impl ProbabilisticScoringDecayParameters {
        }
 }
 
-/// Tracks the historical state of a distribution as a weighted average of how much time was spent
-/// in each of 8 buckets.
-#[derive(Clone, Copy)]
-struct HistoricalBucketRangeTracker {
-       buckets: [u16; 8],
-}
-
-impl HistoricalBucketRangeTracker {
-       fn new() -> Self { Self { buckets: [0; 8] } }
-       fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
-               // We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
-               // we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
-               //
-               // Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
-               // the buckets for the current min and max liquidity offset positions.
-               //
-               // We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
-               // non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
-               // 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
-               //
-               // In total, this allows us to track data for the last 8,000 or so payments across a given
-               // channel.
-               //
-               // These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
-               // and need to balance having more bits in the decimal part (to ensure decay isn't too
-               // non-linear) with having too few bits in the mantissa, causing us to not store very many
-               // datapoints.
-               //
-               // The constants were picked experimentally, selecting a decay amount that restricts us
-               // from overflowing buckets without having to cap them manually.
-
-               // Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
-               // the channel's capacity, though the second should generally never happen.
-               debug_assert!(liquidity_offset_msat <= capacity_msat);
-               let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
-                       .try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
-               debug_assert!(bucket_idx < 8);
-               if bucket_idx < 8 {
-                       for e in self.buckets.iter_mut() {
-                               *e = ((*e as u32) * 2047 / 2048) as u16;
-                       }
-                       self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
-               }
-       }
-       /// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
-       /// datapoints as we receive newer information.
-       fn time_decay_data(&mut self, half_lives: u32) {
-               for e in self.buckets.iter_mut() {
-                       *e = e.checked_shr(half_lives).unwrap_or(0);
-               }
-       }
-}
-
-impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
-
-struct HistoricalMinMaxBuckets<'a> {
-       min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
-       max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
-}
-
-impl HistoricalMinMaxBuckets<'_> {
-       #[inline]
-       fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
-       -> ([u16; 8], [u16; 8], u32) {
-               let required_decays = now.duration_since(last_updated).as_secs()
-                       .checked_div(half_life.as_secs())
-                       .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
-               let mut min_buckets = *self.min_liquidity_offset_history;
-               min_buckets.time_decay_data(required_decays);
-               let mut max_buckets = *self.max_liquidity_offset_history;
-               max_buckets.time_decay_data(required_decays);
-               (min_buckets.buckets, max_buckets.buckets, required_decays)
-       }
-
-       #[inline]
-       fn calculate_success_probability_times_billion<T: Time>(
-               &self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
-       -> Option<u64> {
-               // If historical penalties are enabled, calculate the penalty by walking the set of
-               // historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
-               // each, calculate the probability of success given our payment amount, then total the
-               // weighted average probability of success.
-               //
-               // We use a sliding scale to decide which point within a given bucket will be compared to
-               // the amount being sent - for lower-bounds, the amount being sent is compared to the lower
-               // edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
-               // bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
-               // comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
-               // of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
-               // penalties to channels at the edges.
-               //
-               // If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
-               // such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
-               // for a 1 BTC channel!).
-               //
-               // If we used the middle of each bucket we'd never assign any penalty at all when sending
-               // less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
-               let mut total_valid_points_tracked = 0;
-
-               let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
-                       (amount_msat * 64 / capacity_msat.saturating_add(1))
-                               .try_into().unwrap_or(65)
-               } else {
-                       // Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
-                       // division. This branch should only be hit in fuzz testing since the amount would
-                       // need to be over 2.88 million BTC in practice.
-                       ((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
-                               .try_into().unwrap_or(65)
-               };
-               #[cfg(not(fuzzing))]
-               debug_assert!(payment_amt_64th_bucket <= 64);
-               if payment_amt_64th_bucket >= 64 { return None; }
-
-               // Check if all our buckets are zero, once decayed and treat it as if we had no data. We
-               // don't actually use the decayed buckets, though, as that would lose precision.
-               let (decayed_min_buckets, decayed_max_buckets, required_decays) =
-                       self.get_decayed_buckets(now, last_updated, half_life);
-               if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
-                       return None;
-               }
-
-               for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-                       for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
-                               total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
-                       }
-               }
-               // If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
-               // it as if we were fully decayed.
-               if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
-                       return None;
-               }
-
-               let mut cumulative_success_prob_times_billion = 0;
-               for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-                       for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
-                               let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
-                                       * 1024 * 1024 / total_valid_points_tracked;
-                               let min_64th_bucket = min_idx as u8 * 9;
-                               let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
-                               if payment_amt_64th_bucket > max_64th_bucket {
-                                       // Success probability 0, the payment amount is above the max liquidity
-                               } else if payment_amt_64th_bucket <= min_64th_bucket {
-                                       cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
-                               } else {
-                                       cumulative_success_prob_times_billion += bucket_prob_times_million *
-                                               ((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
-                                               ((max_64th_bucket - min_64th_bucket) as u64);
-                               }
-                       }
-               }
-
-               Some(cumulative_success_prob_times_billion)
-       }
-}
-
 /// Accounting for channel liquidity balance uncertainty.
 ///
 /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
@@ -1704,6 +1549,166 @@ mod approx {
        }
 }
 
+mod bucketed_history {
+       use super::*;
+
+       /// Tracks the historical state of a distribution as a weighted average of how much time was spent
+       /// in each of 8 buckets.
+       #[derive(Clone, Copy)]
+       pub(super) struct HistoricalBucketRangeTracker {
+               buckets: [u16; 8],
+       }
+
+       impl HistoricalBucketRangeTracker {
+               pub(super) fn new() -> Self { Self { buckets: [0; 8] } }
+               pub(super) fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
+                       // We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
+                       // we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
+                       //
+                       // Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
+                       // the buckets for the current min and max liquidity offset positions.
+                       //
+                       // We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
+                       // non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
+                       // 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
+                       //
+                       // In total, this allows us to track data for the last 8,000 or so payments across a given
+                       // channel.
+                       //
+                       // These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
+                       // and need to balance having more bits in the decimal part (to ensure decay isn't too
+                       // non-linear) with having too few bits in the mantissa, causing us to not store very many
+                       // datapoints.
+                       //
+                       // The constants were picked experimentally, selecting a decay amount that restricts us
+                       // from overflowing buckets without having to cap them manually.
+
+                       // Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
+                       // the channel's capacity, though the second should generally never happen.
+                       debug_assert!(liquidity_offset_msat <= capacity_msat);
+                       let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
+                               .try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
+                       debug_assert!(bucket_idx < 8);
+                       if bucket_idx < 8 {
+                               for e in self.buckets.iter_mut() {
+                                       *e = ((*e as u32) * 2047 / 2048) as u16;
+                               }
+                               self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
+                       }
+               }
+               /// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
+               /// datapoints as we receive newer information.
+               pub(super) fn time_decay_data(&mut self, half_lives: u32) {
+                       for e in self.buckets.iter_mut() {
+                               *e = e.checked_shr(half_lives).unwrap_or(0);
+                       }
+               }
+       }
+
+       impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
+
+       pub(super) struct HistoricalMinMaxBuckets<'a> {
+               pub(super) min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+               pub(super) max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+       }
+
+       impl HistoricalMinMaxBuckets<'_> {
+               #[inline]
+               pub(super) fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
+               -> ([u16; 8], [u16; 8], u32) {
+                       let required_decays = now.duration_since(last_updated).as_secs()
+                               .checked_div(half_life.as_secs())
+                               .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
+                       let mut min_buckets = *self.min_liquidity_offset_history;
+                       min_buckets.time_decay_data(required_decays);
+                       let mut max_buckets = *self.max_liquidity_offset_history;
+                       max_buckets.time_decay_data(required_decays);
+                       (min_buckets.buckets, max_buckets.buckets, required_decays)
+               }
+
+               #[inline]
+               pub(super) fn calculate_success_probability_times_billion<T: Time>(
+                       &self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
+               -> Option<u64> {
+                       // If historical penalties are enabled, calculate the penalty by walking the set of
+                       // historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
+                       // each, calculate the probability of success given our payment amount, then total the
+                       // weighted average probability of success.
+                       //
+                       // We use a sliding scale to decide which point within a given bucket will be compared to
+                       // the amount being sent - for lower-bounds, the amount being sent is compared to the lower
+                       // edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
+                       // bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
+                       // comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
+                       // of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
+                       // penalties to channels at the edges.
+                       //
+                       // If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
+                       // such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
+                       // for a 1 BTC channel!).
+                       //
+                       // If we used the middle of each bucket we'd never assign any penalty at all when sending
+                       // less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
+                       let mut total_valid_points_tracked = 0;
+
+                       let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
+                               (amount_msat * 64 / capacity_msat.saturating_add(1))
+                                       .try_into().unwrap_or(65)
+                       } else {
+                               // Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
+                               // division. This branch should only be hit in fuzz testing since the amount would
+                               // need to be over 2.88 million BTC in practice.
+                               ((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
+                                       .try_into().unwrap_or(65)
+                       };
+                       #[cfg(not(fuzzing))]
+                       debug_assert!(payment_amt_64th_bucket <= 64);
+                       if payment_amt_64th_bucket >= 64 { return None; }
+
+                       // Check if all our buckets are zero, once decayed and treat it as if we had no data. We
+                       // don't actually use the decayed buckets, though, as that would lose precision.
+                       let (decayed_min_buckets, decayed_max_buckets, required_decays) =
+                               self.get_decayed_buckets(now, last_updated, half_life);
+                       if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
+                               return None;
+                       }
+
+                       for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+                               for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
+                                       total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
+                               }
+                       }
+                       // If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
+                       // it as if we were fully decayed.
+                       if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
+                               return None;
+                       }
+
+                       let mut cumulative_success_prob_times_billion = 0;
+                       for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+                               for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
+                                       let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
+                                               * 1024 * 1024 / total_valid_points_tracked;
+                                       let min_64th_bucket = min_idx as u8 * 9;
+                                       let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
+                                       if payment_amt_64th_bucket > max_64th_bucket {
+                                               // Success probability 0, the payment amount is above the max liquidity
+                                       } else if payment_amt_64th_bucket <= min_64th_bucket {
+                                               cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
+                                       } else {
+                                               cumulative_success_prob_times_billion += bucket_prob_times_million *
+                                                       ((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
+                                                       ((max_64th_bucket - min_64th_bucket) as u64);
+                                       }
+                               }
+                       }
+
+                       Some(cumulative_success_prob_times_billion)
+               }
+       }
+}
+use bucketed_history::{HistoricalBucketRangeTracker, HistoricalMinMaxBuckets};
+
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> Writeable for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {
        #[inline]
        fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {