From d54c93065acedba911d04ef8dcc48e199c452406 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 9 Oct 2023 01:15:18 +0000 Subject: [PATCH] Pipe `Duration`-based time information through scoring pipeline In the coming commits, the `T: Time` bound on `ProbabilisticScorer` will be removed. In order to enable that, we need to pass the current time (as a `Duration` since the unix epoch) through the score updating pipeline, allowing us to keep the `*last_updated_time` fields up-to-date as we go. --- lightning/src/routing/scoring.rs | 69 +++++++++++++++++++------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 72720245f..a74331df7 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -1064,7 +1064,10 @@ impl ChannelLiquidity { } } - fn decayed_offset(&self, offset: u64, decay_params: ProbabilisticScoringDecayParameters) -> u64 { + fn decayed_offset( + &self, offset: u64, duration_since_epoch: Duration, + decay_params: ProbabilisticScoringDecayParameters, + ) -> u64 { let half_life = decay_params.liquidity_offset_half_life.as_secs_f64(); if half_life != 0.0 { let elapsed_time = T::now().duration_since(self.last_updated).as_secs_f64(); @@ -1266,44 +1269,50 @@ impl, BRT: Deref, impl, BRT: DerefMut, T: Time, U: DerefMut> DirectedChannelLiquidity { /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`. - fn failed_at_channel(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger { + fn failed_at_channel( + &mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log + ) where Log::Target: Logger { let existing_max_msat = self.max_liquidity_msat(); if amount_msat < existing_max_msat { log_debug!(logger, "Setting max liquidity of {} from {} to {}", chan_descr, existing_max_msat, amount_msat); - self.set_max_liquidity_msat(amount_msat); + self.set_max_liquidity_msat(amount_msat, duration_since_epoch); } else { log_trace!(logger, "Max liquidity of {} is {} (already less than or equal to {})", chan_descr, existing_max_msat, amount_msat); } - self.update_history_buckets(0); + self.update_history_buckets(0, duration_since_epoch); } /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat` downstream. - fn failed_downstream(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger { + fn failed_downstream( + &mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log + ) where Log::Target: Logger { let existing_min_msat = self.min_liquidity_msat(); if amount_msat > existing_min_msat { log_debug!(logger, "Setting min liquidity of {} from {} to {}", existing_min_msat, chan_descr, amount_msat); - self.set_min_liquidity_msat(amount_msat); + self.set_min_liquidity_msat(amount_msat, duration_since_epoch); } else { log_trace!(logger, "Min liquidity of {} is {} (already greater than or equal to {})", chan_descr, existing_min_msat, amount_msat); } - self.update_history_buckets(0); + self.update_history_buckets(0, duration_since_epoch); } /// Adjusts the channel liquidity balance bounds when successfully routing `amount_msat`. - fn successful(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger { + fn successful(&mut self, + amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log + ) where Log::Target: Logger { let max_liquidity_msat = self.max_liquidity_msat().checked_sub(amount_msat).unwrap_or(0); log_debug!(logger, "Subtracting {} from max liquidity of {} (setting it to {})", amount_msat, chan_descr, max_liquidity_msat); - self.set_max_liquidity_msat(max_liquidity_msat); - self.update_history_buckets(amount_msat); + self.set_max_liquidity_msat(max_liquidity_msat, duration_since_epoch); + self.update_history_buckets(amount_msat, duration_since_epoch); } /// Updates the history buckets for this channel. Because the history buckets track what we now /// know about the channel's state *prior to our payment* (i.e. what we assume is "steady /// state"), we allow the caller to set an offset applied to our liquidity bounds which /// represents the amount of the successful payment we just made. - fn update_history_buckets(&mut self, bucket_offset_msat: u64) { + fn update_history_buckets(&mut self, bucket_offset_msat: u64, duration_since_epoch: Duration) { let half_lives = self.now.duration_since(*self.offset_history_last_updated).as_secs() .checked_div(self.decay_params.historical_no_updates_half_life.as_secs()) .map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value()); @@ -1322,7 +1331,7 @@ impl, BRT: DerefMut self.max_liquidity_msat() { 0 @@ -1333,7 +1342,7 @@ impl, BRT: DerefMut>, L: Deref, T: Time> ScoreLookUp for Prob } impl>, L: Deref, T: Time> ScoreUpdate for ProbabilisticScorerUsingTime where L::Target: Logger { - fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, _duration_since_epoch: Duration) { + fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through to SCID {} as having failed at {} msat", short_channel_id, amount_msat); let network_graph = self.network_graph.read_only(); @@ -1419,13 +1428,15 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) .as_directed_mut(source, &target, capacity_msat, self.decay_params) - .failed_at_channel(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); + .failed_at_channel(amount_msat, duration_since_epoch, + format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } else { self.channel_liquidities .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) .as_directed_mut(source, &target, capacity_msat, self.decay_params) - .failed_downstream(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); + .failed_downstream(amount_msat, duration_since_epoch, + format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } } else { log_debug!(self.logger, "Not able to penalize channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).", @@ -1435,7 +1446,7 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob } } - fn payment_path_successful(&mut self, path: &Path, _duration_since_epoch: Duration) { + fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through SCID {} as having succeeded at {} msat.", path.hops.split_last().map(|(hop, _)| hop.short_channel_id).unwrap_or(0), amount_msat); @@ -1453,7 +1464,8 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob .entry(hop.short_channel_id) .or_insert_with(ChannelLiquidity::new) .as_directed_mut(source, &target, capacity_msat, self.decay_params) - .successful(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); + .successful(amount_msat, duration_since_epoch, + format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger); } else { log_debug!(self.logger, "Not able to learn for channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).", hop.short_channel_id); @@ -1469,12 +1481,15 @@ impl>, L: Deref, T: Time> ScoreUpdate for Prob self.payment_path_failed(path, u64::max_value(), duration_since_epoch) } - fn time_passed(&mut self, _duration_since_epoch: Duration) { + fn time_passed(&mut self, duration_since_epoch: Duration) { let decay_params = self.decay_params; self.channel_liquidities.retain(|_scid, liquidity| { - liquidity.min_liquidity_offset_msat = liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, decay_params); - liquidity.max_liquidity_offset_msat = liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, decay_params); + liquidity.min_liquidity_offset_msat = + liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params); + liquidity.max_liquidity_offset_msat = + liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params); liquidity.last_updated = T::now(); + let elapsed_time = T::now().duration_since(liquidity.offset_history_last_updated); if elapsed_time > decay_params.historical_no_updates_half_life { @@ -2408,7 +2423,7 @@ mod tests { scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&source, &target, 1_000, decay_params) - .set_min_liquidity_msat(200); + .set_min_liquidity_msat(200, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2434,7 +2449,7 @@ mod tests { scorer.channel_liquidities.get_mut(&43).unwrap() .as_directed_mut(&target, &recipient, 1_000, decay_params) - .set_max_liquidity_msat(200); + .set_max_liquidity_msat(200, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&43).unwrap() .as_directed(&target, &recipient, 1_000, decay_params); @@ -2480,7 +2495,7 @@ mod tests { // Reset from source to target. scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&source, &target, 1_000, decay_params) - .set_min_liquidity_msat(900); + .set_min_liquidity_msat(900, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2495,7 +2510,7 @@ mod tests { // Reset from target to source. scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&target, &source, 1_000, decay_params) - .set_min_liquidity_msat(400); + .set_min_liquidity_msat(400, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2541,7 +2556,7 @@ mod tests { // Reset from source to target. scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&source, &target, 1_000, decay_params) - .set_max_liquidity_msat(300); + .set_max_liquidity_msat(300, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); @@ -2556,7 +2571,7 @@ mod tests { // Reset from target to source. scorer.channel_liquidities.get_mut(&42).unwrap() .as_directed_mut(&target, &source, 1_000, decay_params) - .set_max_liquidity_msat(600); + .set_max_liquidity_msat(600, Duration::ZERO); let liquidity = scorer.channel_liquidities.get(&42).unwrap() .as_directed(&source, &target, 1_000, decay_params); -- 2.39.5