From 35b49645c4eb0d61c162a502723cd4e44ef0ca1e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 9 Oct 2023 02:14:21 +0000 Subject: [PATCH] Stop decaying historical liquidity information during scoring Because scoring is an incredibly performance-sensitive operation, doing liquidity information decay (and especially fetching the current time!) during scoring isn't really a great idea. Now that we decay liquidity information in the background, we don't have any reason to decay during scoring, and we remove the historical bucket liquidity decaying here. --- lightning/src/routing/scoring.rs | 136 ++++++++++++++----------------- 1 file changed, 59 insertions(+), 77 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 21dee09be..4391da8e9 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -850,8 +850,6 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU /// Note that this writes roughly one line per channel for which we have a liquidity estimate, /// which may be a substantial amount of log output. pub fn debug_log_liquidity_stats(&self) { - let now = T::now(); - let graph = self.network_graph.read_only(); for (scid, liq) in self.channel_liquidities.iter() { if let Some(chan_debug) = graph.channels().get(scid) { @@ -860,10 +858,8 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let amt = directed_info.effective_capacity().as_msat(); let dir_liq = liq.as_directed(source, target, amt, self.decay_params); - let (min_buckets, max_buckets) = dir_liq.liquidity_history - .get_decayed_buckets(now, *dir_liq.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life) - .unwrap_or(([0; 32], [0; 32])); + let min_buckets = &dir_liq.liquidity_history.min_liquidity_offset_history.buckets; + let max_buckets = &dir_liq.liquidity_history.max_liquidity_offset_history.buckets; log_debug!(self.logger, core::concat!( "Liquidity from {} to {} via {} is in the range ({}, {}).\n", @@ -942,7 +938,7 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU /// in the top and bottom bucket, and roughly with similar (recent) frequency. /// /// Because the datapoints are decayed slowly over time, values will eventually return to - /// `Some(([1; 32], [1; 32]))` and then to `None` once no datapoints remain. + /// `Some(([0; 32], [0; 32]))` or `None` if no data remains for a channel. /// /// In order to fetch a single success probability from the buckets provided here, as used in /// the scoring model, see [`Self::historical_estimated_payment_success_probability`]. @@ -956,11 +952,8 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let amt = directed_info.effective_capacity().as_msat(); let dir_liq = liq.as_directed(source, target, amt, self.decay_params); - let (min_buckets, mut max_buckets) = - dir_liq.liquidity_history.get_decayed_buckets( - dir_liq.now, *dir_liq.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life - )?; + let min_buckets = dir_liq.liquidity_history.min_liquidity_offset_history.buckets; + let mut max_buckets = dir_liq.liquidity_history.max_liquidity_offset_history.buckets; // Note that the liquidity buckets are an offset from the edge, so we inverse // the max order to get the probabilities from zero. @@ -991,9 +984,7 @@ impl>, L: Deref, T: Time> ProbabilisticScorerU let dir_liq = liq.as_directed(source, target, capacity_msat, self.decay_params); return dir_liq.liquidity_history.calculate_success_probability_times_billion( - dir_liq.now, *dir_liq.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life, ¶ms, amount_msat, - capacity_msat + ¶ms, amount_msat, capacity_msat ).map(|p| p as f64 / (1024 * 1024 * 1024) as f64); } } @@ -1214,9 +1205,7 @@ impl, BRT: Deref, score_params.historical_liquidity_penalty_amount_multiplier_msat != 0 { if let Some(cumulative_success_prob_times_billion) = self.liquidity_history .calculate_success_probability_times_billion( - self.now, *self.offset_history_last_updated, - self.decay_params.historical_no_updates_half_life, score_params, amount_msat, - self.capacity_msat) + score_params, amount_msat, self.capacity_msat) { let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024); res = res.saturating_add(Self::combined_penalty_msat(amount_msat, @@ -2027,22 +2016,20 @@ mod bucketed_history { } impl> HistoricalMinMaxBuckets { - pub(super) fn get_decayed_buckets(&self, now: T, offset_history_last_updated: T, half_life: Duration) - -> Option<([u16; 32], [u16; 32])> { - let (_, required_decays) = self.get_total_valid_points(now, offset_history_last_updated, half_life)?; - - let mut min_buckets = *self.min_liquidity_offset_history; - min_buckets.time_decay_data(required_decays); - let mut max_buckets = *self.max_liquidity_offset_history; - max_buckets.time_decay_data(required_decays); - Some((min_buckets.buckets, max_buckets.buckets)) - } #[inline] - pub(super) fn get_total_valid_points(&self, now: T, offset_history_last_updated: T, half_life: Duration) - -> Option<(u64, u32)> { - let required_decays = now.duration_since(offset_history_last_updated).as_secs() - .checked_div(half_life.as_secs()) - .map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32); + pub(super) fn calculate_success_probability_times_billion( + &self, params: &ProbabilisticScoringFeeParameters, amount_msat: u64, + capacity_msat: u64 + ) -> Option { + // If historical penalties are enabled, we try to calculate a probability of success + // given our historical distribution of min- and max-liquidity bounds in a channel. + // To do so, we walk the set of historical liquidity bucket (min, max) combinations + // (where min_idx < max_idx, as having a minimum above our maximum is an invalid + // state). For each pair, we calculate the probability as if the bucket's corresponding + // min- and max- liquidity bounds were our current liquidity bounds and then multiply + // that probability by the weight of the selected buckets. + let payment_pos = amount_to_pos(amount_msat, capacity_msat); + if payment_pos >= POSITION_TICKS { return None; } let mut total_valid_points_tracked = 0; for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() { @@ -2054,33 +2041,10 @@ mod bucketed_history { // If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), // treat it as if we were fully decayed. const FULLY_DECAYED: u16 = BUCKET_FIXED_POINT_ONE * BUCKET_FIXED_POINT_ONE; - if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < FULLY_DECAYED.into() { + if total_valid_points_tracked < FULLY_DECAYED.into() { return None; } - Some((total_valid_points_tracked, required_decays)) - } - - #[inline] - pub(super) fn calculate_success_probability_times_billion( - &self, now: T, offset_history_last_updated: T, half_life: Duration, - params: &ProbabilisticScoringFeeParameters, amount_msat: u64, capacity_msat: u64 - ) -> Option { - // If historical penalties are enabled, we try to calculate a probability of success - // given our historical distribution of min- and max-liquidity bounds in a channel. - // To do so, we walk the set of historical liquidity bucket (min, max) combinations - // (where min_idx < max_idx, as having a minimum above our maximum is an invalid - // state). For each pair, we calculate the probability as if the bucket's corresponding - // min- and max- liquidity bounds were our current liquidity bounds and then multiply - // that probability by the weight of the selected buckets. - let payment_pos = amount_to_pos(amount_msat, capacity_msat); - if payment_pos >= POSITION_TICKS { return None; } - - // Check if all our buckets are zero, once decayed and treat it as if we had no data. We - // don't actually use the decayed buckets, though, as that would lose precision. - let (total_valid_points_tracked, _) - = self.get_total_valid_points(now, offset_history_last_updated, half_life)?; - let mut cumulative_success_prob_times_billion = 0; // Special-case the 0th min bucket - it generally means we failed a payment, so only // consider the highest (i.e. largest-offset-from-max-capacity) max bucket for all @@ -3012,19 +2976,9 @@ mod tests { let usage = ChannelUsage { amount_msat: 896, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); - // No decay - SinceEpoch::advance(Duration::from_secs(4)); - let usage = ChannelUsage { amount_msat: 128, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); - let usage = ChannelUsage { amount_msat: 256, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 93); - let usage = ChannelUsage { amount_msat: 768, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 1_479); - let usage = ChannelUsage { amount_msat: 896, ..usage }; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); - // Half decay (i.e., three-quarter life) - SinceEpoch::advance(Duration::from_secs(1)); + SinceEpoch::advance(Duration::from_secs(5)); + scorer.time_passed(Duration::from_secs(5)); let usage = ChannelUsage { amount_msat: 128, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 22); let usage = ChannelUsage { amount_msat: 256, ..usage }; @@ -3036,6 +2990,7 @@ mod tests { // One decay (i.e., half life) SinceEpoch::advance(Duration::from_secs(5)); + scorer.time_passed(Duration::from_secs(10)); let usage = ChannelUsage { amount_msat: 64, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 128, ..usage }; @@ -3047,6 +3002,7 @@ mod tests { // Fully decay liquidity lower bound. SinceEpoch::advance(Duration::from_secs(10 * 7)); + scorer.time_passed(Duration::from_secs(10 * 8)); let usage = ChannelUsage { amount_msat: 0, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 1, ..usage }; @@ -3058,12 +3014,14 @@ mod tests { // Fully decay liquidity upper bound. SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10 * 9)); let usage = ChannelUsage { amount_msat: 0, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 1_024, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10 * 10)); let usage = ChannelUsage { amount_msat: 0, ..usage }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 0); let usage = ChannelUsage { amount_msat: 1_024, ..usage }; @@ -3103,9 +3061,11 @@ mod tests { // An unchecked right shift 64 bits or more in DirectedChannelLiquidity::decayed_offset_msat // would cause an overflow. SinceEpoch::advance(Duration::from_secs(10 * 64)); + scorer.time_passed(Duration::from_secs(10 * 64)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 125); SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10 * 65)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 125); } @@ -3144,6 +3104,7 @@ mod tests { // Decaying knowledge gives less confidence (128, 896), meaning a higher penalty. SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 291); // Reducing the upper bound gives more confidence (128, 832) that the payment amount (512) @@ -3158,6 +3119,7 @@ mod tests { // Further decaying affects the lower bound more than the upper bound (128, 928). SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(20)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 280); } @@ -3192,6 +3154,7 @@ mod tests { assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 473); scorer.payment_path_failed(&payment_path_for_amount(250), 43, Duration::from_secs(10)); @@ -3206,8 +3169,7 @@ mod tests { assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); } - #[test] - fn decays_persisted_liquidity_bounds() { + fn do_decays_persisted_liquidity_bounds(decay_before_reload: bool) { let logger = TestLogger::new(); let network_graph = network_graph(&logger); let params = ProbabilisticScoringFeeParameters { @@ -3236,23 +3198,38 @@ mod tests { }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), u64::max_value()); + if decay_before_reload { + SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); + } + let mut serialized_scorer = Vec::new(); scorer.write(&mut serialized_scorer).unwrap(); - SinceEpoch::advance(Duration::from_secs(10)); - let mut serialized_scorer = io::Cursor::new(&serialized_scorer); - let deserialized_scorer = + let mut deserialized_scorer = ::read(&mut serialized_scorer, (decay_params, &network_graph, &logger)).unwrap(); + if !decay_before_reload { + SinceEpoch::advance(Duration::from_secs(10)); + scorer.time_passed(Duration::from_secs(10)); + deserialized_scorer.time_passed(Duration::from_secs(10)); + } assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 473); scorer.payment_path_failed(&payment_path_for_amount(250), 43, Duration::from_secs(10)); assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 300); SinceEpoch::advance(Duration::from_secs(10)); + deserialized_scorer.time_passed(Duration::from_secs(20)); assert_eq!(deserialized_scorer.channel_penalty_msat(&candidate, usage, ¶ms), 370); } + #[test] + fn decays_persisted_liquidity_bounds() { + do_decays_persisted_liquidity_bounds(false); + do_decays_persisted_liquidity_bounds(true); + } + #[test] fn scores_realistic_payments() { // Shows the scores of "realistic" sends of 100k sats over channels of 1-10m sats (with a @@ -3577,6 +3554,7 @@ mod tests { // Advance the time forward 16 half-lives (which the docs claim will ensure all data is // gone), and check that we're back to where we started. SinceEpoch::advance(Duration::from_secs(10 * 16)); + scorer.time_passed(Duration::from_secs(10 * 16)); { let network_graph = network_graph.read_only(); let channel = network_graph.channel(42).unwrap(); @@ -3591,7 +3569,7 @@ mod tests { // Once fully decayed we still have data, but its all-0s. In the future we may remove the // data entirely instead. assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target), - None); + Some(([0; 32], [0; 32]))); assert_eq!(scorer.historical_estimated_payment_success_probability(42, &target, 1, ¶ms), None); let mut usage = ChannelUsage { @@ -3610,8 +3588,6 @@ mod tests { }; assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 2050); - usage.inflight_htlc_msat = 0; - assert_eq!(scorer.channel_penalty_msat(&candidate, usage, ¶ms), 866); let usage = ChannelUsage { amount_msat: 1, @@ -3623,6 +3599,12 @@ mod tests { // Advance to decay all liquidity offsets to zero. SinceEpoch::advance(Duration::from_secs(60 * 60 * 10)); + scorer.time_passed(Duration::from_secs(10 * (16 + 60 * 60))); + + // Once even the bounds have decayed information about the channel should be removed + // entirely. + assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target), + None); // Use a path in the opposite direction, which have zero for htlc_maximum_msat. This will // ensure that the effective capacity is zero to test division-by-zero edge cases. -- 2.39.5