X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=aa6d0b0615ef21e221e745e3563f1ddc9c2bf620;hb=refs%2Fheads%2F2023-11-less-graph-memory-frag;hp=6a36874a384bf9ca23a91d2cef6557853943aa2f;hpb=24db35eeea7049a3abde9766baf8dda69b462c3f;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 6a36874a..aa6d0b06 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -501,15 +501,15 @@ use core::task; /// could setup `process_events_async` like this: /// ``` /// # use lightning::io; -/// # use std::sync::{Arc, Mutex}; +/// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use lightning_background_processor::{process_events_async, GossipSync}; /// # struct MyStore {} /// # impl lightning::util::persist::KVStore for MyStore { -/// # fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } -/// # fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } -/// # fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } -/// # fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result> { Ok(Vec::new()) } +/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } +/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } +/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } /// # struct MyEventHandler {} /// # impl MyEventHandler { @@ -528,11 +528,11 @@ use core::task; /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; -/// # type MyScorer = Mutex, Arc>>; +/// # type MyScorer = RwLock, Arc>>; /// /// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { /// let background_persister = Arc::clone(&my_persister); @@ -864,11 +864,14 @@ mod tests { use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; use lightning::routing::router::{DefaultRouter, Path, RouteHop}; - use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp}; + use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; - use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_SUB_NAMESPACE, SCORER_PERSISTENCE_KEY}; + use lightning::util::persist::{KVStore, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY}; use lightning_persister::fs_store::FilesystemStore; use std::collections::VecDeque; use std::{fs, env}; @@ -891,6 +894,11 @@ mod tests { fn disconnect_socket(&mut self) {} } + #[cfg(c_bindings)] + type LockingWrapper = lightning::routing::scoring::MultiThreadedLockableScore; + #[cfg(not(c_bindings))] + type LockingWrapper = Mutex; + type ChannelManager = channelmanager::ChannelManager< Arc, @@ -902,7 +910,7 @@ mod tests { Arc>>, Arc, - Arc>, + Arc>, (), TestScorer> >, @@ -924,7 +932,7 @@ mod tests { network_graph: Arc>>, logger: Arc, best_block: BestBlock, - scorer: Arc>, + scorer: Arc>, } impl Node { @@ -983,13 +991,13 @@ mod tests { } impl KVStore for Persister { - fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> lightning::io::Result> { - self.kv_store.read(namespace, sub_namespace, key) + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result> { + self.kv_store.read(primary_namespace, secondary_namespace, key) } - fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> { - if namespace == CHANNEL_MANAGER_PERSISTENCE_NAMESPACE && - sub_namespace == CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE && + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> { + if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE && key == CHANNEL_MANAGER_PERSISTENCE_KEY { if let Some((error, message)) = self.manager_error { @@ -997,8 +1005,8 @@ mod tests { } } - if namespace == NETWORK_GRAPH_PERSISTENCE_NAMESPACE && - sub_namespace == NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE && + if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE && key == NETWORK_GRAPH_PERSISTENCE_KEY { if let Some(sender) = &self.graph_persistence_notifier { @@ -1013,8 +1021,8 @@ mod tests { } } - if namespace == SCORER_PERSISTENCE_NAMESPACE && - sub_namespace == SCORER_PERSISTENCE_SUB_NAMESPACE && + if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE && key == SCORER_PERSISTENCE_KEY { if let Some((error, message)) = self.scorer_error { @@ -1022,15 +1030,15 @@ mod tests { } } - self.kv_store.write(namespace, sub_namespace, key, buf) + self.kv_store.write(primary_namespace, secondary_namespace, key, buf) } - fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> { - self.kv_store.remove(namespace, sub_namespace, key, lazy) + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> { + self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy) } - fn list(&self, namespace: &str, sub_namespace: &str) -> lightning::io::Result> { - self.kv_store.list(namespace, sub_namespace) + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result> { + self.kv_store.list(primary_namespace, secondary_namespace) } } @@ -1145,6 +1153,9 @@ mod tests { } } + #[cfg(c_bindings)] + impl lightning::routing::scoring::Score for TestScorer {} + impl Drop for TestScorer { fn drop(&mut self) { if std::thread::panicking() { @@ -1176,9 +1187,9 @@ mod tests { let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); - let scorer = Arc::new(Mutex::new(TestScorer::new())); + let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ())); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); @@ -1683,9 +1694,10 @@ mod tests { channel_features: ChannelFeatures::empty(), fee_msat: 0, cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, + maybe_announced_channel: true, }], blinded_tail: None }; - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1702,7 +1714,7 @@ mod tests { // Ensure we'll score payments that were explicitly failed back by the destination as // ProbeSuccess. - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1717,7 +1729,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { payment_id: PaymentId([42; 32]), payment_hash: None, @@ -1729,7 +1741,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeSuccessful { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1741,7 +1753,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeFailed { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]),