]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Keep track of inflight HTLCs across payments
authorjurvis <hello@jurvis.co>
Tue, 30 Aug 2022 05:50:44 +0000 (22:50 -0700)
committerjurvis <hello@jurvis.co>
Thu, 1 Sep 2022 01:50:02 +0000 (18:50 -0700)
Added two methods, `process_path_inflight_htlcs` and
`remove_path_inflight_htlcs`, that updates that `payment_cache` map with
path information that may have failed, succeeded, or have been given up
on.

Introduced `AccountForInflightHtlcs`, which will wrap our user-provided
scorer. We move the `S:Score` type parameterization from the `Router` to
`find_route`, so we can use our newly introduced
`AccountForInflightHtlcs`.

`AccountForInflightHtlcs` keeps track of a map of inflight HTLCs by
their short channel id, direction, and give us the value that is being
used up.

This map will in turn be populated prior to calling `find_route`, where
we’ll use `create_inflight_map`, to generate a current map of all
inflight HTLCs based on what was stored in `payment_cache`.

lightning-invoice/src/payment.rs
lightning-invoice/src/utils.rs
lightning/src/routing/gossip.rs
lightning/src/routing/scoring.rs

index a2bdb8876a4870dcf7f3582990599eaefa208266..8854fcbc9afdc8b15e0d46f87ac587a8d21e9da0 100644 (file)
@@ -77,8 +77,8 @@
 //! # }
 //! #
 //! # struct FakeRouter {}
-//! # impl<S: Score> Router<S> for FakeRouter {
-//! #     fn find_route(
+//! # impl Router for FakeRouter {
+//! #     fn find_route<S: Score>(
 //! #         &self, payer: &PublicKey, params: &RouteParameters, payment_hash: &PaymentHash,
 //! #         first_hops: Option<&[&ChannelDetails]>, scorer: &S
 //! #     ) -> Result<Route, LightningError> { unimplemented!() }
@@ -144,8 +144,10 @@ use crate::prelude::*;
 use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
 use lightning::ln::channelmanager::{ChannelDetails, PaymentId, PaymentSendFailure};
 use lightning::ln::msgs::LightningError;
-use lightning::routing::scoring::{LockableScore, Score};
+use lightning::routing::gossip::NodeId;
+use lightning::routing::scoring::{ChannelUsage, LockableScore, Score};
 use lightning::routing::router::{PaymentParameters, Route, RouteHop, RouteParameters};
+use lightning::util::errors::APIError;
 use lightning::util::events::{Event, EventHandler};
 use lightning::util::logger::Logger;
 use time_utils::Time;
@@ -175,10 +177,9 @@ use time_utils;
 type ConfiguredTime = time_utils::Eternity;
 
 /// (C-not exported) generally all users should use the [`InvoicePayer`] type alias.
-pub struct InvoicePayerUsingTime<P: Deref, R, S: Deref, L: Deref, E: EventHandler, T: Time>
+pub struct InvoicePayerUsingTime<P: Deref, R: Router, S: Deref, L: Deref, E: EventHandler, T: Time>
 where
        P::Target: Payer,
-       R: for <'a> Router<<<S as Deref>::Target as LockableScore<'a>>::Locked>,
        S::Target: for <'a> LockableScore<'a>,
        L::Target: Logger,
 {
@@ -207,6 +208,50 @@ impl<T: Time> PaymentInfo<T> {
                }
        }
 }
+
+/// Used to store information about all the HTLCs that are inflight across all payment attempts
+struct AccountForInFlightHtlcs<'a, S: Score> {
+       scorer: &'a mut S,
+       /// Maps a channel's short channel id and its direction to the liquidity used up.
+       inflight_htlcs: HashMap<(u64, bool), u64>,
+}
+
+#[cfg(c_bindings)]
+impl<'a, S:Score> lightning::util::ser::Writeable for AccountForInFlightHtlcs<'a, S> {
+       fn write<W: lightning::util::ser::Writer>(&self, writer: &mut W) -> Result<(), std::io::Error> { self.scorer.write(writer) }
+}
+
+impl<'a, S: Score> Score for AccountForInFlightHtlcs<'a, S> {
+       fn channel_penalty_msat(&self, short_channel_id: u64, source: &NodeId, target: &NodeId, usage: ChannelUsage) -> u64 {
+               if let Some(used_liqudity) = self.inflight_htlcs.get(&(short_channel_id, source < target)) {
+                       let usage = ChannelUsage {
+                               inflight_htlc_msat: usage.inflight_htlc_msat + used_liqudity,
+                               ..usage
+                       };
+
+                       self.scorer.channel_penalty_msat(short_channel_id, source, target, usage)
+               } else {
+                       self.scorer.channel_penalty_msat(short_channel_id, source, target, usage)
+               }
+       }
+
+       fn payment_path_failed(&mut self, path: &[&RouteHop], short_channel_id: u64) {
+               self.scorer.payment_path_failed(path, short_channel_id)
+       }
+
+       fn payment_path_successful(&mut self, path: &[&RouteHop]) {
+               self.scorer.payment_path_successful(path)
+       }
+
+       fn probe_failed(&mut self, path: &[&RouteHop], short_channel_id: u64) {
+               self.scorer.probe_failed(path, short_channel_id)
+       }
+
+       fn probe_successful(&mut self, path: &[&RouteHop]) {
+               self.scorer.probe_successful(path)
+       }
+}
+
 /// Storing minimal payment attempts information required for determining if a outbound payment can
 /// be retried.
 #[derive(Clone, Copy)]
@@ -267,9 +312,9 @@ pub trait Payer {
 }
 
 /// A trait defining behavior for routing an [`Invoice`] payment.
-pub trait Router<S: Score> {
+pub trait Router {
        /// Finds a [`Route`] between `payer` and `payee` for a payment with the given values.
-       fn find_route(
+       fn find_route<S: Score>(
                &self, payer: &PublicKey, route_params: &RouteParameters, payment_hash: &PaymentHash,
                first_hops: Option<&[&ChannelDetails]>, scorer: &S
        ) -> Result<Route, LightningError>;
@@ -314,10 +359,9 @@ pub enum PaymentError {
        Sending(PaymentSendFailure),
 }
 
-impl<P: Deref, R, S: Deref, L: Deref, E: EventHandler, T: Time> InvoicePayerUsingTime<P, R, S, L, E, T>
+impl<P: Deref, R: Router, S: Deref, L: Deref, E: EventHandler, T: Time> InvoicePayerUsingTime<P, R, S, L, E, T>
 where
        P::Target: Payer,
-       R: for <'a> Router<<<S as Deref>::Target as LockableScore<'a>>::Locked>,
        S::Target: for <'a> LockableScore<'a>,
        L::Target: Logger,
 {
@@ -440,13 +484,19 @@ where
 
                let payer = self.payer.node_id();
                let first_hops = self.payer.first_hops();
+               let inflight_htlcs = self.create_inflight_map();
                let route = self.router.find_route(
-                       &payer, params, &payment_hash, Some(&first_hops.iter().collect::<Vec<_>>()),
-                       &self.scorer.lock()
+                       &payer, &params, &payment_hash, Some(&first_hops.iter().collect::<Vec<_>>()),
+                       &AccountForInFlightHtlcs { scorer: &mut self.scorer.lock(), inflight_htlcs }
                ).map_err(|e| PaymentError::Routing(e))?;
 
                match send_payment(&route) {
-                       Ok(payment_id) => Ok(payment_id),
+                       Ok(payment_id) => {
+                               for path in route.paths {
+                                       self.process_path_inflight_htlcs(payment_hash, path);
+                               }
+                               Ok(payment_id)
+                       },
                        Err(e) => match e {
                                PaymentSendFailure::ParameterError(_) => Err(e),
                                PaymentSendFailure::PathParameterError(_) => Err(e),
@@ -461,7 +511,19 @@ where
                                                Err(e)
                                        }
                                },
-                               PaymentSendFailure::PartialFailure { failed_paths_retry, payment_id, .. } => {
+                               PaymentSendFailure::PartialFailure { failed_paths_retry, payment_id, results } => {
+                                       // If a `PartialFailure` event returns a result that is an `Ok()`, it means that
+                                       // part of our payment is retried. When we receive `MonitorUpdateFailed`, it
+                                       // means that we are still waiting for our channel monitor update to be completed.
+                                       for (result, path) in results.iter().zip(route.paths.into_iter()) {
+                                               match result {
+                                                       Ok(_) | Err(APIError::MonitorUpdateFailed) => {
+                                                               self.process_path_inflight_htlcs(payment_hash, path);
+                                                       },
+                                                       _ => {},
+                                               }
+                                       }
+
                                        if let Some(retry_data) = failed_paths_retry {
                                                // Some paths were sent, even if we failed to send the full MPP value our
                                                // recipient may misbehave and claim the funds, at which point we have to
@@ -481,6 +543,24 @@ where
                }.map_err(|e| PaymentError::Sending(e))
        }
 
+       // Takes in a path to have its information stored in `payment_cache`. This is done for paths
+       // that are pending retry.
+       fn process_path_inflight_htlcs(&self, payment_hash: PaymentHash, path: Vec<RouteHop>) {
+               self.payment_cache.lock().unwrap().entry(payment_hash)
+                       .or_insert_with(|| PaymentInfo::new())
+                       .paths.push(path);
+       }
+
+       // Find the path we want to remove in `payment_cache`. If it doesn't exist, do nothing.
+       fn remove_path_inflight_htlcs(&self, payment_hash: PaymentHash, path: &Vec<RouteHop>) {
+               self.payment_cache.lock().unwrap().entry(payment_hash)
+                       .and_modify(|payment_info| {
+                               if let Some(idx) = payment_info.paths.iter().position(|p| p == path) {
+                                       payment_info.paths.swap_remove(idx);
+                               }
+                       });
+       }
+
        fn retry_payment(
                &self, payment_id: PaymentId, payment_hash: PaymentHash, params: &RouteParameters
        ) -> Result<(), ()> {
@@ -508,17 +588,25 @@ where
 
                let payer = self.payer.node_id();
                let first_hops = self.payer.first_hops();
+               let inflight_htlcs = self.create_inflight_map();
+
                let route = self.router.find_route(
                        &payer, &params, &payment_hash, Some(&first_hops.iter().collect::<Vec<_>>()),
-                       &self.scorer.lock()
+                       &AccountForInFlightHtlcs { scorer: &mut self.scorer.lock(), inflight_htlcs }
                );
+
                if route.is_err() {
                        log_trace!(self.logger, "Failed to find a route for payment {}; not retrying ({:})", log_bytes!(payment_hash.0), attempts);
                        return Err(());
                }
 
-               match self.payer.retry_payment(&route.unwrap(), payment_id) {
-                       Ok(()) => Ok(()),
+               match self.payer.retry_payment(&route.as_ref().unwrap(), payment_id) {
+                       Ok(()) => {
+                               for path in route.unwrap().paths.into_iter() {
+                                       self.process_path_inflight_htlcs(payment_hash, path);
+                               }
+                               Ok(())
+                       },
                        Err(PaymentSendFailure::ParameterError(_)) |
                        Err(PaymentSendFailure::PathParameterError(_)) => {
                                log_trace!(self.logger, "Failed to retry for payment {} due to bogus route/payment data, not retrying.", log_bytes!(payment_hash.0));
@@ -527,7 +615,19 @@ where
                        Err(PaymentSendFailure::AllFailedRetrySafe(_)) => {
                                self.retry_payment(payment_id, payment_hash, params)
                        },
-                       Err(PaymentSendFailure::PartialFailure { failed_paths_retry, .. }) => {
+                       Err(PaymentSendFailure::PartialFailure { failed_paths_retry, results, .. }) => {
+                               // If a `PartialFailure` error contains a result that is an `Ok()`, it means that
+                               // part of our payment is retried. When we receive `MonitorUpdateFailed`, it
+                               // means that we are still waiting for our channel monitor update to complete.
+                               for (result, path) in results.iter().zip(route.unwrap().paths.into_iter()) {
+                                       match result {
+                                               Ok(_) | Err(APIError::MonitorUpdateFailed) => {
+                                                       self.process_path_inflight_htlcs(payment_hash, path);
+                                               },
+                                               _ => {},
+                                       }
+                               }
+
                                if let Some(retry) = failed_paths_retry {
                                        // Always return Ok for the same reason as noted in pay_internal.
                                        let _ = self.retry_payment(payment_id, payment_hash, &retry);
@@ -544,6 +644,47 @@ where
        pub fn remove_cached_payment(&self, payment_hash: &PaymentHash) {
                self.payment_cache.lock().unwrap().remove(payment_hash);
        }
+
+       /// Given a [`PaymentHash`], this function looks up inflight path attempts in the payment_cache.
+       /// Then, it uses the path information inside the cache to construct a HashMap mapping a channel's
+       /// short channel id and direction to the amount being sent through it.
+       ///
+       /// This function should be called whenever we need information about currently used up liquidity
+       /// across payments.
+       fn create_inflight_map(&self) -> HashMap<(u64, bool), u64> {
+               let mut total_inflight_map: HashMap<(u64, bool), u64> = HashMap::new();
+               // Make an attempt at finding existing payment information from `payment_cache`. If it
+               // does not exist, it probably is a fresh payment and we can just return an empty
+               // HashMap.
+               for payment_info in self.payment_cache.lock().unwrap().values() {
+                       for path in &payment_info.paths {
+                               if path.is_empty() { break };
+                               // total_inflight_map needs to be direction-sensitive when keeping track of the HTLC value
+                               // that is held up. However, the `hops` array, which is a path returned by `find_route` in
+                               // the router excludes the payer node. In the following lines, the payer's information is
+                               // hardcoded with an inflight value of 0 so that we can correctly represent the first hop
+                               // in our sliding window of two.
+                               let our_node_id: PublicKey = self.payer.node_id();
+                               let reversed_hops_with_payer = path.iter().rev().skip(1)
+                                       .map(|hop| hop.pubkey)
+                                       .chain(core::iter::once(our_node_id));
+                               let mut cumulative_msat = 0;
+
+                               // Taking the reversed vector from above, we zip it with just the reversed hops list to
+                               // work "backwards" of the given path, since the last hop's `fee_msat` actually represents
+                               // the total amount sent.
+                               for (next_hop, prev_hop) in path.iter().rev().zip(reversed_hops_with_payer) {
+                                       cumulative_msat += next_hop.fee_msat;
+                                       total_inflight_map
+                                               .entry((next_hop.short_channel_id, NodeId::from_pubkey(&prev_hop) < NodeId::from_pubkey(&next_hop.pubkey)))
+                                               .and_modify(|used_liquidity_msat| *used_liquidity_msat += cumulative_msat)
+                                               .or_insert(cumulative_msat);
+                               }
+                       }
+               }
+
+               total_inflight_map
+       }
 }
 
 fn expiry_time_from_unix_epoch(invoice: &Invoice) -> Duration {
@@ -557,14 +698,23 @@ fn has_expired(route_params: &RouteParameters) -> bool {
        } else { false }
 }
 
-impl<P: Deref, R, S: Deref, L: Deref, E: EventHandler, T: Time> EventHandler for InvoicePayerUsingTime<P, R, S, L, E, T>
+impl<P: Deref, R: Router, S: Deref, L: Deref, E: EventHandler, T: Time> EventHandler for InvoicePayerUsingTime<P, R, S, L, E, T>
 where
        P::Target: Payer,
-       R: for <'a> Router<<<S as Deref>::Target as LockableScore<'a>>::Locked>,
        S::Target: for <'a> LockableScore<'a>,
        L::Target: Logger,
 {
        fn handle_event(&self, event: &Event) {
+               match event {
+                       Event::PaymentPathFailed { payment_hash, path, ..  }
+                       | Event::PaymentPathSuccessful { path, payment_hash: Some(payment_hash), .. }
+                       | Event::ProbeSuccessful { payment_hash, path, .. }
+                       | Event::ProbeFailed { payment_hash, path, .. } => {
+                               self.remove_path_inflight_htlcs(*payment_hash, path);
+                       },
+                       _ => {},
+               }
+
                match event {
                        Event::PaymentPathFailed {
                                payment_id, payment_hash, rejected_by_dest, path, short_channel_id, retry, ..
@@ -633,7 +783,7 @@ mod tests {
        use lightning::ln::features::{ChannelFeatures, NodeFeatures, InitFeatures};
        use lightning::ln::functional_test_utils::*;
        use lightning::ln::msgs::{ChannelMessageHandler, ErrorAction, LightningError};
-       use lightning::routing::gossip::NodeId;
+       use lightning::routing::gossip::{EffectiveCapacity, NodeId};
        use lightning::routing::router::{PaymentParameters, Route, RouteHop};
        use lightning::routing::scoring::ChannelUsage;
        use lightning::util::test_utils::TestLogger;
@@ -645,6 +795,7 @@ mod tests {
        use std::time::{SystemTime, Duration};
        use time_utils::tests::SinceEpoch;
        use DEFAULT_EXPIRY_TIME;
+       use lightning::util::errors::APIError::{ChannelUnavailable, MonitorUpdateFailed};
 
        fn invoice(payment_preimage: PaymentPreimage) -> Invoice {
                let payment_hash = Sha256::hash(&payment_preimage.0);
@@ -792,8 +943,8 @@ mod tests {
                let final_value_msat = invoice.amount_milli_satoshis().unwrap();
 
                let payer = TestPayer::new()
-                       .fails_with_partial_failure(retry.clone(), OnAttempt(1))
-                       .fails_with_partial_failure(retry, OnAttempt(2))
+                       .fails_with_partial_failure(retry.clone(), OnAttempt(1), None)
+                       .fails_with_partial_failure(retry, OnAttempt(2), None)
                        .expect_send(Amount::ForInvoice(final_value_msat))
                        .expect_send(Amount::OnRetry(final_value_msat / 2))
                        .expect_send(Amount::OnRetry(final_value_msat / 2));
@@ -1381,18 +1532,263 @@ mod tests {
                invoice_payer.handle_event(&event);
        }
 
+       #[test]
+       fn generates_correct_inflight_map_data() {
+               let event_handled = core::cell::RefCell::new(false);
+               let event_handler = |_: &_| { *event_handled.borrow_mut() = true; };
+
+               let payment_preimage = PaymentPreimage([1; 32]);
+               let invoice = invoice(payment_preimage);
+               let payment_hash = Some(PaymentHash(invoice.payment_hash().clone().into_inner()));
+               let final_value_msat = invoice.amount_milli_satoshis().unwrap();
+
+               let payer = TestPayer::new().expect_send(Amount::ForInvoice(final_value_msat));
+               let final_value_msat = invoice.amount_milli_satoshis().unwrap();
+               let route = TestRouter::route_for_value(final_value_msat);
+               let router = TestRouter {};
+               let scorer = RefCell::new(TestScorer::new());
+               let logger = TestLogger::new();
+               let invoice_payer =
+                       InvoicePayer::new(&payer, router, &scorer, &logger, event_handler, Retry::Attempts(0));
+
+               let payment_id = invoice_payer.pay_invoice(&invoice).unwrap();
+
+               let inflight_map = invoice_payer.create_inflight_map();
+               // First path check
+               assert_eq!(inflight_map.get(&(0, false)).unwrap().clone(), 94);
+               assert_eq!(inflight_map.get(&(1, true)).unwrap().clone(), 84);
+               assert_eq!(inflight_map.get(&(2, false)).unwrap().clone(), 64);
+
+               // Second path check
+               assert_eq!(inflight_map.get(&(1, false)).unwrap().clone(), 64);
+
+               invoice_payer.handle_event(&Event::PaymentPathSuccessful {
+                       payment_id, payment_hash, path: route.paths[0].clone()
+               });
+
+               let inflight_map = invoice_payer.create_inflight_map();
+
+               assert_eq!(inflight_map.get(&(0, false)), None);
+               assert_eq!(inflight_map.get(&(1, true)), None);
+               assert_eq!(inflight_map.get(&(2, false)), None);
+
+               // Second path should still be inflight
+               assert_eq!(inflight_map.get(&(1, false)).unwrap().clone(), 64)
+       }
+
+       #[test]
+       fn considers_inflight_htlcs_between_invoice_payments_when_path_succeeds() {
+               // First, let's just send a payment through, but only make sure one of the path completes
+               let event_handled = core::cell::RefCell::new(false);
+               let event_handler = |_: &_| { *event_handled.borrow_mut() = true; };
+
+               let payment_preimage = PaymentPreimage([1; 32]);
+               let payment_invoice = invoice(payment_preimage);
+               let payment_hash = Some(PaymentHash(payment_invoice.payment_hash().clone().into_inner()));
+               let final_value_msat = payment_invoice.amount_milli_satoshis().unwrap();
+
+               let payer = TestPayer::new()
+                       .expect_send(Amount::ForInvoice(final_value_msat))
+                       .expect_send(Amount::ForInvoice(final_value_msat));
+               let final_value_msat = payment_invoice.amount_milli_satoshis().unwrap();
+               let route = TestRouter::route_for_value(final_value_msat);
+               let router = TestRouter {};
+               let scorer = RefCell::new(TestScorer::new()
+                       // 1st invoice, 1st path
+                       .expect_usage(ChannelUsage { amount_msat: 10, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 20, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 64, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       // 1st invoice, 2nd path
+                       .expect_usage(ChannelUsage { amount_msat: 64, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       // 2nd invoice, 1st path
+                       .expect_usage(ChannelUsage { amount_msat: 10, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 20, inflight_htlc_msat: 64, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 64, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       // 2nd invoice, 2nd path
+                       .expect_usage(ChannelUsage { amount_msat: 64, inflight_htlc_msat: 64, effective_capacity: EffectiveCapacity::Unknown } )
+               );
+               let logger = TestLogger::new();
+               let invoice_payer =
+                       InvoicePayer::new(&payer, router, &scorer, &logger, event_handler, Retry::Attempts(0));
+
+               // Succeed 1st path, leave 2nd path inflight
+               let payment_id = invoice_payer.pay_invoice(&payment_invoice).unwrap();
+               invoice_payer.handle_event(&Event::PaymentPathSuccessful {
+                       payment_id, payment_hash, path: route.paths[0].clone()
+               });
+
+               // Let's pay a second invoice that will be using the same path. This should trigger the
+               // assertions that expect the last 4 ChannelUsage values above where TestScorer is initialized.
+               // Particularly, the 2nd path of the 1st payment, since it is not yet complete, should still
+               // have 64 msats inflight for paths considering the channel with scid of 1.
+               let payment_preimage_2 = PaymentPreimage([2; 32]);
+               let payment_invoice_2 = invoice(payment_preimage_2);
+               invoice_payer.pay_invoice(&payment_invoice_2).unwrap();
+       }
+
+       #[test]
+       fn considers_inflight_htlcs_between_retries() {
+               // First, let's just send a payment through, but only make sure one of the path completes
+               let event_handled = core::cell::RefCell::new(false);
+               let event_handler = |_: &_| { *event_handled.borrow_mut() = true; };
+
+               let payment_preimage = PaymentPreimage([1; 32]);
+               let payment_invoice = invoice(payment_preimage);
+               let payment_hash = PaymentHash(payment_invoice.payment_hash().clone().into_inner());
+               let final_value_msat = payment_invoice.amount_milli_satoshis().unwrap();
+
+               let payer = TestPayer::new()
+                       .expect_send(Amount::ForInvoice(final_value_msat))
+                       .expect_send(Amount::OnRetry(final_value_msat / 2))
+                       .expect_send(Amount::OnRetry(final_value_msat / 2));
+               let final_value_msat = payment_invoice.amount_milli_satoshis().unwrap();
+               let router = TestRouter {};
+               let scorer = RefCell::new(TestScorer::new()
+                       // 1st invoice, 1st path
+                       .expect_usage(ChannelUsage { amount_msat: 10, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 20, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 64, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       // 1st invoice, 2nd path
+                       .expect_usage(ChannelUsage { amount_msat: 64, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       // Retry 1
+                       .expect_usage(ChannelUsage { amount_msat: 10, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 20, inflight_htlc_msat: 64, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 32, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 32, inflight_htlc_msat: 64, effective_capacity: EffectiveCapacity::Unknown } )
+                       // Retry 2
+                       .expect_usage(ChannelUsage { amount_msat: 10, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 20, inflight_htlc_msat: 96, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 32, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown } )
+                       .expect_usage(ChannelUsage { amount_msat: 32, inflight_htlc_msat: 96, effective_capacity: EffectiveCapacity::Unknown } )
+               );
+               let logger = TestLogger::new();
+               let invoice_payer =
+                       InvoicePayer::new(&payer, router, &scorer, &logger, event_handler, Retry::Attempts(2));
+
+               // Fail 1st path, leave 2nd path inflight
+               let payment_id = Some(invoice_payer.pay_invoice(&payment_invoice).unwrap());
+               invoice_payer.handle_event(&Event::PaymentPathFailed {
+                       payment_id,
+                       payment_hash,
+                       network_update: None,
+                       rejected_by_dest: false,
+                       all_paths_failed: false,
+                       path: TestRouter::path_for_value(final_value_msat),
+                       short_channel_id: None,
+                       retry: Some(TestRouter::retry_for_invoice(&payment_invoice)),
+               });
+
+               // Fails again the 1st path of our retry
+               invoice_payer.handle_event(&Event::PaymentPathFailed {
+                       payment_id,
+                       payment_hash,
+                       network_update: None,
+                       rejected_by_dest: false,
+                       all_paths_failed: false,
+                       path: TestRouter::path_for_value(final_value_msat / 2),
+                       short_channel_id: None,
+                       retry: Some(RouteParameters {
+                               final_value_msat: final_value_msat / 2,
+                               ..TestRouter::retry_for_invoice(&payment_invoice)
+                       }),
+               });
+       }
+
+       #[test]
+       fn accounts_for_some_inflight_htlcs_sent_during_partial_failure() {
+               let event_handled = core::cell::RefCell::new(false);
+               let event_handler = |_: &_| { *event_handled.borrow_mut() = true; };
+
+               let payment_preimage = PaymentPreimage([1; 32]);
+               let invoice_to_pay = invoice(payment_preimage);
+               let final_value_msat = invoice_to_pay.amount_milli_satoshis().unwrap();
+
+               let retry = TestRouter::retry_for_invoice(&invoice_to_pay);
+               let payer = TestPayer::new()
+                       .fails_with_partial_failure(
+                               retry.clone(), OnAttempt(1),
+                               Some(vec![
+                                       Err(ChannelUnavailable { err: "abc".to_string() }), Err(MonitorUpdateFailed)
+                               ]))
+                       .expect_send(Amount::ForInvoice(final_value_msat));
+
+               let router = TestRouter {};
+               let scorer = RefCell::new(TestScorer::new());
+               let logger = TestLogger::new();
+               let invoice_payer =
+                       InvoicePayer::new(&payer, router, &scorer, &logger, event_handler, Retry::Attempts(0));
+
+               invoice_payer.pay_invoice(&invoice_to_pay).unwrap();
+               let inflight_map = invoice_payer.create_inflight_map();
+
+               // Only the second path, which failed with `MonitorUpdateFailed` should be added to our
+               // inflight map because retries are disabled.
+               assert_eq!(inflight_map.len(), 1);
+       }
+
+       #[test]
+       fn accounts_for_all_inflight_htlcs_sent_during_partial_failure() {
+               let event_handled = core::cell::RefCell::new(false);
+               let event_handler = |_: &_| { *event_handled.borrow_mut() = true; };
+
+               let payment_preimage = PaymentPreimage([1; 32]);
+               let invoice_to_pay = invoice(payment_preimage);
+               let final_value_msat = invoice_to_pay.amount_milli_satoshis().unwrap();
+
+               let retry = TestRouter::retry_for_invoice(&invoice_to_pay);
+               let payer = TestPayer::new()
+                       .fails_with_partial_failure(
+                               retry.clone(), OnAttempt(1),
+                               Some(vec![
+                                       Ok(()), Err(MonitorUpdateFailed)
+                               ]))
+                       .expect_send(Amount::ForInvoice(final_value_msat));
+
+               let router = TestRouter {};
+               let scorer = RefCell::new(TestScorer::new());
+               let logger = TestLogger::new();
+               let invoice_payer =
+                       InvoicePayer::new(&payer, router, &scorer, &logger, event_handler, Retry::Attempts(0));
+
+               invoice_payer.pay_invoice(&invoice_to_pay).unwrap();
+               let inflight_map = invoice_payer.create_inflight_map();
+
+               // All paths successful, hence we check of the existence of all 4 hops.
+               assert_eq!(inflight_map.len(), 4);
+       }
+
        struct TestRouter;
 
        impl TestRouter {
                fn route_for_value(final_value_msat: u64) -> Route {
                        Route {
                                paths: vec![
-                                       vec![RouteHop {
-                                               pubkey: PublicKey::from_slice(&hex::decode("02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619").unwrap()[..]).unwrap(),
-                                               channel_features: ChannelFeatures::empty(),
-                                               node_features: NodeFeatures::empty(),
-                                               short_channel_id: 0, fee_msat: final_value_msat / 2, cltv_expiry_delta: 144
-                                       }],
+                                       vec![
+                                               RouteHop {
+                                                       pubkey: PublicKey::from_slice(&hex::decode("02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619").unwrap()[..]).unwrap(),
+                                                       channel_features: ChannelFeatures::empty(),
+                                                       node_features: NodeFeatures::empty(),
+                                                       short_channel_id: 0,
+                                                       fee_msat: 10,
+                                                       cltv_expiry_delta: 0
+                                               },
+                                               RouteHop {
+                                                       pubkey: PublicKey::from_slice(&hex::decode("0324653eac434488002cc06bbfb7f10fe18991e35f9fe4302dbea6d2353dc0ab1c").unwrap()[..]).unwrap(),
+                                                       channel_features: ChannelFeatures::empty(),
+                                                       node_features: NodeFeatures::empty(),
+                                                       short_channel_id: 1,
+                                                       fee_msat: 20,
+                                                       cltv_expiry_delta: 0
+                                               },
+                                               RouteHop {
+                                                       pubkey: PublicKey::from_slice(&hex::decode("027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007").unwrap()[..]).unwrap(),
+                                                       channel_features: ChannelFeatures::empty(),
+                                                       node_features: NodeFeatures::empty(),
+                                                       short_channel_id: 2,
+                                                       fee_msat: final_value_msat / 2,
+                                                       cltv_expiry_delta: 0
+                                               },
+                                       ],
                                        vec![RouteHop {
                                                pubkey: PublicKey::from_slice(&hex::decode("0324653eac434488002cc06bbfb7f10fe18991e35f9fe4302dbea6d2353dc0ab1c").unwrap()[..]).unwrap(),
                                                channel_features: ChannelFeatures::empty(),
@@ -1424,11 +1820,24 @@ mod tests {
                }
        }
 
-       impl<S: Score> Router<S> for TestRouter {
-               fn find_route(
-                       &self, _payer: &PublicKey, route_params: &RouteParameters, _payment_hash: &PaymentHash,
-                       _first_hops: Option<&[&ChannelDetails]>, _scorer: &S
+       impl Router for TestRouter {
+               fn find_route<S: Score>(
+                       &self, payer: &PublicKey, route_params: &RouteParameters, _payment_hash: &PaymentHash,
+                       _first_hops: Option<&[&ChannelDetails]>, scorer: &S
                ) -> Result<Route, LightningError> {
+                       // Simulate calling the Scorer just as you would in find_route
+                       let route = Self::route_for_value(route_params.final_value_msat);
+                       for path in route.paths {
+                               for hop in path {
+                                       let usage = ChannelUsage {
+                                               amount_msat: hop.fee_msat,
+                                               inflight_htlc_msat: 0,
+                                               effective_capacity: EffectiveCapacity::Unknown,
+                                       };
+                                       scorer.channel_penalty_msat(hop.short_channel_id, &NodeId::from_pubkey(payer), &NodeId::from_pubkey(&hop.pubkey), usage);
+                               }
+                       }
+
                        Ok(Route {
                                payment_params: Some(route_params.payment_params.clone()), ..Self::route_for_value(route_params.final_value_msat)
                        })
@@ -1437,8 +1846,8 @@ mod tests {
 
        struct FailingRouter;
 
-       impl<S: Score> Router<S> for FailingRouter {
-               fn find_route(
+       impl Router for FailingRouter {
+               fn find_route<S: Score>(
                        &self, _payer: &PublicKey, _params: &RouteParameters, _payment_hash: &PaymentHash,
                        _first_hops: Option<&[&ChannelDetails]>, _scorer: &S
                ) -> Result<Route, LightningError> {
@@ -1447,7 +1856,8 @@ mod tests {
        }
 
        struct TestScorer {
-               expectations: Option<VecDeque<TestResult>>,
+               event_expectations: Option<VecDeque<TestResult>>,
+               scorer_expectations: RefCell<Option<VecDeque<ChannelUsage>>>,
        }
 
        #[derive(Debug)]
@@ -1459,12 +1869,18 @@ mod tests {
        impl TestScorer {
                fn new() -> Self {
                        Self {
-                               expectations: None,
+                               event_expectations: None,
+                               scorer_expectations: RefCell::new(None),
                        }
                }
 
                fn expect(mut self, expectation: TestResult) -> Self {
-                       self.expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
+                       self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
+                       self
+               }
+
+               fn expect_usage(self, expectation: ChannelUsage) -> Self {
+                       self.scorer_expectations.borrow_mut().get_or_insert_with(|| VecDeque::new()).push_back(expectation);
                        self
                }
        }
@@ -1476,11 +1892,22 @@ mod tests {
 
        impl Score for TestScorer {
                fn channel_penalty_msat(
-                       &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
-               ) -> u64 { 0 }
+                       &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, usage: ChannelUsage
+               ) -> u64 {
+                       if let Some(scorer_expectations) = self.scorer_expectations.borrow_mut().as_mut() {
+                               match scorer_expectations.pop_front() {
+                                       Some(expectation) => {
+                                               assert_eq!(expectation.amount_msat, usage.amount_msat);
+                                               assert_eq!(expectation.inflight_htlc_msat, usage.inflight_htlc_msat);
+                                       },
+                                       None => {},
+                               }
+                       }
+                       0
+               }
 
                fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
-                       if let Some(expectations) = &mut self.expectations {
+                       if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front() {
                                        Some(TestResult::PaymentFailure { path, short_channel_id }) => {
                                                assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
@@ -1495,7 +1922,7 @@ mod tests {
                }
 
                fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
-                       if let Some(expectations) = &mut self.expectations {
+                       if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front() {
                                        Some(TestResult::PaymentFailure { path, .. }) => {
                                                panic!("Unexpected payment path failure: {:?}", path)
@@ -1509,7 +1936,7 @@ mod tests {
                }
 
                fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
-                       if let Some(expectations) = &mut self.expectations {
+                       if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front() {
                                        Some(TestResult::PaymentFailure { path, .. }) => {
                                                panic!("Unexpected failed payment path: {:?}", path)
@@ -1522,7 +1949,7 @@ mod tests {
                        }
                }
                fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
-                       if let Some(expectations) = &mut self.expectations {
+                       if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front() {
                                        Some(TestResult::PaymentFailure { path, .. }) => {
                                                panic!("Unexpected payment path failure: {:?}", path)
@@ -1542,9 +1969,15 @@ mod tests {
                                return;
                        }
 
-                       if let Some(expectations) = &self.expectations {
-                               if !expectations.is_empty() {
-                                       panic!("Unsatisfied scorer expectations: {:?}", expectations);
+                       if let Some(event_expectations) = &self.event_expectations {
+                               if !event_expectations.is_empty() {
+                                       panic!("Unsatisfied event expectations: {:?}", event_expectations);
+                               }
+                       }
+
+                       if let Some(scorer_expectations) = self.scorer_expectations.borrow().as_ref() {
+                               if !scorer_expectations.is_empty() {
+                                       panic!("Unsatisfied scorer expectations: {:?}", scorer_expectations)
                                }
                        }
                }
@@ -1584,9 +2017,9 @@ mod tests {
                        self.fails_with(failure, OnAttempt(attempt))
                }
 
-               fn fails_with_partial_failure(self, retry: RouteParameters, attempt: OnAttempt) -> Self {
+               fn fails_with_partial_failure(self, retry: RouteParameters, attempt: OnAttempt, results: Option<Vec<Result<(), APIError>>>) -> Self {
                        self.fails_with(PaymentSendFailure::PartialFailure {
-                               results: vec![],
+                               results: results.unwrap_or(vec![]),
                                failed_paths_retry: Some(retry),
                                payment_id: PaymentId([1; 32]),
                        }, attempt)
@@ -1667,8 +2100,8 @@ mod tests {
        // *** Full Featured Functional Tests with a Real ChannelManager ***
        struct ManualRouter(RefCell<VecDeque<Result<Route, LightningError>>>);
 
-       impl<S: Score> Router<S> for ManualRouter {
-               fn find_route(
+       impl Router for ManualRouter {
+               fn find_route<S: Score>(
                        &self, _payer: &PublicKey, _params: &RouteParameters, _payment_hash: &PaymentHash,
                        _first_hops: Option<&[&ChannelDetails]>, _scorer: &S
                ) -> Result<Route, LightningError> {
index 9ef2d2c8d39214e5a94423fa0c3e98006da9051c..72d022eeab123e228427f1fee48c2125661b0f0b 100644 (file)
@@ -455,9 +455,9 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> DefaultRouter<G, L> where L::
        }
 }
 
-impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, S: Score> Router<S> for DefaultRouter<G, L>
+impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> Router for DefaultRouter<G, L>
 where L::Target: Logger {
-       fn find_route(
+       fn find_route<S: Score>(
                &self, payer: &PublicKey, params: &RouteParameters, _payment_hash: &PaymentHash,
                first_hops: Option<&[&ChannelDetails]>, scorer: &S
        ) -> Result<Route, LightningError> {
index 160609216e84bc2c36ee0f1d2f6388fff657d9ac..637a8c046ed2944bb3f32ceb348584881666c1ec 100644 (file)
@@ -915,7 +915,7 @@ impl<'a> fmt::Debug for DirectedChannelInfoWithUpdate<'a> {
 ///
 /// While this may be smaller than the actual channel capacity, amounts greater than
 /// [`Self::as_msat`] should not be routed through the channel.
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, Debug)]
 pub enum EffectiveCapacity {
        /// The available liquidity in the channel known from being a channel counterparty, and thus a
        /// direct hop.
index 6ae339eae2bbc01776b840d655c2ef68ce6e83aa..a2492accf59cd9fe33fdf9d8ed533d8ff234c5a4 100644 (file)
@@ -221,7 +221,7 @@ impl<'a, S: Writeable> Writeable for MutexGuard<'a, S> {
 }
 
 /// Proposed use of a channel passed as a parameter to [`Score::channel_penalty_msat`].
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, Debug)]
 pub struct ChannelUsage {
        /// The amount to send through the channel, denominated in millisatoshis.
        pub amount_msat: u64,