Use `Duration` based time info in scoring rather than `Time`
authorMatt Corallo <git@bluematt.me>
Mon, 9 Oct 2023 01:44:33 +0000 (01:44 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 13 Dec 2023 23:26:09 +0000 (23:26 +0000)
In the coming commits, the `T: Time` bound on `ProbabilisticScorer`
will be removed. In order to enable that, we need to switch over to
using the `ScoreUpdate`-provided current time (as a `Duration`
since the unix epoch), making the `T` bound entirely unused.

lightning/src/routing/scoring.rs

index a74331df7e1e804295bcb67b624b47e398595978..2c29fd3f1c02cc588530ed4c307cb7360574bce6 100644 (file)
@@ -493,7 +493,8 @@ where L::Target: Logger {
        decay_params: ProbabilisticScoringDecayParameters,
        network_graph: G,
        logger: L,
-       channel_liquidities: HashMap<u64, ChannelLiquidity<T>>,
+       channel_liquidities: HashMap<u64, ChannelLiquidity>,
+       _unused_time: core::marker::PhantomData<T>,
 }
 
 /// Parameters for configuring [`ProbabilisticScorer`].
@@ -797,7 +798,7 @@ impl ProbabilisticScoringDecayParameters {
 /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
 /// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity
 /// offset fields gives the opposite direction.
-struct ChannelLiquidity<T: Time> {
+struct ChannelLiquidity {
        /// Lower channel liquidity bound in terms of an offset from zero.
        min_liquidity_offset_msat: u64,
 
@@ -807,23 +808,22 @@ struct ChannelLiquidity<T: Time> {
        min_liquidity_offset_history: HistoricalBucketRangeTracker,
        max_liquidity_offset_history: HistoricalBucketRangeTracker,
 
-       /// Time when the liquidity bounds were last modified.
-       last_updated: T,
+       /// Time when the liquidity bounds were last modified as an offset since the unix epoch.
+       last_updated: Duration,
 
-       /// Time when the historical liquidity bounds were last modified.
-       offset_history_last_updated: T,
+       /// Time when the historical liquidity bounds were last modified as an offset against the unix
+       /// epoch.
+       offset_history_last_updated: Duration,
 }
 
-/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and
-/// decayed with a given half life.
-struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> {
+/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity.
+struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Deref<Target = Duration>> {
        min_liquidity_offset_msat: L,
        max_liquidity_offset_msat: L,
        liquidity_history: HistoricalMinMaxBuckets<BRT>,
        capacity_msat: u64,
-       last_updated: U,
-       offset_history_last_updated: U,
-       now: T,
+       last_updated: T,
+       offset_history_last_updated: T,
        decay_params: ProbabilisticScoringDecayParameters,
 }
 
@@ -836,11 +836,12 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
                        network_graph,
                        logger,
                        channel_liquidities: HashMap::new(),
+                       _unused_time: core::marker::PhantomData,
                }
        }
 
        #[cfg(test)]
-       fn with_channel(mut self, short_channel_id: u64, liquidity: ChannelLiquidity<T>) -> Self {
+       fn with_channel(mut self, short_channel_id: u64, liquidity: ChannelLiquidity) -> Self {
                assert!(self.channel_liquidities.insert(short_channel_id, liquidity).is_none());
                self
        }
@@ -993,16 +994,15 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
        }
 }
 
-impl<T: Time> ChannelLiquidity<T> {
-       #[inline]
-       fn new() -> Self {
+impl ChannelLiquidity {
+       fn new(last_updated: Duration) -> Self {
                Self {
                        min_liquidity_offset_msat: 0,
                        max_liquidity_offset_msat: 0,
                        min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
                        max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
-                       last_updated: T::now(),
-                       offset_history_last_updated: T::now(),
+                       last_updated,
+                       offset_history_last_updated: last_updated,
                }
        }
 
@@ -1010,7 +1010,7 @@ impl<T: Time> ChannelLiquidity<T> {
        /// `capacity_msat`.
        fn as_directed(
                &self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters
-       ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> {
+       ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, &Duration> {
                let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
                        if source < target {
                                (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat,
@@ -1030,7 +1030,6 @@ impl<T: Time> ChannelLiquidity<T> {
                        capacity_msat,
                        last_updated: &self.last_updated,
                        offset_history_last_updated: &self.offset_history_last_updated,
-                       now: T::now(),
                        decay_params: decay_params,
                }
        }
@@ -1039,7 +1038,7 @@ impl<T: Time> ChannelLiquidity<T> {
        /// `capacity_msat`.
        fn as_directed_mut(
                &mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters
-       ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> {
+       ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, &mut Duration> {
                let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
                        if source < target {
                                (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat,
@@ -1059,7 +1058,6 @@ impl<T: Time> ChannelLiquidity<T> {
                        capacity_msat,
                        last_updated: &mut self.last_updated,
                        offset_history_last_updated: &mut self.offset_history_last_updated,
-                       now: T::now(),
                        decay_params: decay_params,
                }
        }
@@ -1070,7 +1068,7 @@ impl<T: Time> ChannelLiquidity<T> {
        ) -> u64 {
                let half_life = decay_params.liquidity_offset_half_life.as_secs_f64();
                if half_life != 0.0 {
-                       let elapsed_time = T::now().duration_since(self.last_updated).as_secs_f64();
+                       let elapsed_time = duration_since_epoch.saturating_sub(self.last_updated).as_secs_f64();
                        ((offset as f64) * powf64(0.5, elapsed_time / half_life)) as u64
                } else {
                        0
@@ -1159,7 +1157,8 @@ fn success_probability(
        (numerator, denominator)
 }
 
-impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity< L, BRT, T, U> {
+impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Deref<Target = Duration>>
+DirectedChannelLiquidity< L, BRT, T> {
        /// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in
        /// this direction.
        fn penalty_msat(&self, amount_msat: u64, score_params: &ProbabilisticScoringFeeParameters) -> u64 {
@@ -1267,7 +1266,8 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
        }
 }
 
-impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
+impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: DerefMut<Target = Duration>>
+DirectedChannelLiquidity<L, BRT, T> {
        /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
        fn failed_at_channel<Log: Deref>(
                &mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log
@@ -1313,7 +1313,9 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
        /// state"), we allow the caller to set an offset applied to our liquidity bounds which
        /// represents the amount of the successful payment we just made.
        fn update_history_buckets(&mut self, bucket_offset_msat: u64, duration_since_epoch: Duration) {
-               let half_lives = self.now.duration_since(*self.offset_history_last_updated).as_secs()
+               let half_lives =
+                       duration_since_epoch.checked_sub(*self.offset_history_last_updated)
+                       .unwrap_or(Duration::ZERO).as_secs()
                        .checked_div(self.decay_params.historical_no_updates_half_life.as_secs())
                        .map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
                self.liquidity_history.min_liquidity_offset_history.time_decay_data(half_lives);
@@ -1327,29 +1329,25 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
                self.liquidity_history.max_liquidity_offset_history.track_datapoint(
                        max_liquidity_offset_msat.saturating_sub(bucket_offset_msat), self.capacity_msat
                );
-               *self.offset_history_last_updated = self.now;
+               *self.offset_history_last_updated = duration_since_epoch;
        }
 
        /// Adjusts the lower bound of the channel liquidity balance in this direction.
        fn set_min_liquidity_msat(&mut self, amount_msat: u64, duration_since_epoch: Duration) {
                *self.min_liquidity_offset_msat = amount_msat;
-               *self.max_liquidity_offset_msat = if amount_msat > self.max_liquidity_msat() {
-                       0
-               } else {
-                       self.decayed_offset_msat(*self.max_liquidity_offset_msat)
-               };
-               *self.last_updated = self.now;
+               if amount_msat > self.max_liquidity_msat() {
+                       *self.max_liquidity_offset_msat = 0;
+               }
+               *self.last_updated = duration_since_epoch;
        }
 
        /// Adjusts the upper bound of the channel liquidity balance in this direction.
        fn set_max_liquidity_msat(&mut self, amount_msat: u64, duration_since_epoch: Duration) {
                *self.max_liquidity_offset_msat = self.capacity_msat.checked_sub(amount_msat).unwrap_or(0);
-               *self.min_liquidity_offset_msat = if amount_msat < self.min_liquidity_msat() {
-                       0
-               } else {
-                       self.decayed_offset_msat(*self.min_liquidity_offset_msat)
-               };
-               *self.last_updated = self.now;
+               if amount_msat < *self.min_liquidity_offset_msat {
+                       *self.min_liquidity_offset_msat = 0;
+               }
+               *self.last_updated = duration_since_epoch;
        }
 }
 
@@ -1396,7 +1394,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreLookUp for Prob
                let capacity_msat = usage.effective_capacity.as_msat();
                self.channel_liquidities
                        .get(&scid)
-                       .unwrap_or(&ChannelLiquidity::new())
+                       .unwrap_or(&ChannelLiquidity::new(Duration::ZERO))
                        .as_directed(&source, &target, capacity_msat, self.decay_params)
                        .penalty_msat(amount_msat, score_params)
                        .saturating_add(anti_probing_penalty_msat)
@@ -1426,14 +1424,14 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                                if at_failed_channel {
                                        self.channel_liquidities
                                                .entry(hop.short_channel_id)
-                                               .or_insert_with(ChannelLiquidity::new)
+                                               .or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
                                                .as_directed_mut(source, &target, capacity_msat, self.decay_params)
                                                .failed_at_channel(amount_msat, duration_since_epoch,
                                                        format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
                                } else {
                                        self.channel_liquidities
                                                .entry(hop.short_channel_id)
-                                               .or_insert_with(ChannelLiquidity::new)
+                                               .or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
                                                .as_directed_mut(source, &target, capacity_msat, self.decay_params)
                                                .failed_downstream(amount_msat, duration_since_epoch,
                                                        format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
@@ -1462,7 +1460,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                                let capacity_msat = channel.effective_capacity().as_msat();
                                self.channel_liquidities
                                        .entry(hop.short_channel_id)
-                                       .or_insert_with(ChannelLiquidity::new)
+                                       .or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
                                        .as_directed_mut(source, &target, capacity_msat, self.decay_params)
                                        .successful(amount_msat, duration_since_epoch,
                                                format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
@@ -1488,10 +1486,10 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                                liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
                        liquidity.max_liquidity_offset_msat =
                                liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
-                       liquidity.last_updated = T::now();
+                       liquidity.last_updated = duration_since_epoch;
 
                        let elapsed_time =
-                               T::now().duration_since(liquidity.offset_history_last_updated);
+                               duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
                        if elapsed_time > decay_params.historical_no_updates_half_life {
                                let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
                                if half_life != 0.0 {
@@ -1502,7 +1500,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
                                        for bucket in liquidity.max_liquidity_offset_history.buckets.iter_mut() {
                                                *bucket = ((*bucket as u64) * 1024 / divisor) as u16;
                                        }
-                                       liquidity.offset_history_last_updated = T::now();
+                                       liquidity.offset_history_last_updated = duration_since_epoch;
                                }
                        }
                        liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
@@ -2125,31 +2123,29 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore
                        network_graph,
                        logger,
                        channel_liquidities,
+                       _unused_time: core::marker::PhantomData,
                })
        }
 }
 
-impl<T: Time> Writeable for ChannelLiquidity<T> {
+impl Writeable for ChannelLiquidity {
        #[inline]
        fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
-               let offset_history_duration_since_epoch =
-                       T::duration_since_epoch() - self.offset_history_last_updated.elapsed();
-               let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed();
                write_tlv_fields!(w, {
                        (0, self.min_liquidity_offset_msat, required),
                        // 1 was the min_liquidity_offset_history in octile form
                        (2, self.max_liquidity_offset_msat, required),
                        // 3 was the max_liquidity_offset_history in octile form
-                       (4, duration_since_epoch, required),
+                       (4, self.last_updated, required),
                        (5, Some(self.min_liquidity_offset_history), option),
                        (7, Some(self.max_liquidity_offset_history), option),
-                       (9, offset_history_duration_since_epoch, required),
+                       (9, self.offset_history_last_updated, required),
                });
                Ok(())
        }
 }
 
-impl<T: Time> Readable for ChannelLiquidity<T> {
+impl Readable for ChannelLiquidity {
        #[inline]
        fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
                let mut min_liquidity_offset_msat = 0;
@@ -2158,36 +2154,18 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
                let mut legacy_max_liq_offset_history: Option<LegacyHistoricalBucketRangeTracker> = None;
                let mut min_liquidity_offset_history: Option<HistoricalBucketRangeTracker> = None;
                let mut max_liquidity_offset_history: Option<HistoricalBucketRangeTracker> = None;
-               let mut duration_since_epoch = Duration::from_secs(0);
-               let mut offset_history_duration_since_epoch = None;
+               let mut last_updated = Duration::from_secs(0);
+               let mut offset_history_last_updated = None;
                read_tlv_fields!(r, {
                        (0, min_liquidity_offset_msat, required),
                        (1, legacy_min_liq_offset_history, option),
                        (2, max_liquidity_offset_msat, required),
                        (3, legacy_max_liq_offset_history, option),
-                       (4, duration_since_epoch, required),
+                       (4, last_updated, required),
                        (5, min_liquidity_offset_history, option),
                        (7, max_liquidity_offset_history, option),
-                       (9, offset_history_duration_since_epoch, option),
+                       (9, offset_history_last_updated, option),
                });
-               // On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards.
-               // We write `last_updated` as wallclock time even though its ultimately an `Instant` (which
-               // is a time from a monotonic clock usually represented as an offset against boot time).
-               // Thus, we have to construct an `Instant` by subtracting the difference in wallclock time
-               // from the one that was written. However, because `Instant` can panic if we construct one
-               // in the future, we must handle wallclock time jumping backwards, which we do by simply
-               // using `Instant::now()` in that case.
-               let wall_clock_now = T::duration_since_epoch();
-               let now = T::now();
-               let last_updated = if wall_clock_now > duration_since_epoch {
-                       now - (wall_clock_now - duration_since_epoch)
-               } else { now };
-
-               let offset_history_duration_since_epoch =
-                       offset_history_duration_since_epoch.unwrap_or(duration_since_epoch);
-               let offset_history_last_updated = if wall_clock_now > offset_history_duration_since_epoch {
-                       now - (wall_clock_now - offset_history_duration_since_epoch)
-               } else { now };
 
                if min_liquidity_offset_history.is_none() {
                        if let Some(legacy_buckets) = legacy_min_liq_offset_history {
@@ -2209,7 +2187,7 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
                        min_liquidity_offset_history: min_liquidity_offset_history.unwrap(),
                        max_liquidity_offset_history: max_liquidity_offset_history.unwrap(),
                        last_updated,
-                       offset_history_last_updated,
+                       offset_history_last_updated: offset_history_last_updated.unwrap_or(last_updated),
                })
        }
 }
@@ -2219,7 +2197,6 @@ mod tests {
        use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorerUsingTime};
        use crate::blinded_path::{BlindedHop, BlindedPath};
        use crate::util::config::UserConfig;
-       use crate::util::time::Time;
        use crate::util::time::tests::SinceEpoch;
 
        use crate::ln::channelmanager;
@@ -2384,8 +2361,8 @@ mod tests {
        #[test]
        fn liquidity_bounds_directed_from_lowest_node_id() {
                let logger = TestLogger::new();
-               let last_updated = SinceEpoch::now();
-               let offset_history_last_updated = SinceEpoch::now();
+               let last_updated = Duration::ZERO;
+               let offset_history_last_updated = Duration::ZERO;
                let network_graph = network_graph(&logger);
                let decay_params = ProbabilisticScoringDecayParameters::default();
                let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger)
@@ -2465,8 +2442,8 @@ mod tests {
        #[test]
        fn resets_liquidity_upper_bound_when_crossed_by_lower_bound() {
                let logger = TestLogger::new();
-               let last_updated = SinceEpoch::now();
-               let offset_history_last_updated = SinceEpoch::now();
+               let last_updated = Duration::ZERO;
+               let offset_history_last_updated = Duration::ZERO;
                let network_graph = network_graph(&logger);
                let decay_params = ProbabilisticScoringDecayParameters::default();
                let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger)
@@ -2526,8 +2503,8 @@ mod tests {
        #[test]
        fn resets_liquidity_lower_bound_when_crossed_by_upper_bound() {
                let logger = TestLogger::new();
-               let last_updated = SinceEpoch::now();
-               let offset_history_last_updated = SinceEpoch::now();
+               let last_updated = Duration::ZERO;
+               let offset_history_last_updated = Duration::ZERO;
                let network_graph = network_graph(&logger);
                let decay_params = ProbabilisticScoringDecayParameters::default();
                let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger)
@@ -2639,8 +2616,8 @@ mod tests {
        #[test]
        fn constant_penalty_outside_liquidity_bounds() {
                let logger = TestLogger::new();
-               let last_updated = SinceEpoch::now();
-               let offset_history_last_updated = SinceEpoch::now();
+               let last_updated = Duration::ZERO;
+               let offset_history_last_updated = Duration::ZERO;
                let network_graph = network_graph(&logger);
                let params = ProbabilisticScoringFeeParameters {
                        liquidity_penalty_multiplier_msat: 1_000,