From 3695b2aa13dcd271c9c4252cda3a0b9b79d3d879 Mon Sep 17 00:00:00 2001 From: jbesraa Date: Tue, 22 Aug 2023 18:57:06 +0300 Subject: [PATCH] Split LockableScore responsibilities between read & write operations - Split Score from LockableScore to ScoreLookUp to handle read operations and ScoreUpdate to handle write operations - Change all struct that implemented Score to implement ScoreLookUp and/or ScoreUpdate - Change Mutex's to RwLocks to allow multiple data readers - Change LockableScore to Deref in ScorerAccountingForInFlightHtlcs as we only need to read - Add ScoreLookUp and ScoreUpdate docs - Remove reference(&'a) and Sized from Score in ScorerAccountingForInFlightHtlcs as Score implements Deref - Split MultiThreadedScoreLock into MultiThreadedScoreLockWrite and MultiThreadedScoreLockRead. After splitting LockableScore, we split MultiThreadedScoreLock following the same way, splitting a single score into two srtucts, one for read and other for write. MultiThreadedScoreLock is used in c_bindings. --- lightning-background-processor/src/lib.rs | 14 +- lightning/src/ln/channelmanager.rs | 4 +- lightning/src/ln/functional_test_utils.rs | 8 +- lightning/src/ln/functional_tests.rs | 4 +- lightning/src/ln/outbound_payment.rs | 8 +- lightning/src/ln/payment_tests.rs | 2 +- lightning/src/routing/router.rs | 99 ++++-------- lightning/src/routing/scoring.rs | 179 +++++++++++++++------- lightning/src/util/test_utils.rs | 19 ++- 9 files changed, 191 insertions(+), 146 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 1ed6a2a83..8648920ec 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -34,7 +34,7 @@ use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::Router; -use lightning::routing::scoring::{Score, WriteableScore}; +use lightning::routing::scoring::{ScoreUpdate, WriteableScore}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; #[cfg(feature = "std")] @@ -241,23 +241,27 @@ fn handle_network_graph_update( fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + WriteableScore<'a>>( scorer: &'a S, event: &Event ) -> bool { - let mut score = scorer.lock(); match event { Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => { + let mut score = scorer.write_lock(); score.payment_path_failed(path, *scid); }, Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => { // Reached if the destination explicitly failed it back. We treat this as a successful probe // because the payment made it all the way to the destination with sufficient liquidity. + let mut score = scorer.write_lock(); score.probe_successful(path); }, Event::PaymentPathSuccessful { path, .. } => { + let mut score = scorer.write_lock(); score.payment_path_successful(path); }, Event::ProbeSuccessful { path, .. } => { + let mut score = scorer.write_lock(); score.probe_successful(path); }, Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => { + let mut score = scorer.write_lock(); score.probe_failed(path, *scid); }, _ => return false, @@ -858,7 +862,7 @@ mod tests { use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; use lightning::routing::router::{DefaultRouter, Path, RouteHop}; - use lightning::routing::scoring::{ChannelUsage, Score}; + use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; @@ -1033,12 +1037,14 @@ mod tests { fn write(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) } } - impl Score for TestScorer { + impl ScoreLookUp for TestScorer { type ScoreParams = (); fn channel_penalty_msat( &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams ) -> u64 { unimplemented!(); } + } + impl ScoreUpdate for TestScorer { fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 55980dd7a..950451090 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -10582,7 +10582,7 @@ pub mod bench { use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::{Block, BlockHeader, PackedLockTime, Transaction, TxMerkleNode, TxOut}; - use crate::sync::{Arc, Mutex}; + use crate::sync::{Arc, Mutex, RwLock}; use criterion::Criterion; @@ -10619,7 +10619,7 @@ pub mod bench { let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); - let scorer = Mutex::new(test_utils::TestScorer::new()); + let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(Arc::new(NetworkGraph::new(network, &logger_a)), &scorer); let mut config: UserConfig = Default::default(); diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 34568b07c..f5abe4176 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -43,7 +43,7 @@ use crate::io; use crate::prelude::*; use core::cell::RefCell; use alloc::rc::Rc; -use crate::sync::{Arc, Mutex, LockTestExt}; +use crate::sync::{Arc, Mutex, LockTestExt, RwLock}; use core::mem; use core::iter::repeat; use bitcoin::{PackedLockTime, TxMerkleNode}; @@ -352,7 +352,7 @@ pub struct TestChanMonCfg { pub persister: test_utils::TestPersister, pub logger: test_utils::TestLogger, pub keys_manager: test_utils::TestKeysInterface, - pub scorer: Mutex, + pub scorer: RwLock, } pub struct NodeCfg<'a> { @@ -539,7 +539,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { channel_monitors.insert(monitor.get_funding_txo().0, monitor); } - let scorer = Mutex::new(test_utils::TestScorer::new()); + let scorer = RwLock::new(test_utils::TestScorer::new()); let mut w = test_utils::TestVecWriter(Vec::new()); self.node.write(&mut w).unwrap(); <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestLogger>)>::read(&mut io::Cursor::new(w.0), ChannelManagerReadArgs { @@ -2634,7 +2634,7 @@ pub fn create_chanmon_cfgs(node_count: usize) -> Vec { let persister = test_utils::TestPersister::new(); let seed = [i as u8; 32]; let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet); - let scorer = Mutex::new(test_utils::TestScorer::new()); + let scorer = RwLock::new(test_utils::TestScorer::new()); chan_mon_cfgs.push(TestChanMonCfg { tx_broadcaster, fee_estimator, chain_source, logger, persister, keys_manager, scorer }); } diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 2fbc36ce9..998c561a8 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -56,7 +56,7 @@ use alloc::collections::BTreeSet; use core::default::Default; use core::iter::repeat; use bitcoin::hashes::Hash; -use crate::sync::{Arc, Mutex}; +use crate::sync::{Arc, Mutex, RwLock}; use crate::ln::functional_test_utils::*; use crate::ln::chan_utils::CommitmentTransaction; @@ -5434,7 +5434,7 @@ fn test_key_derivation_params() { let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet); let chain_monitor = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &chanmon_cfgs[0].persister, &keys_manager); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &chanmon_cfgs[0].logger)); - let scorer = Mutex::new(test_utils::TestScorer::new()); + let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(network_graph.clone(), &scorer); let node = NodeCfg { chain_source: &chanmon_cfgs[0].chain_source, logger: &chanmon_cfgs[0].logger, tx_broadcaster: &chanmon_cfgs[0].tx_broadcaster, fee_estimator: &chanmon_cfgs[0].fee_estimator, router, chain_monitor, keys_manager: &keys_manager, network_graph, node_seed: seed, override_init_features: alloc::rc::Rc::new(core::cell::RefCell::new(None)) }; let mut node_cfgs = create_node_cfgs(3, &chanmon_cfgs); diff --git a/lightning/src/ln/outbound_payment.rs b/lightning/src/ln/outbound_payment.rs index 4d6f1f00b..f60bf565e 100644 --- a/lightning/src/ln/outbound_payment.rs +++ b/lightning/src/ln/outbound_payment.rs @@ -1516,7 +1516,7 @@ mod tests { use crate::ln::outbound_payment::{OutboundPayments, Retry, RetryableSendFailure}; use crate::routing::gossip::NetworkGraph; use crate::routing::router::{InFlightHtlcs, Path, PaymentParameters, Route, RouteHop, RouteParameters}; - use crate::sync::{Arc, Mutex}; + use crate::sync::{Arc, Mutex, RwLock}; use crate::util::errors::APIError; use crate::util::test_utils; @@ -1555,7 +1555,7 @@ mod tests { let outbound_payments = OutboundPayments::new(); let logger = test_utils::TestLogger::new(); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &logger)); - let scorer = Mutex::new(test_utils::TestScorer::new()); + let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(network_graph, &scorer); let secp_ctx = Secp256k1::new(); let keys_manager = test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet); @@ -1602,7 +1602,7 @@ mod tests { let outbound_payments = OutboundPayments::new(); let logger = test_utils::TestLogger::new(); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &logger)); - let scorer = Mutex::new(test_utils::TestScorer::new()); + let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(network_graph, &scorer); let secp_ctx = Secp256k1::new(); let keys_manager = test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet); @@ -1644,7 +1644,7 @@ mod tests { let outbound_payments = OutboundPayments::new(); let logger = test_utils::TestLogger::new(); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &logger)); - let scorer = Mutex::new(test_utils::TestScorer::new()); + let scorer = RwLock::new(test_utils::TestScorer::new()); let router = test_utils::TestRouter::new(network_graph, &scorer); let secp_ctx = Secp256k1::new(); let keys_manager = test_utils::TestKeysInterface::new(&[0; 32], Network::Testnet); diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index c26628571..4df2e24e2 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -2544,7 +2544,7 @@ fn retry_multi_path_single_failed_payment() { }, Ok(route.clone())); { - let scorer = chanmon_cfgs[0].scorer.lock().unwrap(); + let scorer = chanmon_cfgs[0].scorer.read().unwrap(); // The initial send attempt, 2 paths scorer.expect_usage(chans[0].short_channel_id.unwrap(), ChannelUsage { amount_msat: 10_000, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown }); scorer.expect_usage(chans[1].short_channel_id.unwrap(), ChannelUsage { amount_msat: 100_000_001, inflight_htlc_msat: 0, effective_capacity: EffectiveCapacity::Unknown }); diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index ded4a5595..962080b0d 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -20,22 +20,22 @@ use crate::ln::features::{Bolt11InvoiceFeatures, Bolt12InvoiceFeatures, ChannelF use crate::ln::msgs::{DecodeError, ErrorAction, LightningError, MAX_VALUE_MSAT}; use crate::offers::invoice::{BlindedPayInfo, Bolt12Invoice}; use crate::routing::gossip::{DirectedChannelInfo, EffectiveCapacity, ReadOnlyNetworkGraph, NetworkGraph, NodeId, RoutingFees}; -use crate::routing::scoring::{ChannelUsage, LockableScore, Score}; +use crate::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp}; use crate::util::ser::{Writeable, Readable, ReadableArgs, Writer}; use crate::util::logger::{Level, Logger}; use crate::util::chacha20::ChaCha20; use crate::io; use crate::prelude::*; -use crate::sync::{Mutex}; +use crate::sync::Mutex; use alloc::collections::BinaryHeap; use core::{cmp, fmt}; -use core::ops::{Deref, DerefMut}; +use core::ops::Deref; /// A [`Router`] implemented using [`find_route`]. -pub struct DefaultRouter>, L: Deref, S: Deref, SP: Sized, Sc: Score> where +pub struct DefaultRouter>, L: Deref, S: Deref, SP: Sized, Sc: ScoreLookUp> where L::Target: Logger, - S::Target: for <'a> LockableScore<'a, Score = Sc>, + S::Target: for <'a> LockableScore<'a, ScoreLookUp = Sc>, { network_graph: G, logger: L, @@ -44,9 +44,9 @@ pub struct DefaultRouter>, L: Deref, S: Deref, score_params: SP } -impl>, L: Deref, S: Deref, SP: Sized, Sc: Score> DefaultRouter where +impl>, L: Deref, S: Deref, SP: Sized, Sc: ScoreLookUp> DefaultRouter where L::Target: Logger, - S::Target: for <'a> LockableScore<'a, Score = Sc>, + S::Target: for <'a> LockableScore<'a, ScoreLookUp = Sc>, { /// Creates a new router. pub fn new(network_graph: G, logger: L, random_seed_bytes: [u8; 32], scorer: S, score_params: SP) -> Self { @@ -55,9 +55,9 @@ impl>, L: Deref, S: Deref, SP: Sized, Sc: Scor } } -impl< G: Deref>, L: Deref, S: Deref, SP: Sized, Sc: Score> Router for DefaultRouter where +impl< G: Deref>, L: Deref, S: Deref, SP: Sized, Sc: ScoreLookUp> Router for DefaultRouter where L::Target: Logger, - S::Target: for <'a> LockableScore<'a, Score = Sc>, + S::Target: for <'a> LockableScore<'a, ScoreLookUp = Sc>, { fn find_route( &self, @@ -73,7 +73,7 @@ impl< G: Deref>, L: Deref, S: Deref, SP: Sized, Sc: Sco }; find_route( payer, params, &self.network_graph, first_hops, &*self.logger, - &ScorerAccountingForInFlightHtlcs::new(self.scorer.lock().deref_mut(), &inflight_htlcs), + &ScorerAccountingForInFlightHtlcs::new(self.scorer.read_lock(), &inflight_htlcs), &self.score_params, &random_seed_bytes ) @@ -106,21 +106,20 @@ pub trait Router { } } -/// [`Score`] implementation that factors in in-flight HTLC liquidity. +/// [`ScoreLookUp`] implementation that factors in in-flight HTLC liquidity. /// -/// Useful for custom [`Router`] implementations to wrap their [`Score`] on-the-fly when calling +/// Useful for custom [`Router`] implementations to wrap their [`ScoreLookUp`] on-the-fly when calling /// [`find_route`]. /// -/// [`Score`]: crate::routing::scoring::Score -pub struct ScorerAccountingForInFlightHtlcs<'a, S: Score, SP: Sized> { - scorer: &'a mut S, +/// [`ScoreLookUp`]: crate::routing::scoring::ScoreLookUp +pub struct ScorerAccountingForInFlightHtlcs<'a, SP: Sized, Sc: 'a + ScoreLookUp, S: Deref> { + scorer: S, // Maps a channel's short channel id and its direction to the liquidity used up. inflight_htlcs: &'a InFlightHtlcs, } - -impl<'a, S: Score, SP: Sized> ScorerAccountingForInFlightHtlcs<'a, S, SP> { +impl<'a, SP: Sized, Sc: ScoreLookUp, S: Deref> ScorerAccountingForInFlightHtlcs<'a, SP, Sc, S> { /// Initialize a new `ScorerAccountingForInFlightHtlcs`. - pub fn new(scorer: &'a mut S, inflight_htlcs: &'a InFlightHtlcs) -> Self { + pub fn new(scorer: S, inflight_htlcs: &'a InFlightHtlcs) -> Self { ScorerAccountingForInFlightHtlcs { scorer, inflight_htlcs @@ -129,12 +128,12 @@ impl<'a, S: Score, SP: Sized> ScorerAccountingForInFlightHtlcs } #[cfg(c_bindings)] -impl<'a, S: Score, SP: Sized> Writeable for ScorerAccountingForInFlightHtlcs<'a, S, SP> { +impl<'a, SP: Sized, Sc: ScoreLookUp, S: Deref> Writeable for ScorerAccountingForInFlightHtlcs<'a, SP, Sc, S> { fn write(&self, writer: &mut W) -> Result<(), io::Error> { self.scorer.write(writer) } } -impl<'a, S: Score, SP: Sized> Score for ScorerAccountingForInFlightHtlcs<'a, S, SP> { - type ScoreParams = S::ScoreParams; +impl<'a, SP: Sized, Sc: 'a + ScoreLookUp, S: Deref> ScoreLookUp for ScorerAccountingForInFlightHtlcs<'a, SP, Sc, S> { + type ScoreParams = Sc::ScoreParams; fn channel_penalty_msat(&self, short_channel_id: u64, source: &NodeId, target: &NodeId, usage: ChannelUsage, score_params: &Self::ScoreParams) -> u64 { if let Some(used_liquidity) = self.inflight_htlcs.used_liquidity_msat( source, target, short_channel_id @@ -149,22 +148,6 @@ impl<'a, S: Score, SP: Sized> Score for ScorerAccountingForInF self.scorer.channel_penalty_msat(short_channel_id, source, target, usage, score_params) } } - - fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64) { - self.scorer.payment_path_failed(path, short_channel_id) - } - - fn payment_path_successful(&mut self, path: &Path) { - self.scorer.payment_path_successful(path) - } - - fn probe_failed(&mut self, path: &Path, short_channel_id: u64) { - self.scorer.probe_failed(path, short_channel_id) - } - - fn probe_successful(&mut self, path: &Path) { - self.scorer.probe_successful(path) - } } /// A data structure for tracking in-flight HTLCs. May be used during pathfinding to account for @@ -1410,7 +1393,7 @@ fn sort_first_hop_channels( /// [`ChannelManager::list_usable_channels`]: crate::ln::channelmanager::ChannelManager::list_usable_channels /// [`Event::PaymentPathFailed`]: crate::events::Event::PaymentPathFailed /// [`NetworkGraph`]: crate::routing::gossip::NetworkGraph -pub fn find_route( +pub fn find_route( our_node_pubkey: &PublicKey, route_params: &RouteParameters, network_graph: &NetworkGraph, first_hops: Option<&[&ChannelDetails]>, logger: L, scorer: &S, score_params: &S::ScoreParams, random_seed_bytes: &[u8; 32] @@ -1424,7 +1407,7 @@ where L::Target: Logger, GL::Target: Logger { Ok(route) } -pub(crate) fn get_route( +pub(crate) fn get_route( our_node_pubkey: &PublicKey, payment_params: &PaymentParameters, network_graph: &ReadOnlyNetworkGraph, first_hops: Option<&[&ChannelDetails]>, final_value_msat: u64, logger: L, scorer: &S, score_params: &S::ScoreParams, _random_seed_bytes: &[u8; 32] @@ -2614,7 +2597,7 @@ fn build_route_from_hops_internal( hop_ids: [Option; MAX_PATH_LENGTH_ESTIMATE as usize], } - impl Score for HopScorer { + impl ScoreLookUp for HopScorer { type ScoreParams = (); fn channel_penalty_msat(&self, _short_channel_id: u64, source: &NodeId, target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams) -> u64 @@ -2632,14 +2615,6 @@ fn build_route_from_hops_internal( } u64::max_value() } - - fn payment_path_failed(&mut self, _path: &Path, _short_channel_id: u64) {} - - fn payment_path_successful(&mut self, _path: &Path) {} - - fn probe_failed(&mut self, _path: &Path, _short_channel_id: u64) {} - - fn probe_successful(&mut self, _path: &Path) {} } impl<'a> Writeable for HopScorer { @@ -2673,7 +2648,7 @@ mod tests { use crate::routing::router::{get_route, build_route_from_hops_internal, add_random_cltv_offset, default_node_features, BlindedTail, InFlightHtlcs, Path, PaymentParameters, Route, RouteHint, RouteHintHop, RouteHop, RoutingFees, DEFAULT_MAX_TOTAL_CLTV_EXPIRY_DELTA, MAX_PATH_LENGTH_ESTIMATE}; - use crate::routing::scoring::{ChannelUsage, FixedPenaltyScorer, Score, ProbabilisticScorer, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters}; + use crate::routing::scoring::{ChannelUsage, FixedPenaltyScorer, ScoreLookUp, ProbabilisticScorer, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters}; use crate::routing::test_utils::{add_channel, add_or_update_node, build_graph, build_line_graph, id_to_feature_flags, get_nodes, update_channel}; use crate::chain::transaction::OutPoint; use crate::sign::EntropySource; @@ -5720,16 +5695,11 @@ mod tests { impl Writeable for BadChannelScorer { fn write(&self, _w: &mut W) -> Result<(), crate::io::Error> { unimplemented!() } } - impl Score for BadChannelScorer { + impl ScoreLookUp for BadChannelScorer { type ScoreParams = (); fn channel_penalty_msat(&self, short_channel_id: u64, _: &NodeId, _: &NodeId, _: ChannelUsage, _score_params:&Self::ScoreParams) -> u64 { if short_channel_id == self.short_channel_id { u64::max_value() } else { 0 } } - - fn payment_path_failed(&mut self, _path: &Path, _short_channel_id: u64) {} - fn payment_path_successful(&mut self, _path: &Path) {} - fn probe_failed(&mut self, _path: &Path, _short_channel_id: u64) {} - fn probe_successful(&mut self, _path: &Path) {} } struct BadNodeScorer { @@ -5741,16 +5711,11 @@ mod tests { fn write(&self, _w: &mut W) -> Result<(), crate::io::Error> { unimplemented!() } } - impl Score for BadNodeScorer { + impl ScoreLookUp for BadNodeScorer { type ScoreParams = (); fn channel_penalty_msat(&self, _: u64, _: &NodeId, target: &NodeId, _: ChannelUsage, _score_params:&Self::ScoreParams) -> u64 { if *target == self.node_id { u64::max_value() } else { 0 } } - - fn payment_path_failed(&mut self, _path: &Path, _short_channel_id: u64) {} - fn payment_path_successful(&mut self, _path: &Path) {} - fn probe_failed(&mut self, _path: &Path, _short_channel_id: u64) {} - fn probe_successful(&mut self, _path: &Path) {} } #[test] @@ -6721,6 +6686,7 @@ pub(crate) mod bench_utils { use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; use crate::chain::transaction::OutPoint; + use crate::routing::scoring::ScoreUpdate; use crate::sign::{EntropySource, KeysManager}; use crate::ln::channelmanager::{self, ChannelCounterparty, ChannelDetails}; use crate::ln::features::Bolt11InvoiceFeatures; @@ -6813,7 +6779,7 @@ pub(crate) mod bench_utils { } } - pub(crate) fn generate_test_routes(graph: &NetworkGraph<&TestLogger>, scorer: &mut S, + pub(crate) fn generate_test_routes(graph: &NetworkGraph<&TestLogger>, scorer: &mut S, score_params: &S::ScoreParams, features: Bolt11InvoiceFeatures, mut seed: u64, starting_amount: u64, route_count: usize, ) -> Vec<(ChannelDetails, PaymentParameters, u64)> { @@ -6839,7 +6805,7 @@ pub(crate) mod bench_utils { let amt = starting_amount + seed % 1_000_000; let path_exists = get_route(&payer, ¶ms, &graph.read_only(), Some(&[&first_hop]), - amt, &TestLogger::new(), &scorer, score_params, &random_seed_bytes).is_ok(); + amt, &TestLogger::new(), scorer, score_params, &random_seed_bytes).is_ok(); if path_exists { // ...and seed the scorer with success and failure data... seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0; @@ -6853,7 +6819,7 @@ pub(crate) mod bench_utils { .with_bolt11_features(mpp_features).unwrap(); let route_res = get_route(&payer, ¶ms, &graph.read_only(), - Some(&[&first_hop]), score_amt, &TestLogger::new(), &scorer, + Some(&[&first_hop]), score_amt, &TestLogger::new(), scorer, score_params, &random_seed_bytes); if let Ok(route) = route_res { for path in route.paths { @@ -6882,7 +6848,7 @@ pub(crate) mod bench_utils { // requires a too-high CLTV delta. route_endpoints.retain(|(first_hop, params, amt)| { get_route(&payer, params, &graph.read_only(), Some(&[first_hop]), *amt, - &TestLogger::new(), &scorer, score_params, &random_seed_bytes).is_ok() + &TestLogger::new(), scorer, score_params, &random_seed_bytes).is_ok() }); route_endpoints.truncate(route_count); assert_eq!(route_endpoints.len(), route_count); @@ -6893,6 +6859,7 @@ pub(crate) mod bench_utils { #[cfg(ldk_bench)] pub mod benches { use super::*; + use crate::routing::scoring::{ScoreUpdate, ScoreLookUp}; use crate::sign::{EntropySource, KeysManager}; use crate::ln::channelmanager; use crate::ln::features::Bolt11InvoiceFeatures; @@ -6955,7 +6922,7 @@ pub mod benches { "generate_large_mpp_routes_with_probabilistic_scorer"); } - fn generate_routes( + fn generate_routes( bench: &mut Criterion, graph: &NetworkGraph<&TestLogger>, mut scorer: S, score_params: &S::ScoreParams, features: Bolt11InvoiceFeatures, starting_amount: u64, bench_name: &'static str, diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 75dae7224..34bf1da13 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -10,7 +10,7 @@ //! Utilities for scoring payment channels. //! //! [`ProbabilisticScorer`] may be given to [`find_route`] to score payment channels during path -//! finding when a custom [`Score`] implementation is not needed. +//! finding when a custom [`ScoreLookUp`] implementation is not needed. //! //! # Example //! @@ -65,12 +65,12 @@ use crate::util::time::Time; use crate::prelude::*; use core::{cmp, fmt}; -use core::cell::{RefCell, RefMut}; +use core::cell::{RefCell, RefMut, Ref}; use core::convert::TryInto; use core::ops::{Deref, DerefMut}; use core::time::Duration; use crate::io::{self, Read}; -use crate::sync::{Mutex, MutexGuard}; +use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; /// We define Score ever-so-slightly differently based on whether we are being built for C bindings /// or not. For users, `LockableScore` must somehow be writeable to disk. For Rust users, this is @@ -86,8 +86,10 @@ use crate::sync::{Mutex, MutexGuard}; macro_rules! define_score { ($($supertrait: path)*) => { /// An interface used to score payment channels for path finding. /// -/// Scoring is in terms of fees willing to be paid in order to avoid routing through a channel. -pub trait Score $(: $supertrait)* { +/// `ScoreLookUp` is used to determine the penalty for a given channel. +/// +/// Scoring is in terms of fees willing to be paid in order to avoid routing through a channel. +pub trait ScoreLookUp $(: $supertrait)* { /// A configurable type which should contain various passed-in parameters for configuring the scorer, /// on a per-routefinding-call basis through to the scorer methods, /// which are used to determine the parameters for the suitability of channels for use. @@ -103,7 +105,10 @@ pub trait Score $(: $supertrait)* { fn channel_penalty_msat( &self, short_channel_id: u64, source: &NodeId, target: &NodeId, usage: ChannelUsage, score_params: &Self::ScoreParams ) -> u64; +} +/// `ScoreUpdate` is used to update the scorer's internal state after a payment attempt. +pub trait ScoreUpdate $(: $supertrait)* { /// Handles updating channel penalties after failing to route through a channel. fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64); @@ -117,14 +122,16 @@ pub trait Score $(: $supertrait)* { fn probe_successful(&mut self, path: &Path); } -impl $(+ $supertrait)*> Score for T { - type ScoreParams = S::ScoreParams; +impl, T: Deref $(+ $supertrait)*> ScoreLookUp for T { + type ScoreParams = SP; fn channel_penalty_msat( &self, short_channel_id: u64, source: &NodeId, target: &NodeId, usage: ChannelUsage, score_params: &Self::ScoreParams ) -> u64 { self.deref().channel_penalty_msat(short_channel_id, source, target, usage, score_params) } +} +impl $(+ $supertrait)*> ScoreUpdate for T { fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64) { self.deref_mut().payment_path_failed(path, short_channel_id) } @@ -145,26 +152,35 @@ impl $(+ $supertrait)*> Score for T { #[cfg(c_bindings)] define_score!(Writeable); + #[cfg(not(c_bindings))] define_score!(); /// A scorer that is accessed under a lock. /// -/// Needed so that calls to [`Score::channel_penalty_msat`] in [`find_route`] can be made while -/// having shared ownership of a scorer but without requiring internal locking in [`Score`] +/// Needed so that calls to [`ScoreLookUp::channel_penalty_msat`] in [`find_route`] can be made while +/// having shared ownership of a scorer but without requiring internal locking in [`ScoreUpdate`] /// implementations. Internal locking would be detrimental to route finding performance and could -/// result in [`Score::channel_penalty_msat`] returning a different value for the same channel. +/// result in [`ScoreLookUp::channel_penalty_msat`] returning a different value for the same channel. /// /// [`find_route`]: crate::routing::router::find_route pub trait LockableScore<'a> { - /// The [`Score`] type. - type Score: 'a + Score; + /// The [`ScoreUpdate`] type. + type ScoreUpdate: 'a + ScoreUpdate; + /// The [`ScoreLookUp`] type. + type ScoreLookUp: 'a + ScoreLookUp; + + /// The write locked [`ScoreUpdate`] type. + type WriteLocked: DerefMut + Sized; - /// The locked [`Score`] type. - type Locked: DerefMut + Sized; + /// The read locked [`ScoreLookUp`] type. + type ReadLocked: Deref + Sized; - /// Returns the locked scorer. - fn lock(&'a self) -> Self::Locked; + /// Returns read locked scorer. + fn read_lock(&'a self) -> Self::ReadLocked; + + /// Returns write locked scorer. + fn write_lock(&'a self) -> Self::WriteLocked; } /// Refers to a scorer that is accessible under lock and also writeable to disk @@ -176,89 +192,138 @@ pub trait WriteableScore<'a>: LockableScore<'a> + Writeable {} #[cfg(not(c_bindings))] impl<'a, T> WriteableScore<'a> for T where T: LockableScore<'a> + Writeable {} #[cfg(not(c_bindings))] -impl<'a, T: 'a + Score> LockableScore<'a> for Mutex { - type Score = T; - type Locked = MutexGuard<'a, T>; +impl<'a, T: 'a + ScoreLookUp + ScoreUpdate> LockableScore<'a> for Mutex { + type ScoreUpdate = T; + type ScoreLookUp = T; + + type WriteLocked = MutexGuard<'a, Self::ScoreUpdate>; + type ReadLocked = MutexGuard<'a, Self::ScoreLookUp>; + + fn read_lock(&'a self) -> Self::ReadLocked { + Mutex::lock(self).unwrap() + } - fn lock(&'a self) -> Self::Locked { + fn write_lock(&'a self) -> Self::WriteLocked { Mutex::lock(self).unwrap() } } #[cfg(not(c_bindings))] -impl<'a, T: 'a + Score> LockableScore<'a> for RefCell { - type Score = T; - type Locked = RefMut<'a, T>; +impl<'a, T: 'a + ScoreUpdate + ScoreLookUp> LockableScore<'a> for RefCell { + type ScoreUpdate = T; + type ScoreLookUp = T; - fn lock(&'a self) -> Self::Locked { + type WriteLocked = RefMut<'a, Self::ScoreUpdate>; + type ReadLocked = Ref<'a, Self::ScoreLookUp>; + + fn write_lock(&'a self) -> Self::WriteLocked { self.borrow_mut() } + + fn read_lock(&'a self) -> Self::ReadLocked { + self.borrow() + } +} + +#[cfg(not(c_bindings))] +impl<'a, SP:Sized, T: 'a + ScoreUpdate + ScoreLookUp> LockableScore<'a> for RwLock { + type ScoreUpdate = T; + type ScoreLookUp = T; + + type WriteLocked = RwLockWriteGuard<'a, Self::ScoreLookUp>; + type ReadLocked = RwLockReadGuard<'a, Self::ScoreUpdate>; + + fn read_lock(&'a self) -> Self::ReadLocked { + RwLock::read(self).unwrap() + } + + fn write_lock(&'a self) -> Self::WriteLocked { + RwLock::write(self).unwrap() + } } #[cfg(c_bindings)] /// A concrete implementation of [`LockableScore`] which supports multi-threading. -pub struct MultiThreadedLockableScore { - score: Mutex, +pub struct MultiThreadedLockableScore { + score: RwLock, } #[cfg(c_bindings)] -impl<'a, T: 'a + Score> LockableScore<'a> for MultiThreadedLockableScore { - type Score = T; - type Locked = MultiThreadedScoreLock<'a, T>; +impl<'a, SP:Sized, T: 'a + ScoreLookUp + ScoreUpdate> LockableScore<'a> for MultiThreadedLockableScore { + type ScoreUpdate = T; + type ScoreLookUp = T; + type WriteLocked = MultiThreadedScoreLockWrite<'a, Self::ScoreUpdate>; + type ReadLocked = MultiThreadedScoreLockRead<'a, Self::ScoreLookUp>; + + fn read_lock(&'a self) -> Self::ReadLocked { + MultiThreadedScoreLockRead(self.score.read().unwrap()) + } - fn lock(&'a self) -> Self::Locked { - MultiThreadedScoreLock(Mutex::lock(&self.score).unwrap()) + fn write_lock(&'a self) -> Self::WriteLocked { + MultiThreadedScoreLockWrite(self.score.write().unwrap()) } } #[cfg(c_bindings)] -impl Writeable for MultiThreadedLockableScore { +impl Writeable for MultiThreadedLockableScore { fn write(&self, writer: &mut W) -> Result<(), io::Error> { - self.lock().write(writer) + self.score.read().unwrap().write(writer) } } #[cfg(c_bindings)] -impl<'a, T: 'a + Score> WriteableScore<'a> for MultiThreadedLockableScore {} +impl<'a, T: 'a + ScoreUpdate + ScoreLookUp> WriteableScore<'a> for MultiThreadedLockableScore {} #[cfg(c_bindings)] -impl MultiThreadedLockableScore { +impl MultiThreadedLockableScore { /// Creates a new [`MultiThreadedLockableScore`] given an underlying [`Score`]. pub fn new(score: T) -> Self { - MultiThreadedLockableScore { score: Mutex::new(score) } + MultiThreadedLockableScore { score: RwLock::new(score) } } } #[cfg(c_bindings)] /// A locked `MultiThreadedLockableScore`. -pub struct MultiThreadedScoreLock<'a, T: Score>(MutexGuard<'a, T>); +pub struct MultiThreadedScoreLockRead<'a, T: ScoreLookUp>(RwLockReadGuard<'a, T>); #[cfg(c_bindings)] -impl<'a, T: 'a + Score> Writeable for MultiThreadedScoreLock<'a, T> { - fn write(&self, writer: &mut W) -> Result<(), io::Error> { - self.0.write(writer) +/// A locked `MultiThreadedLockableScore`. +pub struct MultiThreadedScoreLockWrite<'a, T: ScoreUpdate>(RwLockWriteGuard<'a, T>); + +#[cfg(c_bindings)] +impl<'a, T: 'a + ScoreLookUp> Deref for MultiThreadedScoreLockRead<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0.deref() } } #[cfg(c_bindings)] -impl<'a, T: 'a + Score> DerefMut for MultiThreadedScoreLock<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - self.0.deref_mut() - } +impl<'a, T: 'a + ScoreUpdate> Writeable for MultiThreadedScoreLockWrite<'a, T> { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { + self.0.write(writer) + } } #[cfg(c_bindings)] -impl<'a, T: 'a + Score> Deref for MultiThreadedScoreLock<'a, T> { +impl<'a, T: 'a + ScoreUpdate> Deref for MultiThreadedScoreLockWrite<'a, T> { type Target = T; - fn deref(&self) -> &Self::Target { - self.0.deref() - } + fn deref(&self) -> &Self::Target { + self.0.deref() + } } +#[cfg(c_bindings)] +impl<'a, T: 'a + ScoreUpdate> DerefMut for MultiThreadedScoreLockWrite<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.deref_mut() + } +} -/// Proposed use of a channel passed as a parameter to [`Score::channel_penalty_msat`]. +/// Proposed use of a channel passed as a parameter to [`ScoreLookUp::channel_penalty_msat`]. #[derive(Clone, Copy, Debug, PartialEq)] pub struct ChannelUsage { /// The amount to send through the channel, denominated in millisatoshis. @@ -273,7 +338,7 @@ pub struct ChannelUsage { } #[derive(Clone)] -/// [`Score`] implementation that uses a fixed penalty. +/// [`ScoreLookUp`] implementation that uses a fixed penalty. pub struct FixedPenaltyScorer { penalty_msat: u64, } @@ -285,12 +350,14 @@ impl FixedPenaltyScorer { } } -impl Score for FixedPenaltyScorer { +impl ScoreLookUp for FixedPenaltyScorer { type ScoreParams = (); fn channel_penalty_msat(&self, _: u64, _: &NodeId, _: &NodeId, _: ChannelUsage, _score_params: &Self::ScoreParams) -> u64 { self.penalty_msat } +} +impl ScoreUpdate for FixedPenaltyScorer { fn payment_path_failed(&mut self, _path: &Path, _short_channel_id: u64) {} fn payment_path_successful(&mut self, _path: &Path) {} @@ -323,7 +390,7 @@ use crate::util::time::Eternity; #[cfg(feature = "no-std")] type ConfiguredTime = Eternity; -/// [`Score`] implementation using channel success probability distributions. +/// [`ScoreLookUp`] implementation using channel success probability distributions. /// /// Channels are tracked with upper and lower liquidity bounds - when an HTLC fails at a channel, /// we learn that the upper-bound on the available liquidity is lower than the amount of the HTLC. @@ -361,7 +428,7 @@ type ConfiguredTime = Eternity; /// [`historical_liquidity_penalty_amount_multiplier_msat`]: ProbabilisticScoringFeeParameters::historical_liquidity_penalty_amount_multiplier_msat pub type ProbabilisticScorer = ProbabilisticScorerUsingTime::; -/// Probabilistic [`Score`] implementation. +/// Probabilistic [`ScoreLookUp`] implementation. /// /// This is not exported to bindings users generally all users should use the [`ProbabilisticScorer`] type alias. pub struct ProbabilisticScorerUsingTime>, L: Deref, T: Time> @@ -1235,7 +1302,7 @@ impl, BRT: DerefMut>, L: Deref, T: Time> Score for ProbabilisticScorerUsingTime where L::Target: Logger { +impl>, L: Deref, T: Time> ScoreLookUp for ProbabilisticScorerUsingTime where L::Target: Logger { type ScoreParams = ProbabilisticScoringFeeParameters; fn channel_penalty_msat( &self, short_channel_id: u64, source: &NodeId, target: &NodeId, usage: ChannelUsage, score_params: &ProbabilisticScoringFeeParameters @@ -1278,7 +1345,9 @@ impl>, L: Deref, T: Time> Score for Probabilis .saturating_add(anti_probing_penalty_msat) .saturating_add(base_penalty_msat) } +} +impl>, L: Deref, T: Time> ScoreUpdate for ProbabilisticScorerUsingTime where L::Target: Logger { fn payment_path_failed(&mut self, path: &Path, short_channel_id: u64) { let amount_msat = path.final_value_msat(); log_trace!(self.logger, "Scoring path through to SCID {} as having failed at {} msat", short_channel_id, amount_msat); @@ -1757,7 +1826,7 @@ mod tests { use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate}; use crate::routing::gossip::{EffectiveCapacity, NetworkGraph, NodeId}; use crate::routing::router::{BlindedTail, Path, RouteHop}; - use crate::routing::scoring::{ChannelUsage, Score}; + use crate::routing::scoring::{ChannelUsage, ScoreLookUp, ScoreUpdate}; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::test_utils::{self, TestLogger}; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index bcd460ee1..881f2e751 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -31,7 +31,8 @@ use crate::offers::invoice_request::UnsignedInvoiceRequest; use crate::routing::gossip::{EffectiveCapacity, NetworkGraph, NodeId}; use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult}; use crate::routing::router::{find_route, InFlightHtlcs, Path, Route, RouteParameters, Router, ScorerAccountingForInFlightHtlcs}; -use crate::routing::scoring::{ChannelUsage, Score}; +use crate::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp}; +use crate::sync::RwLock; use crate::util::config::UserConfig; use crate::util::enforcing_trait_impls::{EnforcingSigner, EnforcementState}; use crate::util::logger::{Logger, Level, Record}; @@ -59,7 +60,7 @@ use regex; use crate::io; use crate::prelude::*; use core::cell::RefCell; -use core::ops::DerefMut; +use core::ops::Deref; use core::time::Duration; use crate::sync::{Mutex, Arc}; use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -100,11 +101,11 @@ impl chaininterface::FeeEstimator for TestFeeEstimator { pub struct TestRouter<'a> { pub network_graph: Arc>, pub next_routes: Mutex)>>, - pub scorer: &'a Mutex, + pub scorer: &'a RwLock, } impl<'a> TestRouter<'a> { - pub fn new(network_graph: Arc>, scorer: &'a Mutex) -> Self { + pub fn new(network_graph: Arc>, scorer: &'a RwLock) -> Self { Self { network_graph, next_routes: Mutex::new(VecDeque::new()), scorer } } @@ -122,8 +123,8 @@ impl<'a> Router for TestRouter<'a> { if let Some((find_route_query, find_route_res)) = self.next_routes.lock().unwrap().pop_front() { assert_eq!(find_route_query, *params); if let Ok(ref route) = find_route_res { - let mut binding = self.scorer.lock().unwrap(); - let scorer = ScorerAccountingForInFlightHtlcs::new(binding.deref_mut(), &inflight_htlcs); + let scorer = self.scorer.read().unwrap(); + let scorer = ScorerAccountingForInFlightHtlcs::new(scorer, &inflight_htlcs); for path in &route.paths { let mut aggregate_msat = 0u64; for (idx, hop) in path.hops.iter().rev().enumerate() { @@ -150,7 +151,7 @@ impl<'a> Router for TestRouter<'a> { let logger = TestLogger::new(); find_route( payer, params, &self.network_graph, first_hops, &logger, - &ScorerAccountingForInFlightHtlcs::new(self.scorer.lock().unwrap().deref_mut(), &inflight_htlcs), &(), + &ScorerAccountingForInFlightHtlcs::new(self.scorer.read().unwrap(), &inflight_htlcs), &(), &[42; 32] ) } @@ -1160,7 +1161,7 @@ impl crate::util::ser::Writeable for TestScorer { fn write(&self, _: &mut W) -> Result<(), crate::io::Error> { unreachable!(); } } -impl Score for TestScorer { +impl ScoreLookUp for TestScorer { type ScoreParams = (); fn channel_penalty_msat( &self, short_channel_id: u64, _source: &NodeId, _target: &NodeId, usage: ChannelUsage, _score_params: &Self::ScoreParams @@ -1176,7 +1177,9 @@ impl Score for TestScorer { } 0 } +} +impl ScoreUpdate for TestScorer { fn payment_path_failed(&mut self, _actual_path: &Path, _actual_short_channel_id: u64) {} fn payment_path_successful(&mut self, _actual_path: &Path) {} -- 2.39.5