Pipe `Duration`-based time information through scoring pipeline
authorMatt Corallo <git@bluematt.me>
Mon, 9 Oct 2023 01:15:18 +0000 (01:15 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 13 Dec 2023 23:26:09 +0000 (23:26 +0000)
In the coming commits, the `T: Time` bound on `ProbabilisticScorer`
will be removed. In order to enable that, we need to pass the
current time (as a `Duration` since the unix epoch) through the
score updating pipeline, allowing us to keep the
`*last_updated_time` fields up-to-date as we go.

lightning/src/routing/scoring.rs

index 72720245f5f1eddaa5f0b8b83427c5a3d61c51e1..a74331df7e1e804295bcb67b624b47e398595978 100644 (file)
@@ -1064,7 +1064,10 @@ impl<T: Time> ChannelLiquidity<T> {
                }
        }
 
-       fn decayed_offset(&self, offset: u64, decay_params: ProbabilisticScoringDecayParameters) -> u64 {
+       fn decayed_offset(
+               &self, offset: u64, duration_since_epoch: Duration,
+               decay_params: ProbabilisticScoringDecayParameters,
+       ) -> u64 {
                let half_life = decay_params.liquidity_offset_half_life.as_secs_f64();
                if half_life != 0.0 {
                        let elapsed_time = T::now().duration_since(self.last_updated).as_secs_f64();
@@ -1266,44 +1269,50 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
 
 impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
        /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
-       fn failed_at_channel<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
+       fn failed_at_channel<Log: Deref>(
+               &mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log
+       ) where Log::Target: Logger {
                let existing_max_msat = self.max_liquidity_msat();
                if amount_msat < existing_max_msat {
                        log_debug!(logger, "Setting max liquidity of {} from {} to {}", chan_descr, existing_max_msat, amount_msat);
-                       self.set_max_liquidity_msat(amount_msat);
+                       self.set_max_liquidity_msat(amount_msat, duration_since_epoch);
                } else {
                        log_trace!(logger, "Max liquidity of {} is {} (already less than or equal to {})",
                                chan_descr, existing_max_msat, amount_msat);
                }
-               self.update_history_buckets(0);
+               self.update_history_buckets(0, duration_since_epoch);
        }
 
        /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat` downstream.
-       fn failed_downstream<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
+       fn failed_downstream<Log: Deref>(
+               &mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log
+       ) where Log::Target: Logger {
                let existing_min_msat = self.min_liquidity_msat();
                if amount_msat > existing_min_msat {
                        log_debug!(logger, "Setting min liquidity of {} from {} to {}", existing_min_msat, chan_descr, amount_msat);
-                       self.set_min_liquidity_msat(amount_msat);
+                       self.set_min_liquidity_msat(amount_msat, duration_since_epoch);
                } else {
                        log_trace!(logger, "Min liquidity of {} is {} (already greater than or equal to {})",
                                chan_descr, existing_min_msat, amount_msat);
                }
-               self.update_history_buckets(0);
+               self.update_history_buckets(0, duration_since_epoch);
        }
 
        /// Adjusts the channel liquidity balance bounds when successfully routing `amount_msat`.
-       fn successful<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
+       fn successful<Log: Deref>(&mut self,
+               amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log
+       ) where Log::Target: Logger {
                let max_liquidity_msat = self.max_liquidity_msat().checked_sub(amount_msat).unwrap_or(0);
                log_debug!(logger, "Subtracting {} from max liquidity of {} (setting it to {})", amount_msat, chan_descr, max_liquidity_msat);
-               self.set_max_liquidity_msat(max_liquidity_msat);
-               self.update_history_buckets(amount_msat);
+               self.set_max_liquidity_msat(max_liquidity_msat, duration_since_epoch);
+               self.update_history_buckets(amount_msat, duration_since_epoch);
        }
 
        /// Updates the history buckets for this channel. Because the history buckets track what we now
        /// know about the channel's state *prior to our payment* (i.e. what we assume is "steady
        /// state"), we allow the caller to set an offset applied to our liquidity bounds which
        /// represents the amount of the successful payment we just made.
-       fn update_history_buckets(&mut self, bucket_offset_msat: u64) {
+       fn update_history_buckets(&mut self, bucket_offset_msat: u64, duration_since_epoch: Duration) {
                let half_lives = self.now.duration_since(*self.offset_history_last_updated).as_secs()
                        .checked_div(self.decay_params.historical_no_updates_half_life.as_secs())
                        .map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
@@ -1322,7 +1331,7 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
        }
 
        /// Adjusts the lower bound of the channel liquidity balance in this direction.
-       fn set_min_liquidity_msat(&mut self, amount_msat: u64) {
+       fn set_min_liquidity_msat(&mut self, amount_msat: u64, duration_since_epoch: Duration) {
                *self.min_liquidity_offset_msat = amount_msat;
                *self.max_liquidity_offset_msat = if amount_msat > self.max_liquidity_msat() {
                        0
@@ -1333,7 +1342,7 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
        }
 
        /// Adjusts the upper bound of the channel liquidity balance in this direction.
-       fn set_max_liquidity_msat(&mut self, amount_msat: u64) {
+       fn set_max_liquidity_msat(&mut self, amount_msat: u64, duration_since_epoch: Duration) {
                *self.max_liquidity_offset_msat = self.capacity_msat.checked_sub(amount_msat).unwrap_or(0);
                *self.min_liquidity_offset_msat = if amount_msat < self.min_liquidity_msat() {
                        0
@@ -1396,7 +1405,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreLookUp for Prob
 }
 
 impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for ProbabilisticScorerUsingTime<G, L, T> where L::Target: Logger {
-       fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, _duration_since_epoch: Duration) {
+       fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64, duration_since_epoch: Duration) {
                let amount_msat = path.final_value_msat();
                log_trace!(self.logger, "Scoring path through to SCID {} as having failed at {} msat", short_channel_id, amount_msat);
                let network_graph = self.network_graph.read_only();
@@ -1419,13 +1428,15 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                                                .entry(hop.short_channel_id)
                                                .or_insert_with(ChannelLiquidity::new)
                                                .as_directed_mut(source, &target, capacity_msat, self.decay_params)
-                                               .failed_at_channel(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
+                                               .failed_at_channel(amount_msat, duration_since_epoch,
+                                                       format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
                                } else {
                                        self.channel_liquidities
                                                .entry(hop.short_channel_id)
                                                .or_insert_with(ChannelLiquidity::new)
                                                .as_directed_mut(source, &target, capacity_msat, self.decay_params)
-                                               .failed_downstream(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
+                                               .failed_downstream(amount_msat, duration_since_epoch,
+                                                       format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
                                }
                        } else {
                                log_debug!(self.logger, "Not able to penalize channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).",
@@ -1435,7 +1446,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                }
        }
 
-       fn payment_path_successful(&mut self, path: &Path, _duration_since_epoch: Duration) {
+       fn payment_path_successful(&mut self, path: &Path, duration_since_epoch: Duration) {
                let amount_msat = path.final_value_msat();
                log_trace!(self.logger, "Scoring path through SCID {} as having succeeded at {} msat.",
                        path.hops.split_last().map(|(hop, _)| hop.short_channel_id).unwrap_or(0), amount_msat);
@@ -1453,7 +1464,8 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                                        .entry(hop.short_channel_id)
                                        .or_insert_with(ChannelLiquidity::new)
                                        .as_directed_mut(source, &target, capacity_msat, self.decay_params)
-                                       .successful(amount_msat, format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
+                                       .successful(amount_msat, duration_since_epoch,
+                                               format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
                        } else {
                                log_debug!(self.logger, "Not able to learn for channel with SCID {} as we do not have graph info for it (likely a route-hint last-hop).",
                                        hop.short_channel_id);
@@ -1469,12 +1481,15 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                self.payment_path_failed(path, u64::max_value(), duration_since_epoch)
        }
 
-       fn time_passed(&mut self, _duration_since_epoch: Duration) {
+       fn time_passed(&mut self, duration_since_epoch: Duration) {
                let decay_params = self.decay_params;
                self.channel_liquidities.retain(|_scid, liquidity| {
-                       liquidity.min_liquidity_offset_msat = liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, decay_params);
-                       liquidity.max_liquidity_offset_msat = liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, decay_params);
+                       liquidity.min_liquidity_offset_msat =
+                               liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
+                       liquidity.max_liquidity_offset_msat =
+                               liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
                        liquidity.last_updated = T::now();
+
                        let elapsed_time =
                                T::now().duration_since(liquidity.offset_history_last_updated);
                        if elapsed_time > decay_params.historical_no_updates_half_life {
@@ -2408,7 +2423,7 @@ mod tests {
 
                scorer.channel_liquidities.get_mut(&42).unwrap()
                        .as_directed_mut(&source, &target, 1_000, decay_params)
-                       .set_min_liquidity_msat(200);
+                       .set_min_liquidity_msat(200, Duration::ZERO);
 
                let liquidity = scorer.channel_liquidities.get(&42).unwrap()
                        .as_directed(&source, &target, 1_000, decay_params);
@@ -2434,7 +2449,7 @@ mod tests {
 
                scorer.channel_liquidities.get_mut(&43).unwrap()
                        .as_directed_mut(&target, &recipient, 1_000, decay_params)
-                       .set_max_liquidity_msat(200);
+                       .set_max_liquidity_msat(200, Duration::ZERO);
 
                let liquidity = scorer.channel_liquidities.get(&43).unwrap()
                        .as_directed(&target, &recipient, 1_000, decay_params);
@@ -2480,7 +2495,7 @@ mod tests {
                // Reset from source to target.
                scorer.channel_liquidities.get_mut(&42).unwrap()
                        .as_directed_mut(&source, &target, 1_000, decay_params)
-                       .set_min_liquidity_msat(900);
+                       .set_min_liquidity_msat(900, Duration::ZERO);
 
                let liquidity = scorer.channel_liquidities.get(&42).unwrap()
                        .as_directed(&source, &target, 1_000, decay_params);
@@ -2495,7 +2510,7 @@ mod tests {
                // Reset from target to source.
                scorer.channel_liquidities.get_mut(&42).unwrap()
                        .as_directed_mut(&target, &source, 1_000, decay_params)
-                       .set_min_liquidity_msat(400);
+                       .set_min_liquidity_msat(400, Duration::ZERO);
 
                let liquidity = scorer.channel_liquidities.get(&42).unwrap()
                        .as_directed(&source, &target, 1_000, decay_params);
@@ -2541,7 +2556,7 @@ mod tests {
                // Reset from source to target.
                scorer.channel_liquidities.get_mut(&42).unwrap()
                        .as_directed_mut(&source, &target, 1_000, decay_params)
-                       .set_max_liquidity_msat(300);
+                       .set_max_liquidity_msat(300, Duration::ZERO);
 
                let liquidity = scorer.channel_liquidities.get(&42).unwrap()
                        .as_directed(&source, &target, 1_000, decay_params);
@@ -2556,7 +2571,7 @@ mod tests {
                // Reset from target to source.
                scorer.channel_liquidities.get_mut(&42).unwrap()
                        .as_directed_mut(&target, &source, 1_000, decay_params)
-                       .set_max_liquidity_msat(600);
+                       .set_max_liquidity_msat(600, Duration::ZERO);
 
                let liquidity = scorer.channel_liquidities.get(&42).unwrap()
                        .as_directed(&source, &target, 1_000, decay_params);