X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=aa6d0b0615ef21e221e745e3563f1ddc9c2bf620;hb=refs%2Fheads%2F2023-11-less-graph-memory-frag;hp=e0c71bfcf92b83abed9bce9e138c9cb34c823f5b;hpb=1cb810358dd24e20f2228ad07526f3bd40c5fd0f;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index e0c71bfc..aa6d0b06 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -501,15 +501,15 @@ use core::task; /// could setup `process_events_async` like this: /// ``` /// # use lightning::io; -/// # use std::sync::{Arc, Mutex}; +/// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use lightning_background_processor::{process_events_async, GossipSync}; /// # struct MyStore {} /// # impl lightning::util::persist::KVStore for MyStore { -/// # fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } -/// # fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } -/// # fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } -/// # fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result> { Ok(Vec::new()) } +/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } +/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } +/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } /// # struct MyEventHandler {} /// # impl MyEventHandler { @@ -528,11 +528,11 @@ use core::task; /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; -/// # type MyScorer = Mutex, Arc>>; +/// # type MyScorer = RwLock, Arc>>; /// /// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { /// let background_persister = Arc::clone(&my_persister); @@ -864,7 +864,7 @@ mod tests { use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; use lightning::routing::router::{DefaultRouter, Path, RouteHop}; - use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp}; + use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; @@ -894,6 +894,11 @@ mod tests { fn disconnect_socket(&mut self) {} } + #[cfg(c_bindings)] + type LockingWrapper = lightning::routing::scoring::MultiThreadedLockableScore; + #[cfg(not(c_bindings))] + type LockingWrapper = Mutex; + type ChannelManager = channelmanager::ChannelManager< Arc, @@ -905,7 +910,7 @@ mod tests { Arc>>, Arc, - Arc>, + Arc>, (), TestScorer> >, @@ -927,7 +932,7 @@ mod tests { network_graph: Arc>>, logger: Arc, best_block: BestBlock, - scorer: Arc>, + scorer: Arc>, } impl Node { @@ -986,13 +991,13 @@ mod tests { } impl KVStore for Persister { - fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> lightning::io::Result> { - self.kv_store.read(namespace, sub_namespace, key) + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result> { + self.kv_store.read(primary_namespace, secondary_namespace, key) } - fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> { - if namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE && - sub_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE && + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> { + if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE && key == CHANNEL_MANAGER_PERSISTENCE_KEY { if let Some((error, message)) = self.manager_error { @@ -1000,8 +1005,8 @@ mod tests { } } - if namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE && - sub_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE && + if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE && key == NETWORK_GRAPH_PERSISTENCE_KEY { if let Some(sender) = &self.graph_persistence_notifier { @@ -1016,8 +1021,8 @@ mod tests { } } - if namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE && - sub_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE && + if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE && key == SCORER_PERSISTENCE_KEY { if let Some((error, message)) = self.scorer_error { @@ -1025,15 +1030,15 @@ mod tests { } } - self.kv_store.write(namespace, sub_namespace, key, buf) + self.kv_store.write(primary_namespace, secondary_namespace, key, buf) } - fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> { - self.kv_store.remove(namespace, sub_namespace, key, lazy) + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> { + self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy) } - fn list(&self, namespace: &str, sub_namespace: &str) -> lightning::io::Result> { - self.kv_store.list(namespace, sub_namespace) + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result> { + self.kv_store.list(primary_namespace, secondary_namespace) } } @@ -1148,6 +1153,9 @@ mod tests { } } + #[cfg(c_bindings)] + impl lightning::routing::scoring::Score for TestScorer {} + impl Drop for TestScorer { fn drop(&mut self) { if std::thread::panicking() { @@ -1179,9 +1187,9 @@ mod tests { let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); - let scorer = Arc::new(Mutex::new(TestScorer::new())); + let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ())); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); @@ -1689,7 +1697,7 @@ mod tests { maybe_announced_channel: true, }], blinded_tail: None }; - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1706,7 +1714,7 @@ mod tests { // Ensure we'll score payments that were explicitly failed back by the destination as // ProbeSuccess. - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1721,7 +1729,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { payment_id: PaymentId([42; 32]), payment_hash: None, @@ -1733,7 +1741,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeSuccessful { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1745,7 +1753,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeFailed { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]),