use prelude::*;
use core::fmt;
use core::cell::{RefCell, RefMut};
+use core::convert::TryInto;
use core::ops::{Deref, DerefMut};
use core::time::Duration;
use io::{self, Read};
pub considered_impossible_penalty_msat: u64,
}
+/// Tracks the historical state of a distribution as a weighted average of how much time was spent
+/// in each of 8 buckets.
+#[derive(Clone, Copy)]
+struct HistoricalBucketRangeTracker {
+ buckets: [u16; 8],
+}
+
+impl HistoricalBucketRangeTracker {
+ fn new() -> Self { Self { buckets: [0; 8] } }
+ fn track_datapoint(&mut self, bucket_idx: u8) {
+ // We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
+ // we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
+ //
+ // Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
+ // the buckets for the current min and max liquidity offset positions.
+ //
+ // We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
+ // non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
+ // 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
+ //
+ // In total, this allows us to track data for the last 8,000 or so payments across a given
+ // channel.
+ //
+ // These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
+ // and need to balance having more bits in the decimal part (to ensure decay isn't too
+ // non-linear) with having too few bits in the mantissa, causing us to not store very many
+ // datapoints.
+ //
+ // The constants were picked experimentally, selecting a decay amount that restricts us
+ // from overflowing buckets without having to cap them manually.
+ debug_assert!(bucket_idx < 8);
+ if bucket_idx < 8 {
+ for e in self.buckets.iter_mut() {
+ *e = ((*e as u32) * 2047 / 2048) as u16;
+ }
+ self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
+ }
+ }
+}
+
+impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
+
/// Accounting for channel liquidity balance uncertainty.
///
/// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
/// Time when the liquidity bounds were last modified.
last_updated: T,
+
+ min_liquidity_offset_history: HistoricalBucketRangeTracker,
+ max_liquidity_offset_history: HistoricalBucketRangeTracker,
}
/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and
/// decayed with a given half life.
-struct DirectedChannelLiquidity<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> {
+struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> {
min_liquidity_offset_msat: L,
max_liquidity_offset_msat: L,
+ min_liquidity_offset_history: BRT,
+ max_liquidity_offset_history: BRT,
capacity_msat: u64,
last_updated: U,
now: T,
Self {
min_liquidity_offset_msat: 0,
max_liquidity_offset_msat: 0,
+ min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+ max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
last_updated: T::now(),
}
}
/// `capacity_msat`.
fn as_directed(
&self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration
- ) -> DirectedChannelLiquidity<&u64, T, &T> {
- let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target {
- (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat)
- } else {
- (&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat)
- };
+ ) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> {
+ let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
+ if source < target {
+ (&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat,
+ &self.min_liquidity_offset_history, &self.max_liquidity_offset_history)
+ } else {
+ (&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat,
+ &self.max_liquidity_offset_history, &self.min_liquidity_offset_history)
+ };
DirectedChannelLiquidity {
min_liquidity_offset_msat,
max_liquidity_offset_msat,
+ min_liquidity_offset_history,
+ max_liquidity_offset_history,
capacity_msat,
last_updated: &self.last_updated,
now: T::now(),
/// `capacity_msat`.
fn as_directed_mut(
&mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration
- ) -> DirectedChannelLiquidity<&mut u64, T, &mut T> {
- let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target {
- (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat)
- } else {
- (&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat)
- };
+ ) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> {
+ let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
+ if source < target {
+ (&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat,
+ &mut self.min_liquidity_offset_history, &mut self.max_liquidity_offset_history)
+ } else {
+ (&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat,
+ &mut self.max_liquidity_offset_history, &mut self.min_liquidity_offset_history)
+ };
DirectedChannelLiquidity {
min_liquidity_offset_msat,
max_liquidity_offset_msat,
+ min_liquidity_offset_history,
+ max_liquidity_offset_history,
capacity_msat,
last_updated: &mut self.last_updated,
now: T::now(),
const AMOUNT_PENALTY_DIVISOR: u64 = 1 << 20;
const BASE_AMOUNT_PENALTY_DIVISOR: u64 = 1 << 30;
-impl<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, T, U> {
+impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
/// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in
/// this direction.
fn penalty_msat(&self, amount_msat: u64, params: &ProbabilisticScoringParameters) -> u64 {
}
}
-impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, T, U> {
+impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
/// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
fn failed_at_channel<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
if amount_msat < self.max_liquidity_msat() {
self.set_max_liquidity_msat(max_liquidity_msat);
}
+ fn update_history_buckets(&mut self) {
+ debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat);
+ self.min_liquidity_offset_history.track_datapoint(
+ // Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
+ // is zero or the channel's capacity, though the second should generally never happen.
+ (self.min_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat)
+ .try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored
+ debug_assert!(*self.max_liquidity_offset_msat <= self.capacity_msat);
+ self.max_liquidity_offset_history.track_datapoint(
+ // Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
+ // is zero or the channel's capacity, though the second should generally never happen.
+ (self.max_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat)
+ .try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored
+ }
+
/// Adjusts the lower bound of the channel liquidity balance in this direction.
fn set_min_liquidity_msat(&mut self, amount_msat: u64) {
*self.min_liquidity_offset_msat = amount_msat;
self.decayed_offset_msat(*self.max_liquidity_offset_msat)
};
*self.last_updated = self.now;
+ self.update_history_buckets();
}
/// Adjusts the upper bound of the channel liquidity balance in this direction.
self.decayed_offset_msat(*self.min_liquidity_offset_msat)
};
*self.last_updated = self.now;
+ self.update_history_buckets();
}
}
let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed();
write_tlv_fields!(w, {
(0, self.min_liquidity_offset_msat, required),
+ (1, Some(self.min_liquidity_offset_history), option),
(2, self.max_liquidity_offset_msat, required),
+ (3, Some(self.max_liquidity_offset_history), option),
(4, duration_since_epoch, required),
});
Ok(())
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
let mut min_liquidity_offset_msat = 0;
let mut max_liquidity_offset_msat = 0;
+ let mut min_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
+ let mut max_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
let mut duration_since_epoch = Duration::from_secs(0);
read_tlv_fields!(r, {
(0, min_liquidity_offset_msat, required),
+ (1, min_liquidity_offset_history, option),
(2, max_liquidity_offset_msat, required),
+ (3, max_liquidity_offset_history, option),
(4, duration_since_epoch, required),
});
// On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards.
Ok(Self {
min_liquidity_offset_msat,
max_liquidity_offset_msat,
+ min_liquidity_offset_history: min_liquidity_offset_history.unwrap(),
+ max_liquidity_offset_history: max_liquidity_offset_history.unwrap(),
last_updated,
})
}
#[cfg(test)]
mod tests {
- use super::{ChannelLiquidity, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime};
+ use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime};
use util::time::Time;
use util::time::tests::SinceEpoch;
let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
.with_channel(42,
ChannelLiquidity {
- min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated
+ min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated,
+ min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+ max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
})
.with_channel(43,
ChannelLiquidity {
- min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated
+ min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated,
+ min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+ max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
});
let source = source_node_id();
let target = target_node_id();
let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
.with_channel(42,
ChannelLiquidity {
- min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated
+ min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated,
+ min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+ max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
});
let source = source_node_id();
let target = target_node_id();
let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
.with_channel(42,
ChannelLiquidity {
- min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated
+ min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated,
+ min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+ max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
});
let source = source_node_id();
let target = target_node_id();
let scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
.with_channel(42,
ChannelLiquidity {
- min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated
+ min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated,
+ min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
+ max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
});
let source = source_node_id();
let target = target_node_id();