Track history of where channel liquidities have been in the past
authorMatt Corallo <git@bluematt.me>
Tue, 19 Jul 2022 22:37:16 +0000 (22:37 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 6 Oct 2022 21:10:23 +0000 (21:10 +0000)
This introduces two new fields to the `ChannelLiquidity` struct -
`min_liquidity_offset_history` and `max_liquidity_offset_history`,
both an array of 8 `u16`s. Each entry represents the proportion of
time that we spent with the min or max liquidity offset in the
given 1/8th of the channel's liquidity range. ie the first bucket
in `min_liquidity_offset_history` represents the proportion of time
we've thought the channel's minimum liquidity is lower than 1/8th's
the channel's capacity.

Each bucket is stored, effectively, as a fixed-point number with
5 bits for the fractional part, which is incremented by one (ie 32)
each time we update our liquidity estimates and decide our
estimates are in that bucket. We then decay each bucket by
2047/2048.

Thus, memory of a payment sticks around for more than 8,000
data points, though the majority of that memory decays after 1,387
data points.

lightning/src/routing/scoring.rs
lightning/src/util/ser.rs

index 6ae339eae2bbc01776b840d655c2ef68ce6e83aa..2bae959f4fbe98e9e29de2cf3a510dc961f32ff0 100644 (file)
@@ -64,6 +64,7 @@ use util::time::Time;
 use prelude::*;
 use core::fmt;
 use core::cell::{RefCell, RefMut};
+use core::convert::TryInto;
 use core::ops::{Deref, DerefMut};
 use core::time::Duration;
 use io::{self, Read};
@@ -432,6 +433,48 @@ pub struct ProbabilisticScoringParameters {
        pub considered_impossible_penalty_msat: u64,
 }
 
+/// Tracks the historical state of a distribution as a weighted average of how much time was spent
+/// in each of 8 buckets.
+#[derive(Clone, Copy)]
+struct HistoricalBucketRangeTracker {
+       buckets: [u16; 8],
+}
+
+impl HistoricalBucketRangeTracker {
+       fn new() -> Self { Self { buckets: [0; 8] } }
+       fn track_datapoint(&mut self, bucket_idx: u8) {
+               // We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
+               // we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
+               //
+               // Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
+               // the buckets for the current min and max liquidity offset positions.
+               //
+               // We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
+               // non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
+               // 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
+               //
+               // In total, this allows us to track data for the last 8,000 or so payments across a given
+               // channel.
+               //
+               // These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
+               // and need to balance having more bits in the decimal part (to ensure decay isn't too
+               // non-linear) with having too few bits in the mantissa, causing us to not store very many
+               // datapoints.
+               //
+               // The constants were picked experimentally, selecting a decay amount that restricts us
+               // from overflowing buckets without having to cap them manually.
+               debug_assert!(bucket_idx < 8);
+               if bucket_idx < 8 {
+                       for e in self.buckets.iter_mut() {
+                               *e = ((*e as u32) * 2047 / 2048) as u16;
+                       }
+                       self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
+               }
+       }
+}
+
+impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
+
 /// Accounting for channel liquidity balance uncertainty.
 ///
 /// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
@@ -446,13 +489,18 @@ struct ChannelLiquidity<T: Time> {
 
        /// Time when the liquidity bounds were last modified.
        last_updated: T,
+
+       min_liquidity_offset_history: HistoricalBucketRangeTracker,
+       max_liquidity_offset_history: HistoricalBucketRangeTracker,
 }
 
 /// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and
 /// decayed with a given half life.
-struct DirectedChannelLiquidity<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> {
+struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> {
        min_liquidity_offset_msat: L,
        max_liquidity_offset_msat: L,
+       min_liquidity_offset_history: BRT,
+       max_liquidity_offset_history: BRT,
        capacity_msat: u64,
        last_updated: U,
        now: T,
@@ -593,6 +641,8 @@ impl<T: Time> ChannelLiquidity<T> {
                Self {
                        min_liquidity_offset_msat: 0,
                        max_liquidity_offset_msat: 0,
+                       min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+                       max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
                        last_updated: T::now(),
                }
        }
@@ -601,16 +651,21 @@ impl<T: Time> ChannelLiquidity<T> {
        /// `capacity_msat`.
        fn as_directed(
                &self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration
-       ) -> DirectedChannelLiquidity<&u64, T, &T> {
-               let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target {
-                       (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat)
-               } else {
-                       (&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat)
-               };
+       ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> {
+               let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
+                       if source < target {
+                               (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat,
+                                       &self.min_liquidity_offset_history, &self.max_liquidity_offset_history)
+                       } else {
+                               (&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat,
+                                       &self.max_liquidity_offset_history, &self.min_liquidity_offset_history)
+                       };
 
                DirectedChannelLiquidity {
                        min_liquidity_offset_msat,
                        max_liquidity_offset_msat,
+                       min_liquidity_offset_history,
+                       max_liquidity_offset_history,
                        capacity_msat,
                        last_updated: &self.last_updated,
                        now: T::now(),
@@ -622,16 +677,21 @@ impl<T: Time> ChannelLiquidity<T> {
        /// `capacity_msat`.
        fn as_directed_mut(
                &mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration
-       ) -> DirectedChannelLiquidity<&mut u64, T, &mut T> {
-               let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target {
-                       (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat)
-               } else {
-                       (&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat)
-               };
+       ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> {
+               let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
+                       if source < target {
+                               (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat,
+                                       &mut self.min_liquidity_offset_history, &mut self.max_liquidity_offset_history)
+                       } else {
+                               (&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat,
+                                       &mut self.max_liquidity_offset_history, &mut self.min_liquidity_offset_history)
+                       };
 
                DirectedChannelLiquidity {
                        min_liquidity_offset_msat,
                        max_liquidity_offset_msat,
+                       min_liquidity_offset_history,
+                       max_liquidity_offset_history,
                        capacity_msat,
                        last_updated: &mut self.last_updated,
                        now: T::now(),
@@ -652,7 +712,7 @@ const PRECISION_LOWER_BOUND_DENOMINATOR: u64 = approx::LOWER_BITS_BOUND;
 const AMOUNT_PENALTY_DIVISOR: u64 = 1 << 20;
 const BASE_AMOUNT_PENALTY_DIVISOR: u64 = 1 << 30;
 
-impl<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, T, U> {
+impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
        /// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in
        /// this direction.
        fn penalty_msat(&self, amount_msat: u64, params: &ProbabilisticScoringParameters) -> u64 {
@@ -722,7 +782,7 @@ impl<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> DirectedChannelLiqui
        }
 }
 
-impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, T, U> {
+impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
        /// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
        fn failed_at_channel<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
                if amount_msat < self.max_liquidity_msat() {
@@ -750,6 +810,21 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne
                self.set_max_liquidity_msat(max_liquidity_msat);
        }
 
+       fn update_history_buckets(&mut self) {
+               debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat);
+               self.min_liquidity_offset_history.track_datapoint(
+                       // Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
+                       // is zero or the channel's capacity, though the second should generally never happen.
+                       (self.min_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat)
+                       .try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored
+               debug_assert!(*self.max_liquidity_offset_msat <= self.capacity_msat);
+               self.max_liquidity_offset_history.track_datapoint(
+                       // Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
+                       // is zero or the channel's capacity, though the second should generally never happen.
+                       (self.max_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat)
+                       .try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored
+       }
+
        /// Adjusts the lower bound of the channel liquidity balance in this direction.
        fn set_min_liquidity_msat(&mut self, amount_msat: u64) {
                *self.min_liquidity_offset_msat = amount_msat;
@@ -759,6 +834,7 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne
                        self.decayed_offset_msat(*self.max_liquidity_offset_msat)
                };
                *self.last_updated = self.now;
+               self.update_history_buckets();
        }
 
        /// Adjusts the upper bound of the channel liquidity balance in this direction.
@@ -770,6 +846,7 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne
                        self.decayed_offset_msat(*self.min_liquidity_offset_msat)
                };
                *self.last_updated = self.now;
+               self.update_history_buckets();
        }
 }
 
@@ -1236,7 +1313,9 @@ impl<T: Time> Writeable for ChannelLiquidity<T> {
                let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed();
                write_tlv_fields!(w, {
                        (0, self.min_liquidity_offset_msat, required),
+                       (1, Some(self.min_liquidity_offset_history), option),
                        (2, self.max_liquidity_offset_msat, required),
+                       (3, Some(self.max_liquidity_offset_history), option),
                        (4, duration_since_epoch, required),
                });
                Ok(())
@@ -1248,10 +1327,14 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
        fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
                let mut min_liquidity_offset_msat = 0;
                let mut max_liquidity_offset_msat = 0;
+               let mut min_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
+               let mut max_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
                let mut duration_since_epoch = Duration::from_secs(0);
                read_tlv_fields!(r, {
                        (0, min_liquidity_offset_msat, required),
+                       (1, min_liquidity_offset_history, option),
                        (2, max_liquidity_offset_msat, required),
+                       (3, max_liquidity_offset_history, option),
                        (4, duration_since_epoch, required),
                });
                // On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards.
@@ -1269,6 +1352,8 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
                Ok(Self {
                        min_liquidity_offset_msat,
                        max_liquidity_offset_msat,
+                       min_liquidity_offset_history: min_liquidity_offset_history.unwrap(),
+                       max_liquidity_offset_history: max_liquidity_offset_history.unwrap(),
                        last_updated,
                })
        }
@@ -1276,7 +1361,7 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
 
 #[cfg(test)]
 mod tests {
-       use super::{ChannelLiquidity, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime};
+       use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime};
        use util::time::Time;
        use util::time::tests::SinceEpoch;
 
@@ -1459,11 +1544,15 @@ mod tests {
                let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
                        .with_channel(42,
                                ChannelLiquidity {
-                                       min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated
+                                       min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated,
+                                       min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+                                       max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
                                })
                        .with_channel(43,
                                ChannelLiquidity {
-                                       min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated
+                                       min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated,
+                                       min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+                                       max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
                                });
                let source = source_node_id();
                let target = target_node_id();
@@ -1534,7 +1623,9 @@ mod tests {
                let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
                        .with_channel(42,
                                ChannelLiquidity {
-                                       min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated
+                                       min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated,
+                                       min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+                                       max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
                                });
                let source = source_node_id();
                let target = target_node_id();
@@ -1592,7 +1683,9 @@ mod tests {
                let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
                        .with_channel(42,
                                ChannelLiquidity {
-                                       min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated
+                                       min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated,
+                                       min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+                                       max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
                                });
                let source = source_node_id();
                let target = target_node_id();
@@ -1699,7 +1792,9 @@ mod tests {
                let scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
                        .with_channel(42,
                                ChannelLiquidity {
-                                       min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated
+                                       min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated,
+                                       min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+                                       max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
                                });
                let source = source_node_id();
                let target = target_node_id();
index 5b1a86a6a95f27a276f1dc221bd705f57fbf8ed4..845d13a5d236539758477df38884c3b0384b3f43 100644 (file)
@@ -515,6 +515,29 @@ impl_array!(PUBLIC_KEY_SIZE); // for PublicKey
 impl_array!(COMPACT_SIGNATURE_SIZE); // for Signature
 impl_array!(1300); // for OnionPacket.hop_data
 
+impl Writeable for [u16; 8] {
+       #[inline]
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+               for v in self.iter() {
+                       w.write_all(&v.to_be_bytes())?
+               }
+               Ok(())
+       }
+}
+
+impl Readable for [u16; 8] {
+       #[inline]
+       fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
+               let mut buf = [0u8; 16];
+               r.read_exact(&mut buf)?;
+               let mut res = [0u16; 8];
+               for (idx, v) in res.iter_mut().enumerate() {
+                       *v = (buf[idx] as u16) << 8 | (buf[idx + 1] as u16)
+               }
+               Ok(res)
+       }
+}
+
 // HashMap
 impl<K, V> Writeable for HashMap<K, V>
        where K: Writeable + Eq + Hash,