1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
38 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
39 /// responsibilities are:
40 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
41 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
42 /// writing it to disk/backups by invoking the callback given to it at startup.
43 /// [`ChannelManager`] persistence should be done in the background.
44 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
45 /// at the appropriate intervals.
46 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
47 /// is provided to [`BackgroundProcessor::start`]).
49 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
50 /// upon as doing so may result in high latency.
54 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
55 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
56 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
57 /// unilateral chain closure fees are at risk.
59 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
60 /// [`Event`]: lightning::util::events::Event
61 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
62 pub struct BackgroundProcessor {
63 stop_thread: Arc<AtomicBool>,
64 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Threshold, in seconds, compared against the time since the last
// `ChannelManager::timer_tick_occurred` call in the background loop; the tick fires once the
// elapsed whole seconds exceed this value.
68 const FRESHNESS_TIMER: u64 = 60;
// NOTE(review): the attribute selecting this one-second variant is not visible in this excerpt;
// presumably it is the test-only value so tests tick quickly — confirm.
70 const FRESHNESS_TIMER: u64 = 1;
// `PING_TIMER` is the threshold, in seconds, compared against the time since the last
// `PeerManager::timer_tick_occurred` call in the background loop; the tick fires once the
// elapsed whole seconds exceed it. Optimised, non-test builds use 10 seconds.
72 #[cfg(all(not(test), not(debug_assertions)))]
73 const PING_TIMER: u64 = 10;
74 /// Signature operations take a lot longer without compiler optimisations.
75 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
76 /// timeout is reached.
77 #[cfg(all(not(test), debug_assertions))]
78 const PING_TIMER: u64 = 30;
// NOTE(review): the attribute gating this one-second value is not visible in this excerpt;
// presumably it is the test-only value — confirm.
80 const PING_TIMER: u64 = 1;
82 /// Prune the network graph of stale entries hourly.
// Value is in seconds. The background loop selects this interval only when `have_pruned` is
// set; until then it uses `FIRST_NETWORK_PRUNE_TIMER` instead.
83 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Threshold, in seconds, compared against the time since the scorer was last persisted; when the
// elapsed whole seconds exceed it and a scorer was supplied, the background loop calls
// `persist_scorer` and restarts the timer.
86 const SCORER_PERSIST_TIMER: u64 = 30;
// NOTE(review): the attribute selecting this one-second variant is not visible in this excerpt;
// presumably it is the test-only value — confirm.
88 const SCORER_PERSIST_TIMER: u64 = 1;
// Threshold, in seconds, used for the network-graph prune check while `have_pruned` is still
// false, so that a prune is attempted shortly after startup rather than only at the hourly
// `NETWORK_PRUNE_TIMER` cadence.
91 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
// NOTE(review): the attribute selecting this one-second variant is not visible in this excerpt;
// presumably it is the test-only value — confirm.
93 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
95 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
97 P: Deref<Target = P2PGossipSync<G, A, L>>,
98 R: Deref<Target = RapidGossipSync<G, L>>,
99 G: Deref<Target = NetworkGraph<L>>,
103 where A::Target: chain::Access, L::Target: Logger {
104 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
106 /// Rapid gossip sync from a trusted server.
113 P: Deref<Target = P2PGossipSync<G, A, L>>,
114 R: Deref<Target = RapidGossipSync<G, L>>,
115 G: Deref<Target = NetworkGraph<L>>,
118 > GossipSync<P, R, G, A, L>
119 where A::Target: chain::Access, L::Target: Logger {
120 fn network_graph(&self) -> Option<&G> {
122 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
123 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
124 GossipSync::None => None,
128 fn prunable_network_graph(&self) -> Option<&G> {
130 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
131 GossipSync::Rapid(gossip_sync) => {
132 if gossip_sync.is_initial_sync_complete() {
133 Some(gossip_sync.network_graph())
138 GossipSync::None => None,
143 /// (C-not exported) as the bindings concretize everything and have constructors for us
144 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
145 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
147 A::Target: chain::Access,
150 /// Initializes a new [`GossipSync::P2P`] variant.
151 pub fn p2p(gossip_sync: P) -> Self {
152 GossipSync::P2P(gossip_sync)
156 /// (C-not exported) as the bindings concretize everything and have constructors for us
157 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
159 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
162 &'a (dyn chain::Access + Send + Sync),
168 /// Initializes a new [`GossipSync::Rapid`] variant.
169 pub fn rapid(gossip_sync: R) -> Self {
170 GossipSync::Rapid(gossip_sync)
174 /// (C-not exported) as the bindings concretize everything and have constructors for us
177 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
178 &RapidGossipSync<&'a NetworkGraph<L>, L>,
180 &'a (dyn chain::Access + Send + Sync),
186 /// Initializes a new [`GossipSync::None`] variant.
187 pub fn none() -> Self {
192 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
193 struct DecoratingEventHandler<
196 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
197 RGS: Deref<Target = RapidGossipSync<G, L>>,
198 G: Deref<Target = NetworkGraph<L>>,
202 where A::Target: chain::Access, L::Target: Logger {
204 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
210 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
211 RGS: Deref<Target = RapidGossipSync<G, L>>,
212 G: Deref<Target = NetworkGraph<L>>,
215 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
216 where A::Target: chain::Access, L::Target: Logger {
217 fn handle_event(&self, event: &Event) {
218 if let Some(network_graph) = self.gossip_sync.network_graph() {
219 network_graph.handle_event(event);
221 self.event_handler.handle_event(event);
225 impl BackgroundProcessor {
226 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
229 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
230 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
231 /// either [`join`] or [`stop`].
233 /// # Data Persistence
235 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
236 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
237 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
238 /// provided implementation.
240 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
241 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
242 /// See the `lightning-persister` crate for LDK's provided implementation.
244 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
245 /// error or call [`join`] and handle any error that may arise. For the latter case,
246 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
250 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
251 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
252 /// functionality implemented by other handlers.
253 /// * If a [`GossipSync`] providing a [`NetworkGraph`] is given, the graph will be updated based on payment failures.
255 /// # Rapid Gossip Sync
257 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
258 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
259 /// until the [`RapidGossipSync`] instance completes its first sync.
261 /// [top-level documentation]: BackgroundProcessor
262 /// [`join`]: Self::join
263 /// [`stop`]: Self::stop
264 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
265 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
266 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
267 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
268 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
269 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
272 Signer: 'static + Sign,
273 CA: 'static + Deref + Send + Sync,
274 CF: 'static + Deref + Send + Sync,
275 CW: 'static + Deref + Send + Sync,
276 T: 'static + Deref + Send + Sync,
277 K: 'static + Deref + Send + Sync,
278 F: 'static + Deref + Send + Sync,
279 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
280 L: 'static + Deref + Send + Sync,
281 P: 'static + Deref + Send + Sync,
282 Descriptor: 'static + SocketDescriptor + Send + Sync,
283 CMH: 'static + Deref + Send + Sync,
284 OMH: 'static + Deref + Send + Sync,
285 RMH: 'static + Deref + Send + Sync,
286 EH: 'static + EventHandler + Send,
287 PS: 'static + Deref + Send,
288 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
289 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
290 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
291 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
292 UMH: 'static + Deref + Send + Sync,
293 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
294 S: 'static + Deref<Target = SC> + Send + Sync,
295 SC: WriteableScore<'a>,
297 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
298 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
301 CA::Target: 'static + chain::Access,
302 CF::Target: 'static + chain::Filter,
303 CW::Target: 'static + chain::Watch<Signer>,
304 T::Target: 'static + BroadcasterInterface,
305 K::Target: 'static + KeysInterface<Signer = Signer>,
306 F::Target: 'static + FeeEstimator,
307 L::Target: 'static + Logger,
308 P::Target: 'static + Persist<Signer>,
309 CMH::Target: 'static + ChannelMessageHandler,
310 OMH::Target: 'static + OnionMessageHandler,
311 RMH::Target: 'static + RoutingMessageHandler,
312 UMH::Target: 'static + CustomMessageHandler,
313 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
315 let stop_thread = Arc::new(AtomicBool::new(false));
316 let stop_thread_clone = stop_thread.clone();
317 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
318 let event_handler = DecoratingEventHandler {
320 gossip_sync: &gossip_sync,
323 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
324 channel_manager.timer_tick_occurred();
326 let mut last_freshness_call = Instant::now();
327 let mut last_ping_call = Instant::now();
328 let mut last_prune_call = Instant::now();
329 let mut last_scorer_persist_call = Instant::now();
330 let mut have_pruned = false;
333 channel_manager.process_pending_events(&event_handler);
334 chain_monitor.process_pending_events(&event_handler);
336 // Note that the PeerManager::process_events may block on ChannelManager's locks,
337 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
338 // we want to ensure we get into `persist_manager` as quickly as we can, especially
339 // without running the normal event processing above and handing events to users.
341 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
342 // processing a message effectively at any point during this loop. In order to
343 // minimize the time between such processing completing and persisting the updated
344 // ChannelManager, we want to minimize methods blocking on a ChannelManager
345 // generally, and as a fallback place such blocking only immediately before
347 peer_manager.process_events();
349 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
350 // see `await_start`'s use below.
351 let await_start = Instant::now();
352 let updates_available =
353 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
354 let await_time = await_start.elapsed();
356 if updates_available {
357 log_trace!(logger, "Persisting ChannelManager...");
358 persister.persist_manager(&*channel_manager)?;
359 log_trace!(logger, "Done persisting ChannelManager.");
361 // Exit the loop if the background processor was requested to stop.
362 if stop_thread.load(Ordering::Acquire) == true {
363 log_trace!(logger, "Terminating background processor.");
366 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
367 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
368 channel_manager.timer_tick_occurred();
369 last_freshness_call = Instant::now();
371 if await_time > Duration::from_secs(1) {
372 // On various platforms, we may be starved of CPU cycles for several reasons.
373 // E.g. on iOS, if we've been in the background, we will be entirely paused.
374 // Similarly, if we're on a desktop platform and the device has been asleep, we
375 // may not get any cycles.
376 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
377 // full second, at which point we assume sockets may have been killed (they
378 // appear to be at least on some platforms, even if it has only been a second).
379 // Note that we have to take care to not get here just because user event
380 // processing was slow at the top of the loop. For example, the sample client
381 // may call Bitcoin Core RPCs during event handling, which very often takes
382 // more than a handful of seconds to complete, and shouldn't disconnect all our
384 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
385 peer_manager.disconnect_all_peers();
386 last_ping_call = Instant::now();
387 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
388 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
389 peer_manager.timer_tick_occurred();
390 last_ping_call = Instant::now();
393 // Note that we want to run a graph prune once not long after startup before
394 // falling back to our usual hourly prunes. This avoids short-lived clients never
395 // pruning their network graph. We run once 60 seconds after startup before
396 // continuing our normal cadence.
397 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
398 // The network graph must not be pruned while rapid sync completion is pending
399 log_trace!(logger, "Assessing prunability of network graph");
400 if let Some(network_graph) = gossip_sync.prunable_network_graph() {
401 network_graph.remove_stale_channels();
403 if let Err(e) = persister.persist_graph(network_graph) {
404 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
407 last_prune_call = Instant::now();
410 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
414 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
415 if let Some(ref scorer) = scorer {
416 log_trace!(logger, "Persisting scorer");
417 if let Err(e) = persister.persist_scorer(&scorer) {
418 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
421 last_scorer_persist_call = Instant::now();
425 // After we exit, ensure we persist the ChannelManager one final time - this avoids
426 // some races where users quit while channel updates were in-flight, with
427 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
428 persister.persist_manager(&*channel_manager)?;
430 // Persist Scorer on exit
431 if let Some(ref scorer) = scorer {
432 persister.persist_scorer(&scorer)?;
435 // Persist NetworkGraph on exit
436 if let Some(network_graph) = gossip_sync.network_graph() {
437 persister.persist_graph(network_graph)?;
442 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
445 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
446 /// [`ChannelManager`].
450 /// This function panics if the background thread has panicked such as while persisting or
453 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
454 pub fn join(mut self) -> Result<(), std::io::Error> {
455 assert!(self.thread_handle.is_some());
459 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
460 /// [`ChannelManager`].
464 /// This function panics if the background thread has panicked such as while persisting or
467 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
468 pub fn stop(mut self) -> Result<(), std::io::Error> {
469 assert!(self.thread_handle.is_some());
470 self.stop_and_join_thread()
473 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
474 self.stop_thread.store(true, Ordering::Release);
478 fn join_thread(&mut self) -> Result<(), std::io::Error> {
479 match self.thread_handle.take() {
480 Some(handle) => handle.join().unwrap(),
486 impl Drop for BackgroundProcessor {
488 self.stop_and_join_thread().unwrap();
494 use bitcoin::blockdata::block::BlockHeader;
495 use bitcoin::blockdata::constants::genesis_block;
496 use bitcoin::blockdata::locktime::PackedLockTime;
497 use bitcoin::blockdata::transaction::{Transaction, TxOut};
498 use bitcoin::network::constants::Network;
499 use lightning::chain::{BestBlock, Confirm, chainmonitor};
500 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
501 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
502 use lightning::chain::transaction::OutPoint;
503 use lightning::get_event_msg;
504 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
505 use lightning::ln::features::{ChannelFeatures, InitFeatures};
506 use lightning::ln::msgs::{ChannelMessageHandler, Init};
507 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
508 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
509 use lightning::util::config::UserConfig;
510 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
511 use lightning::util::ser::Writeable;
512 use lightning::util::test_utils;
513 use lightning::util::persist::KVStorePersister;
514 use lightning_invoice::payment::{InvoicePayer, Retry};
515 use lightning_invoice::utils::DefaultRouter;
516 use lightning_persister::FilesystemPersister;
518 use std::path::PathBuf;
519 use std::sync::{Arc, Mutex};
520 use std::sync::mpsc::SyncSender;
521 use std::time::Duration;
522 use bitcoin::hashes::Hash;
523 use bitcoin::TxMerkleNode;
524 use lightning::routing::scoring::{FixedPenaltyScorer};
525 use lightning_rapid_gossip_sync::RapidGossipSync;
526 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
528 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
530 #[derive(Clone, Eq, Hash, PartialEq)]
531 struct TestDescriptor{}
532 impl SocketDescriptor for TestDescriptor {
533 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
537 fn disconnect_socket(&mut self) {}
540 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
542 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
543 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
546 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
547 p2p_gossip_sync: PGS,
548 rapid_gossip_sync: RGS,
549 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
550 chain_monitor: Arc<ChainMonitor>,
551 persister: Arc<FilesystemPersister>,
552 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
553 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
554 logger: Arc<test_utils::TestLogger>,
555 best_block: BestBlock,
556 scorer: Arc<Mutex<FixedPenaltyScorer>>,
560 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
561 GossipSync::P2P(self.p2p_gossip_sync.clone())
564 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
565 GossipSync::Rapid(self.rapid_gossip_sync.clone())
568 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
575 let data_dir = self.persister.get_data_dir();
576 match fs::remove_dir_all(data_dir.clone()) {
577 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
584 graph_error: Option<(std::io::ErrorKind, &'static str)>,
585 graph_persistence_notifier: Option<SyncSender<()>>,
586 manager_error: Option<(std::io::ErrorKind, &'static str)>,
587 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
588 filesystem_persister: FilesystemPersister,
592 fn new(data_dir: String) -> Self {
593 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
594 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
597 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
598 Self { graph_error: Some((error, message)), ..self }
601 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
602 Self { graph_persistence_notifier: Some(sender), ..self }
605 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
606 Self { manager_error: Some((error, message)), ..self }
609 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
610 Self { scorer_error: Some((error, message)), ..self }
614 impl KVStorePersister for Persister {
615 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
616 if key == "manager" {
617 if let Some((error, message)) = self.manager_error {
618 return Err(std::io::Error::new(error, message))
622 if key == "network_graph" {
623 if let Some(sender) = &self.graph_persistence_notifier {
624 sender.send(()).unwrap();
627 if let Some((error, message)) = self.graph_error {
628 return Err(std::io::Error::new(error, message))
633 if let Some((error, message)) = self.scorer_error {
634 return Err(std::io::Error::new(error, message))
638 self.filesystem_persister.persist(key, object)
642 fn get_full_filepath(filepath: String, filename: String) -> String {
643 let mut path = PathBuf::from(filepath);
645 path.to_str().unwrap().to_string()
648 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
649 let mut nodes = Vec::new();
650 for i in 0..num_nodes {
651 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
652 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
653 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
654 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
655 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
656 let seed = [i as u8; 32];
657 let network = Network::Testnet;
658 let genesis_block = genesis_block(network);
659 let now = Duration::from_secs(genesis_block.header.time as u64);
660 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
661 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
662 let best_block = BestBlock::from_genesis(network);
663 let params = ChainParameters { network, best_block };
664 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
665 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
666 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
667 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
668 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
669 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
670 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
671 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
675 for i in 0..num_nodes {
676 for j in (i+1)..num_nodes {
677 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
678 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
685 macro_rules! open_channel {
686 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
687 begin_open_channel!($node_a, $node_b, $channel_value);
688 let events = $node_a.node.get_and_clear_pending_events();
689 assert_eq!(events.len(), 1);
690 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
691 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
696 macro_rules! begin_open_channel {
697 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
698 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
699 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
700 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
704 macro_rules! handle_funding_generation_ready {
705 ($event: expr, $channel_value: expr) => {{
707 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
708 assert_eq!(channel_value_satoshis, $channel_value);
709 assert_eq!(user_channel_id, 42);
711 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
712 value: channel_value_satoshis, script_pubkey: output_script.clone(),
714 (temporary_channel_id, tx)
716 _ => panic!("Unexpected event"),
721 macro_rules! end_open_channel {
722 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
723 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
724 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
725 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
729 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
731 let prev_blockhash = node.best_block.block_hash();
732 let height = node.best_block.height() + 1;
733 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
734 let txdata = vec![(0, tx)];
735 node.best_block = BestBlock::new(header.block_hash(), height);
738 node.node.transactions_confirmed(&header, &txdata, height);
739 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
742 node.node.best_block_updated(&header, height);
743 node.chain_monitor.best_block_updated(&header, height);
749 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
750 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
754 fn test_background_processor() {
755 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
756 // updates. Also test that when new updates are available, the manager signals that it needs
757 // re-persistence and is successfully re-persisted.
758 let nodes = create_nodes(2, "test_background_processor".to_string());
760 // Go through the channel creation process so that each node has something to persist. Since
761 // open_channel consumes events, it must complete before starting BackgroundProcessor to
762 // avoid a race with processing events.
763 let tx = open_channel!(nodes[0], nodes[1], 100000);
765 // Start the background processor for the first node.
766 let data_dir = nodes[0].persister.get_data_dir();
767 let persister = Arc::new(Persister::new(data_dir));
768 let event_handler = |_: &_| {};
769 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
771 macro_rules! check_persisted_data {
772 ($node: expr, $filepath: expr) => {
773 let mut expected_bytes = Vec::new();
775 expected_bytes.clear();
776 match $node.write(&mut expected_bytes) {
778 match std::fs::read($filepath) {
780 if bytes == expected_bytes {
789 Err(e) => panic!("Unexpected error: {}", e)
795 // Check that the initial channel manager data is persisted as expected.
796 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
797 check_persisted_data!(nodes[0].node, filepath.clone());
800 if !nodes[0].node.get_persistence_condvar_value() { break }
803 // Force-close the channel.
804 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
806 // Check that the force-close updates are persisted.
807 check_persisted_data!(nodes[0].node, filepath.clone());
809 if !nodes[0].node.get_persistence_condvar_value() { break }
812 // Check network graph is persisted
813 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
814 check_persisted_data!(nodes[0].network_graph, filepath.clone());
816 // Check scorer is persisted
817 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
818 check_persisted_data!(nodes[0].scorer, filepath.clone());
820 assert!(bg_processor.stop().is_ok());
/// Waits until the test logger has recorded both timer-tick log lines emitted by the
/// background processor, then shuts the processor down.
#[test]
fn test_timer_tick_called() {
	// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
	// `FRESHNESS_TIMER`.
	let nodes = create_nodes(1, "test_timer_tick_called".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: &_| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Poll the logger; the lock guard is dropped at the end of every iteration, so the
	// background thread can keep logging between checks.
	loop {
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
		let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
		if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
			break
		}
	}

	assert!(bg_processor.stop().is_ok());
}
/// Uses a persister configured to fail on manager writes and checks that the failure is
/// reported through `BackgroundProcessor::join`.
#[test]
fn test_channel_manager_persist_error() {
	// Test that if we encounter an error during manager persistence, `join` returns that error
	// (same kind and message as the one injected into the persister).
	let nodes = create_nodes(2, "test_persist_error".to_string());
	open_channel!(nodes[0], nodes[1], 100000);

	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: &_| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	match bg_processor.join() {
		Ok(_) => panic!("Expected error persisting manager"),
		Err(e) => {
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
		},
	}
}
/// Uses a persister configured to fail on network graph writes and checks that stopping the
/// background processor reports that failure.
#[test]
fn test_network_graph_persist_error() {
	// Test that if we encounter an error during network graph persistence, an error gets returned.
	let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: &_| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	match bg_processor.stop() {
		Ok(_) => panic!("Expected error persisting network graph"),
		Err(e) => {
			// The surfaced error must be the one injected into the persister.
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
		},
	}
}
/// Uses a persister configured to fail on scorer writes and checks that stopping the
/// background processor reports that failure.
#[test]
fn test_scorer_persist_error() {
	// Test that if we encounter an error during scorer persistence, an error gets returned.
	let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: &_| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	match bg_processor.stop() {
		Ok(_) => panic!("Expected error persisting scorer"),
		Err(e) => {
			// The surfaced error must be the one injected into the persister.
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
		},
	}
}
/// Checks that events are delivered to the user-supplied handler from the background
/// processor: first the event handled by `handle_funding_generation_ready!` while opening a
/// channel, then an event produced after force-closing that channel.
#[test]
fn test_background_event_handling() {
	let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
	let channel_value = 100000;
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir.clone()));

	// Set up a background event handler for FundingGenerationReady events.
	// The handler forwards its result to this thread through a bounded channel.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: &Event| {
		sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
	};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Open a channel and check that the FundingGenerationReady event was handled.
	begin_open_channel!(nodes[0], nodes[1], channel_value);
	let (temporary_channel_id, funding_tx) = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("FundingGenerationReady not handled within deadline");
	end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

	// Confirm the funding transaction.
	confirm_transaction(&mut nodes[0], &funding_tx);
	let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
	confirm_transaction(&mut nodes[1], &funding_tx);
	let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
	nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
	let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
	let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

	assert!(bg_processor.stop().is_ok());

	// Set up a background event handler for SpendableOutputs events.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
	let persister = Arc::new(Persister::new(data_dir));
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Force close the channel and check that the SpendableOutputs event was handled.
	nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
	let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("SpendableOutputs not handled within deadline");
	// Only the first event received is inspected; either of these two kinds is accepted.
	match event {
		Event::SpendableOutputs { .. } => {},
		Event::ChannelClosed { .. } => {},
		_ => panic!("Unexpected event: {:?}", event),
	}

	assert!(bg_processor.stop().is_ok());
}
/// Waits until the test logger has recorded the "Persisting scorer" line emitted by the
/// background processor, then shuts the processor down.
#[test]
fn test_scorer_persistence() {
	let nodes = create_nodes(2, "test_scorer_persistence".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: &_| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Poll the logger; the lock guard is released at the end of each iteration.
	loop {
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let expected_log = "Persisting scorer".to_string();
		if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
			break
		}
	}

	assert!(bg_processor.stop().is_ok());
}
/// Checks that, with a rapid gossip sync attached, the network graph is left untouched until
/// the sync data has been applied, and that afterwards all channels are pruned.
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
	let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	// The persister signals on this channel; see `with_graph_persistence_notifier`.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
	let network_graph = nodes[0].network_graph.clone();
	let features = ChannelFeatures::empty();
	network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
		.expect("Failed to update channel from partial announcement");
	let original_graph_description = network_graph.to_string();
	assert!(original_graph_description.contains("42: features: 0000, node_one:"));
	assert_eq!(network_graph.read_only().channels().len(), 1);

	let event_handler = |_: &_| {};
	let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Wait until the processor has logged that it assessed the graph and declined to prune it.
	loop {
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let expected_log_a = "Assessing prunability of network graph".to_string();
		let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
		if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
			break
		}
	}

	// Serialized rapid gossip sync payload fed to `update_network_graph` below.
	let initialization_input = vec![
		76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
		79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
		0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
		187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
		157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
		88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
		204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
		181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
		110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
		76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
		226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
		0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
		0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
	];
	nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

	// this should have added two channels
	assert_eq!(network_graph.read_only().channels().len(), 3);

	// Block until the persister notifies us, allowing five prune-timer periods at most.
	let _ = receiver
		.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
		.expect("Network graph not pruned within deadline");

	background_processor.stop().unwrap();

	// all channels should now be pruned
	assert_eq!(network_graph.read_only().channels().len(), 0);
}
/// Checks that an `Arc`-wrapped `InvoicePayer` is accepted as the event handler argument of
/// `BackgroundProcessor::start` and that the processor then stops cleanly.
#[test]
fn test_invoice_payer() {
	let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
	let random_seed_bytes = keys_manager.get_secure_random_bytes();
	let nodes = create_nodes(2, "test_invoice_payer".to_string());

	// Initiate the background processors to watch each node.
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
	let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
	// The payer itself, not a closure, is handed over as the event handler.
	let event_handler = Arc::clone(&invoice_payer);
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	assert!(bg_processor.stop().is_ok());
}