From: Matt Corallo
Date: Sat, 20 May 2023 23:31:57 +0000 (+0000)
Subject: Move the bucketed history tracking logic into a scoring submodule
X-Git-Tag: v0.0.117-alpha1~41^2~1
X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=2a1dff4c1007ba777f446dd0b232cc20b900956b;p=rust-lightning

Move the bucketed history tracking logic into a scoring submodule
---

diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs
index 804e08f27..9c337f9b9 100644
--- a/lightning/src/routing/scoring.rs
+++ b/lightning/src/routing/scoring.rs
@@ -649,161 +649,6 @@ impl ProbabilisticScoringDecayParameters {
 	}
 }
 
-/// Tracks the historical state of a distribution as a weighted average of how much time was spent
-/// in each of 8 buckets.
-#[derive(Clone, Copy)]
-struct HistoricalBucketRangeTracker {
-	buckets: [u16; 8],
-}
-
-impl HistoricalBucketRangeTracker {
-	fn new() -> Self { Self { buckets: [0; 8] } }
-	fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
-		// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
-		// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
-		//
-		// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
-		// the buckets for the current min and max liquidity offset positions.
-		//
-		// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
-		// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
-		// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
-		//
-		// In total, this allows us to track data for the last 8,000 or so payments across a given
-		// channel.
-		//
-		// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
-		// and need to balance having more bits in the decimal part (to ensure decay isn't too
-		// non-linear) with having too few bits in the mantissa, causing us to not store very many
-		// datapoints.
-		//
-		// The constants were picked experimentally, selecting a decay amount that restricts us
-		// from overflowing buckets without having to cap them manually.
-
-		// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
-		// the channel's capacity, though the second should generally never happen.
-		debug_assert!(liquidity_offset_msat <= capacity_msat);
-		let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
-			.try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
-		debug_assert!(bucket_idx < 8);
-		if bucket_idx < 8 {
-			for e in self.buckets.iter_mut() {
-				*e = ((*e as u32) * 2047 / 2048) as u16;
-			}
-			self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
-		}
-	}
-	/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
-	/// datapoints as we receive newer information.
-	fn time_decay_data(&mut self, half_lives: u32) {
-		for e in self.buckets.iter_mut() {
-			*e = e.checked_shr(half_lives).unwrap_or(0);
-		}
-	}
-}
-
-impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
-
-struct HistoricalMinMaxBuckets<'a> {
-	min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
-	max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
-}
-
-impl HistoricalMinMaxBuckets<'_> {
-	#[inline]
-	fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
-	-> ([u16; 8], [u16; 8], u32) {
-		let required_decays = now.duration_since(last_updated).as_secs()
-			.checked_div(half_life.as_secs())
-			.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
-		let mut min_buckets = *self.min_liquidity_offset_history;
-		min_buckets.time_decay_data(required_decays);
-		let mut max_buckets = *self.max_liquidity_offset_history;
-		max_buckets.time_decay_data(required_decays);
-		(min_buckets.buckets, max_buckets.buckets, required_decays)
-	}
-
-	#[inline]
-	fn calculate_success_probability_times_billion<T: Time>(
-		&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
-	-> Option<u64> {
-		// If historical penalties are enabled, calculate the penalty by walking the set of
-		// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
-		// each, calculate the probability of success given our payment amount, then total the
-		// weighted average probability of success.
-		//
-		// We use a sliding scale to decide which point within a given bucket will be compared to
-		// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
-		// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
-		// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
-		// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
-		// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
-		// penalties to channels at the edges.
-		//
-		// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
-		// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
-		// for a 1 BTC channel!).
-		//
-		// If we used the middle of each bucket we'd never assign any penalty at all when sending
-		// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
-		let mut total_valid_points_tracked = 0;
-
-		let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
-			(amount_msat * 64 / capacity_msat.saturating_add(1))
-				.try_into().unwrap_or(65)
-		} else {
-			// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
-			// division. This branch should only be hit in fuzz testing since the amount would
-			// need to be over 2.88 million BTC in practice.
-			((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
-				.try_into().unwrap_or(65)
-		};
-		#[cfg(not(fuzzing))]
-		debug_assert!(payment_amt_64th_bucket <= 64);
-		if payment_amt_64th_bucket >= 64 { return None; }
-
-		// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
-		// don't actually use the decayed buckets, though, as that would lose precision.
-		let (decayed_min_buckets, decayed_max_buckets, required_decays) =
-			self.get_decayed_buckets(now, last_updated, half_life);
-		if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
-			return None;
-		}
-
-		for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-			for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
-				total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
-			}
-		}
-		// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
-		// it as if we were fully decayed.
-		if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
-			return None;
-		}
-
-		let mut cumulative_success_prob_times_billion = 0;
-		for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
-			for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
-				let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
-					* 1024 * 1024 / total_valid_points_tracked;
-				let min_64th_bucket = min_idx as u8 * 9;
-				let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
-				if payment_amt_64th_bucket > max_64th_bucket {
-					// Success probability 0, the payment amount is above the max liquidity
-				} else if payment_amt_64th_bucket <= min_64th_bucket {
-					cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
-				} else {
-					cumulative_success_prob_times_billion += bucket_prob_times_million *
-						((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
-						((max_64th_bucket - min_64th_bucket) as u64);
-				}
-			}
-		}
-
-		Some(cumulative_success_prob_times_billion)
-	}
-}
-
 /// Accounting for channel liquidity balance uncertainty.
 ///
 /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
@@ -1704,6 +1549,166 @@ mod approx {
 	}
 }
 
+mod bucketed_history {
+	use super::*;
+
+	/// Tracks the historical state of a distribution as a weighted average of how much time was spent
+	/// in each of 8 buckets.
+	#[derive(Clone, Copy)]
+	pub(super) struct HistoricalBucketRangeTracker {
+		buckets: [u16; 8],
+	}
+
+	impl HistoricalBucketRangeTracker {
+		pub(super) fn new() -> Self { Self { buckets: [0; 8] } }
+		pub(super) fn track_datapoint(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
+			// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
+			// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
+			//
+			// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
+			// the buckets for the current min and max liquidity offset positions.
+			//
+			// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
+			// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
+			// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
+			//
+			// In total, this allows us to track data for the last 8,000 or so payments across a given
+			// channel.
+			//
+			// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
+			// and need to balance having more bits in the decimal part (to ensure decay isn't too
+			// non-linear) with having too few bits in the mantissa, causing us to not store very many
+			// datapoints.
+			//
+			// The constants were picked experimentally, selecting a decay amount that restricts us
+			// from overflowing buckets without having to cap them manually.
+
+			// Ensure the bucket index is in the range [0, 7], even if the liquidity offset is zero or
+			// the channel's capacity, though the second should generally never happen.
+			debug_assert!(liquidity_offset_msat <= capacity_msat);
+			let bucket_idx: u8 = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1))
+				.try_into().unwrap_or(32); // 32 is bogus for 8 buckets, and will be ignored
+			debug_assert!(bucket_idx < 8);
+			if bucket_idx < 8 {
+				for e in self.buckets.iter_mut() {
+					*e = ((*e as u32) * 2047 / 2048) as u16;
+				}
+				self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
+			}
+		}
+		/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
+		/// datapoints as we receive newer information.
+		pub(super) fn time_decay_data(&mut self, half_lives: u32) {
+			for e in self.buckets.iter_mut() {
+				*e = e.checked_shr(half_lives).unwrap_or(0);
+			}
+		}
+	}
+
+	impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
+
+	pub(super) struct HistoricalMinMaxBuckets<'a> {
+		pub(super) min_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+		pub(super) max_liquidity_offset_history: &'a HistoricalBucketRangeTracker,
+	}
+
+	impl HistoricalMinMaxBuckets<'_> {
+		#[inline]
+		pub(super) fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
+		-> ([u16; 8], [u16; 8], u32) {
+			let required_decays = now.duration_since(last_updated).as_secs()
+				.checked_div(half_life.as_secs())
+				.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
+			let mut min_buckets = *self.min_liquidity_offset_history;
+			min_buckets.time_decay_data(required_decays);
+			let mut max_buckets = *self.max_liquidity_offset_history;
+			max_buckets.time_decay_data(required_decays);
+			(min_buckets.buckets, max_buckets.buckets, required_decays)
+		}
+
+		#[inline]
+		pub(super) fn calculate_success_probability_times_billion<T: Time>(
+			&self, now: T, last_updated: T, half_life: Duration, amount_msat: u64, capacity_msat: u64)
+		-> Option<u64> {
+			// If historical penalties are enabled, calculate the penalty by walking the set of
+			// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
+			// each, calculate the probability of success given our payment amount, then total the
+			// weighted average probability of success.
+			//
+			// We use a sliding scale to decide which point within a given bucket will be compared to
+			// the amount being sent - for lower-bounds, the amount being sent is compared to the lower
+			// edge of the first bucket (i.e. zero), but compared to the upper 7/8ths of the last
+			// bucket (i.e. 9 times the index, or 63), with each bucket in between increasing the
+			// comparison point by 1/64th. For upper-bounds, the same applies, however with an offset
+			// of 1/64th (i.e. starting at one and ending at 64). This avoids failing to assign
+			// penalties to channels at the edges.
+			//
+			// If we used the bottom edge of buckets, we'd end up never assigning any penalty at all to
+			// such a channel when sending less than ~0.19% of the channel's capacity (e.g. ~200k sats
+			// for a 1 BTC channel!).
+			//
+			// If we used the middle of each bucket we'd never assign any penalty at all when sending
+			// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
+			let mut total_valid_points_tracked = 0;
+
+			let payment_amt_64th_bucket: u8 = if amount_msat < u64::max_value() / 64 {
+				(amount_msat * 64 / capacity_msat.saturating_add(1))
+					.try_into().unwrap_or(65)
+			} else {
+				// Only use 128-bit arithmetic when multiplication will overflow to avoid 128-bit
+				// division. This branch should only be hit in fuzz testing since the amount would
+				// need to be over 2.88 million BTC in practice.
+				((amount_msat as u128) * 64 / (capacity_msat as u128).saturating_add(1))
+					.try_into().unwrap_or(65)
+			};
+			#[cfg(not(fuzzing))]
+			debug_assert!(payment_amt_64th_bucket <= 64);
+			if payment_amt_64th_bucket >= 64 { return None; }
+
+			// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
+			// don't actually use the decayed buckets, though, as that would lose precision.
+			let (decayed_min_buckets, decayed_max_buckets, required_decays) =
+				self.get_decayed_buckets(now, last_updated, half_life);
+			if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
+				return None;
+			}
+
+			for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+				for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
+					total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
+				}
+			}
+			// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
+			// it as if we were fully decayed.
+			if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
+				return None;
+			}
+
+			let mut cumulative_success_prob_times_billion = 0;
+			for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
+				for (max_idx, max_bucket) in self.max_liquidity_offset_history.buckets.iter().enumerate().take(8 - min_idx) {
+					let bucket_prob_times_million = (*min_bucket as u64) * (*max_bucket as u64)
+						* 1024 * 1024 / total_valid_points_tracked;
+					let min_64th_bucket = min_idx as u8 * 9;
+					let max_64th_bucket = (7 - max_idx as u8) * 9 + 1;
+					if payment_amt_64th_bucket > max_64th_bucket {
+						// Success probability 0, the payment amount is above the max liquidity
+					} else if payment_amt_64th_bucket <= min_64th_bucket {
+						cumulative_success_prob_times_billion += bucket_prob_times_million * 1024;
+					} else {
+						cumulative_success_prob_times_billion += bucket_prob_times_million *
							((max_64th_bucket - payment_amt_64th_bucket) as u64) * 1024 /
							((max_64th_bucket - min_64th_bucket) as u64);
+					}
+				}
+			}
+
+			Some(cumulative_success_prob_times_billion)
+		}
+	}
+}
+use bucketed_history::{HistoricalBucketRangeTracker, HistoricalMinMaxBuckets};
+
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> Writeable for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {
 	#[inline]
 	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
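
The arithmetic track_datapoint performs is easier to see in isolation than inside the diff. Below is a minimal standalone sketch (not part of the commit) of the leaky-bucket update the comments above describe; the struct and function names are illustrative only, while the constants (8 buckets, 2047/2048 decay, 32 as 1.0 in fixed point) mirror the diff.

// Illustrative sketch of the leaky-bucket update, with assumed names.
struct Buckets([u16; 8]);

impl Buckets {
	fn track(&mut self, liquidity_offset_msat: u64, capacity_msat: u64) {
		// Map the offset onto one of 8 equal-width buckets; the saturating_add(1) keeps the
		// index at 7 even when the offset equals the full capacity.
		let idx = (liquidity_offset_msat * 8 / capacity_msat.saturating_add(1)) as usize;
		debug_assert!(idx < 8);
		for e in self.0.iter_mut() {
			*e = ((*e as u32) * 2047 / 2048) as u16; // decay every bucket slightly
		}
		self.0[idx] = self.0[idx].saturating_add(32); // then credit the current bucket with 1.0
	}
}

fn main() {
	let mut b = Buckets([0; 8]);
	// A 1 BTC channel with ~30% of its liquidity unavailable: the offset lands in bucket 2.
	for _ in 0..100 {
		b.track(30_000_000_000, 100_000_000_000);
	}
	assert!(b.0[2] > 0);
	assert!(b.0.iter().enumerate().all(|(i, v)| i == 2 || *v == 0));
	println!("{:?}", b.0);
}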
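Likewise, the sliding-scale comparison points used by calculate_success_probability_times_billion can be sketched on their own. This is an illustrative reimplementation of the per-pair probability only: it uses f64 for readability rather than the commit's integer fixed point, and the real function additionally weights each (min, max) pair by its bucket counts. The names below are assumptions, not the commit's code.

// Per-pair success probability for a payment expressed in 64ths of capacity: the lower bound
// of pair (min_idx, max_idx) is sampled at min_idx * 9 (0..=63) and the upper bound at
// (7 - max_idx) * 9 + 1 (1..=64), with linear interpolation for amounts in between.
fn pair_success_prob(min_idx: u8, max_idx: u8, payment_amt_64th: u8) -> f64 {
	debug_assert!(min_idx < 8 && max_idx < 8 && payment_amt_64th < 64);
	let min_64th = min_idx * 9;
	let max_64th = (7 - max_idx) * 9 + 1;
	if payment_amt_64th > max_64th {
		0.0 // the amount exceeds the maximum liquidity this pair represents
	} else if payment_amt_64th <= min_64th {
		1.0 // the amount is below the minimum liquidity this pair represents
	} else {
		(max_64th - payment_amt_64th) as f64 / (max_64th - min_64th) as f64
	}
}

fn main() {
	// Sending half the capacity (32/64ths) against a pair whose min-offset fell in bucket 1
	// and whose max-offset fell in bucket 2: (46 - 32) / (46 - 9), roughly 0.38.
	println!("{:.3}", pair_success_prob(1, 2, 32));
}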