Stop writing signer data as a part of channels
[rust-lightning] / lightning-background-processor / src / lib.rs
index 7ae14b4b4aabd5e92c3a610b5cfd622c2314264f..aa6d0b0615ef21e221e745e3563f1ddc9c2bf620 100644 (file)
@@ -501,15 +501,15 @@ use core::task;
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
-/// # use std::sync::{Arc, Mutex};
+/// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
 /// # use lightning_background_processor::{process_events_async, GossipSync};
 /// # struct MyStore {}
 /// # impl lightning::util::persist::KVStore for MyStore {
-/// #     fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
-/// #     fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
-/// #     fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
-/// #     fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
+/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
+/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
+/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
+/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
 /// # }
 /// # struct MyEventHandler {}
 /// # impl MyEventHandler {
@@ -528,11 +528,11 @@ use core::task;
 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
 /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
-/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
+/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
-/// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
+/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
 ///
 /// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
 ///    let background_persister = Arc::clone(&my_persister);
@@ -864,11 +864,14 @@ mod tests {
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
        use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
        use lightning::routing::router::{DefaultRouter, Path, RouteHop};
-       use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp};
+       use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
        use lightning::util::config::UserConfig;
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
-       use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_SUB_NAMESPACE, SCORER_PERSISTENCE_KEY};
+       use lightning::util::persist::{KVStore,
+               CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
+               NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
+               SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
        use lightning_persister::fs_store::FilesystemStore;
        use std::collections::VecDeque;
        use std::{fs, env};
@@ -891,6 +894,11 @@ mod tests {
                fn disconnect_socket(&mut self) {}
        }
 
+       #[cfg(c_bindings)]
+       type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
+       #[cfg(not(c_bindings))]
+       type LockingWrapper<T> = Mutex<T>;
+
        type ChannelManager =
                channelmanager::ChannelManager<
                        Arc<ChainMonitor>,
@@ -902,7 +910,7 @@ mod tests {
                        Arc<DefaultRouter<
                                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                                Arc<test_utils::TestLogger>,
-                               Arc<Mutex<TestScorer>>,
+                               Arc<LockingWrapper<TestScorer>>,
                                (),
                                TestScorer>
                        >,
@@ -924,7 +932,7 @@ mod tests {
                network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
-               scorer: Arc<Mutex<TestScorer>>,
+               scorer: Arc<LockingWrapper<TestScorer>>,
        }
 
        impl Node {
@@ -983,13 +991,13 @@ mod tests {
        }
 
        impl KVStore for Persister {
-               fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
-                       self.kv_store.read(namespace, sub_namespace, key)
+               fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
+                       self.kv_store.read(primary_namespace, secondary_namespace, key)
                }
 
-               fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
-                       if namespace == CHANNEL_MANAGER_PERSISTENCE_NAMESPACE &&
-                               sub_namespace == CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE &&
+               fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
+                       if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
+                               secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
                                key == CHANNEL_MANAGER_PERSISTENCE_KEY
                        {
                                if let Some((error, message)) = self.manager_error {
@@ -997,8 +1005,8 @@ mod tests {
                                }
                        }
 
-                       if namespace == NETWORK_GRAPH_PERSISTENCE_NAMESPACE &&
-                               sub_namespace == NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE &&
+                       if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
+                               secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
                                key == NETWORK_GRAPH_PERSISTENCE_KEY
                        {
                                if let Some(sender) = &self.graph_persistence_notifier {
@@ -1013,8 +1021,8 @@ mod tests {
                                }
                        }
 
-                       if namespace == SCORER_PERSISTENCE_NAMESPACE &&
-                               sub_namespace == SCORER_PERSISTENCE_SUB_NAMESPACE &&
+                       if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
+                               secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
                                key == SCORER_PERSISTENCE_KEY
                        {
                                if let Some((error, message)) = self.scorer_error {
@@ -1022,15 +1030,15 @@ mod tests {
                                }
                        }
 
-                       self.kv_store.write(namespace, sub_namespace, key, buf)
+                       self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
                }
 
-               fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
-                       self.kv_store.remove(namespace, sub_namespace, key, lazy)
+               fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
+                       self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
                }
 
-               fn list(&self, namespace: &str, sub_namespace: &str) -> lightning::io::Result<Vec<String>> {
-                       self.kv_store.list(namespace, sub_namespace)
+               fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
+                       self.kv_store.list(primary_namespace, secondary_namespace)
                }
        }
 
@@ -1145,6 +1153,9 @@ mod tests {
                }
        }
 
+       #[cfg(c_bindings)]
+       impl lightning::routing::scoring::Score for TestScorer {}
+
        impl Drop for TestScorer {
                fn drop(&mut self) {
                        if std::thread::panicking() {
@@ -1176,9 +1187,9 @@ mod tests {
                        let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
                        let genesis_block = genesis_block(network);
                        let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
-                       let scorer = Arc::new(Mutex::new(TestScorer::new()));
+                       let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
                        let seed = [i as u8; 32];
-                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ()));
+                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
                        let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
                        let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
                        let now = Duration::from_secs(genesis_block.header.time as u64);
@@ -1686,7 +1697,7 @@ mod tests {
                                maybe_announced_channel: true,
                        }], blinded_tail: None };
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
+                       $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
                        $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
                                payment_id: None,
                                payment_hash: PaymentHash([42; 32]),
@@ -1703,7 +1714,7 @@ mod tests {
 
                        // Ensure we'll score payments that were explicitly failed back by the destination as
                        // ProbeSuccess.
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
                                payment_id: None,
                                payment_hash: PaymentHash([42; 32]),
@@ -1718,7 +1729,7 @@ mod tests {
                                _ => panic!("Unexpected event"),
                        }
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
                                payment_id: PaymentId([42; 32]),
                                payment_hash: None,
@@ -1730,7 +1741,7 @@ mod tests {
                                _ => panic!("Unexpected event"),
                        }
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
                                payment_id: PaymentId([42; 32]),
                                payment_hash: PaymentHash([42; 32]),
@@ -1742,7 +1753,7 @@ mod tests {
                                _ => panic!("Unexpected event"),
                        }
 
-                       $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
+                       $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
                        $nodes[0].node.push_pending_event(Event::ProbeFailed {
                                payment_id: PaymentId([42; 32]),
                                payment_hash: PaymentHash([42; 32]),