X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=f953ba1c753750133b6e79a3cc43eb1e05c81962;hb=36ecc8e729775be2a12e1392066926bed760524a;hp=6a36874a384bf9ca23a91d2cef6557853943aa2f;hpb=daf79f515fd4344c4ca278782500cb876b61749c;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 6a36874a..f953ba1c 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -270,12 +270,13 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri } macro_rules! define_run_body { - ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, - $check_slow_await: expr) - => { { + ( + $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, + $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, + $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, + $check_slow_await: expr + ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); @@ -501,15 +502,15 @@ use core::task; /// could setup `process_events_async` like this: /// ``` /// # use lightning::io; -/// # use std::sync::{Arc, Mutex}; +/// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use lightning_background_processor::{process_events_async, GossipSync}; /// # struct MyStore {} /// # impl lightning::util::persist::KVStore for MyStore { -/// # fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } -/// # fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } -/// # fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } -/// # fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result> { Ok(Vec::new()) } +/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } +/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } +/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } /// # struct MyEventHandler {} /// # impl MyEventHandler { @@ -528,11 +529,11 @@ use core::task; /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; -/// # type MyScorer = Mutex, Arc>>; +/// # type MyScorer = RwLock, Arc>>; /// /// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { /// let background_persister = Arc::clone(&my_persister); @@ -599,12 +600,11 @@ pub async fn process_events_async< EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, @@ -617,7 +617,7 @@ pub async fn process_events_async< where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, ES::Target: 'static + EntropySource, NS::Target: 'static + NodeSigner, @@ -625,8 +625,9 @@ where F::Target: 'static + FeeEstimator, R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let mut should_break = false; let async_event_handler = |event| { @@ -650,8 +651,9 @@ where event_handler(event).await; } }; - define_run_body!(persister, - chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + define_run_body!( + persister, chain_monitor, + chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { let fut = Selector { @@ -673,7 +675,8 @@ where task::Poll::Ready(exit) => { should_break = exit; true }, task::Poll::Pending => false, } - }, mobile_interruptable_platform) + }, mobile_interruptable_platform + ) } #[cfg(feature = "std")] @@ -738,12 +741,11 @@ impl BackgroundProcessor { P: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for <'b> WriteableScore<'b>, >( @@ -753,7 +755,7 @@ impl BackgroundProcessor { where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, ES::Target: 'static + EntropySource, NS::Target: 'static + NodeSigner, @@ -761,8 +763,9 @@ impl BackgroundProcessor { F::Target: 'static + FeeEstimator, R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -782,14 +785,16 @@ impl BackgroundProcessor { } event_handler.handle_event(event); }; - define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + define_run_body!( + persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false + ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -845,7 +850,7 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { use bitcoin::blockdata::constants::{genesis_block, ChainHash}; - use bitcoin::blockdata::locktime::PackedLockTime; + use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1}; @@ -864,11 +869,14 @@ mod tests { use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; use lightning::routing::router::{DefaultRouter, Path, RouteHop}; - use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp}; + use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; - use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_SUB_NAMESPACE, SCORER_PERSISTENCE_KEY}; + use lightning::util::persist::{KVStore, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY}; use lightning_persister::fs_store::FilesystemStore; use std::collections::VecDeque; use std::{fs, env}; @@ -891,6 +899,11 @@ mod tests { fn disconnect_socket(&mut self) {} } + #[cfg(c_bindings)] + type LockingWrapper = lightning::routing::scoring::MultiThreadedLockableScore; + #[cfg(not(c_bindings))] + type LockingWrapper = Mutex; + type ChannelManager = channelmanager::ChannelManager< Arc, @@ -902,7 +915,7 @@ mod tests { Arc>>, Arc, - Arc>, + Arc>, (), TestScorer> >, @@ -924,7 +937,7 @@ mod tests { network_graph: Arc>>, logger: Arc, best_block: BestBlock, - scorer: Arc>, + scorer: Arc>, } impl Node { @@ -983,13 +996,13 @@ mod tests { } impl KVStore for Persister { - fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> lightning::io::Result> { - self.kv_store.read(namespace, sub_namespace, key) + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result> { + self.kv_store.read(primary_namespace, secondary_namespace, key) } - fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> { - if namespace == CHANNEL_MANAGER_PERSISTENCE_NAMESPACE && - sub_namespace == CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE && + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> { + if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE && key == CHANNEL_MANAGER_PERSISTENCE_KEY { if let Some((error, message)) = self.manager_error { @@ -997,8 +1010,8 @@ mod tests { } } - if namespace == NETWORK_GRAPH_PERSISTENCE_NAMESPACE && - sub_namespace == NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE && + if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE && key == NETWORK_GRAPH_PERSISTENCE_KEY { if let Some(sender) = &self.graph_persistence_notifier { @@ -1013,8 +1026,8 @@ mod tests { } } - if namespace == SCORER_PERSISTENCE_NAMESPACE && - sub_namespace == SCORER_PERSISTENCE_SUB_NAMESPACE && + if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE && key == SCORER_PERSISTENCE_KEY { if let Some((error, message)) = self.scorer_error { @@ -1022,15 +1035,15 @@ mod tests { } } - self.kv_store.write(namespace, sub_namespace, key, buf) + self.kv_store.write(primary_namespace, secondary_namespace, key, buf) } - fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> { - self.kv_store.remove(namespace, sub_namespace, key, lazy) + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> { + self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy) } - fn list(&self, namespace: &str, sub_namespace: &str) -> lightning::io::Result> { - self.kv_store.list(namespace, sub_namespace) + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result> { + self.kv_store.list(primary_namespace, secondary_namespace) } } @@ -1145,6 +1158,9 @@ mod tests { } } + #[cfg(c_bindings)] + impl lightning::routing::scoring::Score for TestScorer {} + impl Drop for TestScorer { fn drop(&mut self) { if std::thread::panicking() { @@ -1176,9 +1192,9 @@ mod tests { let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); - let scorer = Arc::new(Mutex::new(TestScorer::new())); + let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ())); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); @@ -1230,7 +1246,7 @@ mod tests { macro_rules! begin_open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ - $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); + $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap(); $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); }} @@ -1243,7 +1259,7 @@ mod tests { assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); - let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut { + let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut { value: channel_value_satoshis, script_pubkey: output_script.clone(), }]}; (temporary_channel_id, tx) @@ -1364,9 +1380,9 @@ mod tests { let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() { + if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() { break } } @@ -1545,7 +1561,7 @@ mod tests { loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() { + if log_entries.get(&("lightning_background_processor", expected_log)).is_some() { break } } @@ -1569,7 +1585,7 @@ mod tests { $sleep; let log_entries = $nodes[0].logger.lines.lock().unwrap(); let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string(); - if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter)) + if *log_entries.get(&("lightning_background_processor", loop_counter)) .unwrap_or(&0) > 1 { // Wait until the loop has gone around at least twice. @@ -1683,9 +1699,10 @@ mod tests { channel_features: ChannelFeatures::empty(), fee_msat: 0, cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, + maybe_announced_channel: true, }], blinded_tail: None }; - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1702,7 +1719,7 @@ mod tests { // Ensure we'll score payments that were explicitly failed back by the destination as // ProbeSuccess. - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1717,7 +1734,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { payment_id: PaymentId([42; 32]), payment_hash: None, @@ -1729,7 +1746,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeSuccessful { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1741,7 +1758,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeFailed { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1780,7 +1797,7 @@ mod tests { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer after update".to_string(); - assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); + assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5); } #[tokio::test] @@ -1826,7 +1843,7 @@ mod tests { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer after update".to_string(); - assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); + assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5); }); let (r1, r2) = tokio::join!(t1, t2);