1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
38 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
39 /// responsibilities are:
40 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
41 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
42 /// writing it to disk/backups by invoking the callback given to it at startup.
43 /// [`ChannelManager`] persistence should be done in the background.
44 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
45 /// at the appropriate intervals.
46 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
47 /// is provided to [`BackgroundProcessor::start`]).
49 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
50 /// upon as doing so may result in high latency.
54 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
55 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
56 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
57 /// unilateral chain closure fees are at risk.
59 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
60 /// [`Event`]: lightning::util::events::Event
61 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
62 pub struct BackgroundProcessor {
63 stop_thread: Arc<AtomicBool>,
64 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
68 const FRESHNESS_TIMER: u64 = 60;
70 const FRESHNESS_TIMER: u64 = 1;
72 #[cfg(all(not(test), not(debug_assertions)))]
73 const PING_TIMER: u64 = 10;
74 /// Signature operations take a lot longer without compiler optimisations.
75 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
76 /// timeout is reached.
77 #[cfg(all(not(test), debug_assertions))]
78 const PING_TIMER: u64 = 30;
80 const PING_TIMER: u64 = 1;
82 /// Prune the network graph of stale entries hourly.
83 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
86 const SCORER_PERSIST_TIMER: u64 = 30;
88 const SCORER_PERSIST_TIMER: u64 = 1;
91 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
93 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
95 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
97 P: Deref<Target = P2PGossipSync<G, A, L>>,
98 R: Deref<Target = RapidGossipSync<G, L>>,
99 G: Deref<Target = NetworkGraph<L>>,
103 where A::Target: chain::Access, L::Target: Logger {
104 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
106 /// Rapid gossip sync from a trusted server.
113 P: Deref<Target = P2PGossipSync<G, A, L>>,
114 R: Deref<Target = RapidGossipSync<G, L>>,
115 G: Deref<Target = NetworkGraph<L>>,
118 > GossipSync<P, R, G, A, L>
119 where A::Target: chain::Access, L::Target: Logger {
120 fn network_graph(&self) -> Option<&G> {
122 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
123 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
124 GossipSync::None => None,
128 fn prunable_network_graph(&self) -> Option<&G> {
130 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
131 GossipSync::Rapid(gossip_sync) => {
132 if gossip_sync.is_initial_sync_complete() {
133 Some(gossip_sync.network_graph())
138 GossipSync::None => None,
143 /// (C-not exported) as the bindings concretize everything and have constructors for us
144 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
145 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
147 A::Target: chain::Access,
150 /// Initializes a new [`GossipSync::P2P`] variant.
151 pub fn p2p(gossip_sync: P) -> Self {
152 GossipSync::P2P(gossip_sync)
156 /// (C-not exported) as the bindings concretize everything and have constructors for us
157 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
159 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
162 &'a (dyn chain::Access + Send + Sync),
168 /// Initializes a new [`GossipSync::Rapid`] variant.
169 pub fn rapid(gossip_sync: R) -> Self {
170 GossipSync::Rapid(gossip_sync)
174 /// (C-not exported) as the bindings concretize everything and have constructors for us
177 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
178 &RapidGossipSync<&'a NetworkGraph<L>, L>,
180 &'a (dyn chain::Access + Send + Sync),
186 /// Initializes a new [`GossipSync::None`] variant.
187 pub fn none() -> Self {
192 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
193 struct DecoratingEventHandler<
196 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
197 RGS: Deref<Target = RapidGossipSync<G, L>>,
198 G: Deref<Target = NetworkGraph<L>>,
202 where A::Target: chain::Access, L::Target: Logger {
204 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
210 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
211 RGS: Deref<Target = RapidGossipSync<G, L>>,
212 G: Deref<Target = NetworkGraph<L>>,
215 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
216 where A::Target: chain::Access, L::Target: Logger {
217 fn handle_event(&self, event: &Event) {
218 if let Some(network_graph) = self.gossip_sync.network_graph() {
219 network_graph.handle_event(event);
221 self.event_handler.handle_event(event);
225 impl BackgroundProcessor {
226 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
229 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
230 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
231 /// either [`join`] or [`stop`].
233 /// # Data Persistence
235 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
236 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
237 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
238 /// provided implementation.
240 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
241 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
242 /// See the `lightning-persister` crate for LDK's provided implementation.
244 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
245 /// error or call [`join`] and handle any error that may arise. For the latter case,
246 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
250 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
251 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
252 /// functionality implemented by other handlers.
253 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
255 /// # Rapid Gossip Sync
257 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
258 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
259 /// until the [`RapidGossipSync`] instance completes its first sync.
261 /// [top-level documentation]: BackgroundProcessor
262 /// [`join`]: Self::join
263 /// [`stop`]: Self::stop
264 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
265 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
266 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
267 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
268 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
269 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
272 Signer: 'static + Sign,
273 CA: 'static + Deref + Send + Sync,
274 CF: 'static + Deref + Send + Sync,
275 CW: 'static + Deref + Send + Sync,
276 T: 'static + Deref + Send + Sync,
277 K: 'static + Deref + Send + Sync,
278 F: 'static + Deref + Send + Sync,
279 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
280 L: 'static + Deref + Send + Sync,
281 P: 'static + Deref + Send + Sync,
282 Descriptor: 'static + SocketDescriptor + Send + Sync,
283 CMH: 'static + Deref + Send + Sync,
284 RMH: 'static + Deref + Send + Sync,
285 EH: 'static + EventHandler + Send,
286 PS: 'static + Deref + Send,
287 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
288 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
289 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
290 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
291 UMH: 'static + Deref + Send + Sync,
292 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
293 S: 'static + Deref<Target = SC> + Send + Sync,
294 SC: WriteableScore<'a>,
296 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
297 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
300 CA::Target: 'static + chain::Access,
301 CF::Target: 'static + chain::Filter,
302 CW::Target: 'static + chain::Watch<Signer>,
303 T::Target: 'static + BroadcasterInterface,
304 K::Target: 'static + KeysInterface<Signer = Signer>,
305 F::Target: 'static + FeeEstimator,
306 L::Target: 'static + Logger,
307 P::Target: 'static + Persist<Signer>,
308 CMH::Target: 'static + ChannelMessageHandler,
309 RMH::Target: 'static + RoutingMessageHandler,
310 UMH::Target: 'static + CustomMessageHandler,
311 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
313 let stop_thread = Arc::new(AtomicBool::new(false));
314 let stop_thread_clone = stop_thread.clone();
315 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
316 let event_handler = DecoratingEventHandler {
318 gossip_sync: &gossip_sync,
321 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
322 channel_manager.timer_tick_occurred();
324 let mut last_freshness_call = Instant::now();
325 let mut last_ping_call = Instant::now();
326 let mut last_prune_call = Instant::now();
327 let mut last_scorer_persist_call = Instant::now();
328 let mut have_pruned = false;
331 channel_manager.process_pending_events(&event_handler);
332 chain_monitor.process_pending_events(&event_handler);
334 // Note that the PeerManager::process_events may block on ChannelManager's locks,
335 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
336 // we want to ensure we get into `persist_manager` as quickly as we can, especially
337 // without running the normal event processing above and handing events to users.
339 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
340 // processing a message effectively at any point during this loop. In order to
341 // minimize the time between such processing completing and persisting the updated
342 // ChannelManager, we want to minimize methods blocking on a ChannelManager
343 // generally, and as a fallback place such blocking only immediately before
345 peer_manager.process_events();
347 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
348 // see `await_start`'s use below.
349 let await_start = Instant::now();
350 let updates_available =
351 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
352 let await_time = await_start.elapsed();
354 if updates_available {
355 log_trace!(logger, "Persisting ChannelManager...");
356 persister.persist_manager(&*channel_manager)?;
357 log_trace!(logger, "Done persisting ChannelManager.");
359 // Exit the loop if the background processor was requested to stop.
360 if stop_thread.load(Ordering::Acquire) == true {
361 log_trace!(logger, "Terminating background processor.");
364 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
365 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
366 channel_manager.timer_tick_occurred();
367 last_freshness_call = Instant::now();
369 if await_time > Duration::from_secs(1) {
370 // On various platforms, we may be starved of CPU cycles for several reasons.
371 // E.g. on iOS, if we've been in the background, we will be entirely paused.
372 // Similarly, if we're on a desktop platform and the device has been asleep, we
373 // may not get any cycles.
374 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
375 // full second, at which point we assume sockets may have been killed (they
376 // appear to be at least on some platforms, even if it has only been a second).
377 // Note that we have to take care to not get here just because user event
378 // processing was slow at the top of the loop. For example, the sample client
379 // may call Bitcoin Core RPCs during event handling, which very often takes
380 // more than a handful of seconds to complete, and shouldn't disconnect all our
382 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
383 peer_manager.disconnect_all_peers();
384 last_ping_call = Instant::now();
385 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
386 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
387 peer_manager.timer_tick_occurred();
388 last_ping_call = Instant::now();
391 // Note that we want to run a graph prune once not long after startup before
392 // falling back to our usual hourly prunes. This avoids short-lived clients never
393 // pruning their network graph. We run once 60 seconds after startup before
394 // continuing our normal cadence.
395 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
396 // The network graph must not be pruned while rapid sync completion is pending
397 log_trace!(logger, "Assessing prunability of network graph");
398 if let Some(network_graph) = gossip_sync.prunable_network_graph() {
399 network_graph.remove_stale_channels();
401 if let Err(e) = persister.persist_graph(network_graph) {
402 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
405 last_prune_call = Instant::now();
408 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
412 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
413 if let Some(ref scorer) = scorer {
414 log_trace!(logger, "Persisting scorer");
415 if let Err(e) = persister.persist_scorer(&scorer) {
416 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
419 last_scorer_persist_call = Instant::now();
423 // After we exit, ensure we persist the ChannelManager one final time - this avoids
424 // some races where users quit while channel updates were in-flight, with
425 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
426 persister.persist_manager(&*channel_manager)?;
428 // Persist Scorer on exit
429 if let Some(ref scorer) = scorer {
430 persister.persist_scorer(&scorer)?;
433 // Persist NetworkGraph on exit
434 if let Some(network_graph) = gossip_sync.network_graph() {
435 persister.persist_graph(network_graph)?;
440 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
443 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
444 /// [`ChannelManager`].
448 /// This function panics if the background thread has panicked such as while persisting or
451 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
452 pub fn join(mut self) -> Result<(), std::io::Error> {
453 assert!(self.thread_handle.is_some());
457 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
458 /// [`ChannelManager`].
462 /// This function panics if the background thread has panicked such as while persisting or
465 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
466 pub fn stop(mut self) -> Result<(), std::io::Error> {
467 assert!(self.thread_handle.is_some());
468 self.stop_and_join_thread()
471 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
472 self.stop_thread.store(true, Ordering::Release);
476 fn join_thread(&mut self) -> Result<(), std::io::Error> {
477 match self.thread_handle.take() {
478 Some(handle) => handle.join().unwrap(),
484 impl Drop for BackgroundProcessor {
486 self.stop_and_join_thread().unwrap();
492 use bitcoin::blockdata::block::BlockHeader;
493 use bitcoin::blockdata::constants::genesis_block;
494 use bitcoin::blockdata::locktime::PackedLockTime;
495 use bitcoin::blockdata::transaction::{Transaction, TxOut};
496 use bitcoin::network::constants::Network;
497 use lightning::chain::{BestBlock, Confirm, chainmonitor};
498 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
499 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
500 use lightning::chain::transaction::OutPoint;
501 use lightning::get_event_msg;
502 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
503 use lightning::ln::features::{ChannelFeatures, InitFeatures};
504 use lightning::ln::msgs::{ChannelMessageHandler, Init};
505 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
506 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
507 use lightning::util::config::UserConfig;
508 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
509 use lightning::util::ser::Writeable;
510 use lightning::util::test_utils;
511 use lightning::util::persist::KVStorePersister;
512 use lightning_invoice::payment::{InvoicePayer, Retry};
513 use lightning_invoice::utils::DefaultRouter;
514 use lightning_persister::FilesystemPersister;
516 use std::path::PathBuf;
517 use std::sync::{Arc, Mutex};
518 use std::sync::mpsc::SyncSender;
519 use std::time::Duration;
520 use bitcoin::hashes::Hash;
521 use bitcoin::TxMerkleNode;
522 use lightning::routing::scoring::{FixedPenaltyScorer};
523 use lightning_rapid_gossip_sync::RapidGossipSync;
524 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
526 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
528 #[derive(Clone, Eq, Hash, PartialEq)]
529 struct TestDescriptor{}
530 impl SocketDescriptor for TestDescriptor {
531 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
535 fn disconnect_socket(&mut self) {}
538 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
540 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
541 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
544 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
545 p2p_gossip_sync: PGS,
546 rapid_gossip_sync: RGS,
547 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
548 chain_monitor: Arc<ChainMonitor>,
549 persister: Arc<FilesystemPersister>,
550 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
551 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
552 logger: Arc<test_utils::TestLogger>,
553 best_block: BestBlock,
554 scorer: Arc<Mutex<FixedPenaltyScorer>>,
558 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
559 GossipSync::P2P(self.p2p_gossip_sync.clone())
562 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
563 GossipSync::Rapid(self.rapid_gossip_sync.clone())
566 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
573 let data_dir = self.persister.get_data_dir();
574 match fs::remove_dir_all(data_dir.clone()) {
575 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
582 graph_error: Option<(std::io::ErrorKind, &'static str)>,
583 graph_persistence_notifier: Option<SyncSender<()>>,
584 manager_error: Option<(std::io::ErrorKind, &'static str)>,
585 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
586 filesystem_persister: FilesystemPersister,
590 fn new(data_dir: String) -> Self {
591 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
592 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
595 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
596 Self { graph_error: Some((error, message)), ..self }
599 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
600 Self { graph_persistence_notifier: Some(sender), ..self }
603 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
604 Self { manager_error: Some((error, message)), ..self }
607 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
608 Self { scorer_error: Some((error, message)), ..self }
612 impl KVStorePersister for Persister {
613 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
614 if key == "manager" {
615 if let Some((error, message)) = self.manager_error {
616 return Err(std::io::Error::new(error, message))
620 if key == "network_graph" {
621 if let Some(sender) = &self.graph_persistence_notifier {
622 sender.send(()).unwrap();
625 if let Some((error, message)) = self.graph_error {
626 return Err(std::io::Error::new(error, message))
631 if let Some((error, message)) = self.scorer_error {
632 return Err(std::io::Error::new(error, message))
636 self.filesystem_persister.persist(key, object)
640 fn get_full_filepath(filepath: String, filename: String) -> String {
641 let mut path = PathBuf::from(filepath);
643 path.to_str().unwrap().to_string()
646 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
647 let mut nodes = Vec::new();
648 for i in 0..num_nodes {
649 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
650 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
651 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
652 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
653 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
654 let seed = [i as u8; 32];
655 let network = Network::Testnet;
656 let genesis_block = genesis_block(network);
657 let now = Duration::from_secs(genesis_block.header.time as u64);
658 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
659 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
660 let best_block = BestBlock::from_genesis(network);
661 let params = ChainParameters { network, best_block };
662 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
663 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
664 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
665 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
666 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
667 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
668 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
669 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
673 for i in 0..num_nodes {
674 for j in (i+1)..num_nodes {
675 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
676 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
683 macro_rules! open_channel {
684 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
685 begin_open_channel!($node_a, $node_b, $channel_value);
686 let events = $node_a.node.get_and_clear_pending_events();
687 assert_eq!(events.len(), 1);
688 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
689 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
694 macro_rules! begin_open_channel {
695 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
696 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
697 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
698 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
702 macro_rules! handle_funding_generation_ready {
703 ($event: expr, $channel_value: expr) => {{
705 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
706 assert_eq!(channel_value_satoshis, $channel_value);
707 assert_eq!(user_channel_id, 42);
709 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
710 value: channel_value_satoshis, script_pubkey: output_script.clone(),
712 (temporary_channel_id, tx)
714 _ => panic!("Unexpected event"),
719 macro_rules! end_open_channel {
720 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
721 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
722 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
723 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
727 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
729 let prev_blockhash = node.best_block.block_hash();
730 let height = node.best_block.height() + 1;
731 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
732 let txdata = vec![(0, tx)];
733 node.best_block = BestBlock::new(header.block_hash(), height);
736 node.node.transactions_confirmed(&header, &txdata, height);
737 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
740 node.node.best_block_updated(&header, height);
741 node.chain_monitor.best_block_updated(&header, height);
747 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
748 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
752 fn test_background_processor() {
753 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
754 // updates. Also test that when new updates are available, the manager signals that it needs
755 // re-persistence and is successfully re-persisted.
756 let nodes = create_nodes(2, "test_background_processor".to_string());
758 // Go through the channel creation process so that each node has something to persist. Since
759 // open_channel consumes events, it must complete before starting BackgroundProcessor to
760 // avoid a race with processing events.
761 let tx = open_channel!(nodes[0], nodes[1], 100000);
763 // Initiate the background processors to watch each node.
764 let data_dir = nodes[0].persister.get_data_dir();
765 let persister = Arc::new(Persister::new(data_dir));
766 let event_handler = |_: &_| {};
767 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
769 macro_rules! check_persisted_data {
770 ($node: expr, $filepath: expr) => {
771 let mut expected_bytes = Vec::new();
773 expected_bytes.clear();
774 match $node.write(&mut expected_bytes) {
776 match std::fs::read($filepath) {
778 if bytes == expected_bytes {
787 Err(e) => panic!("Unexpected error: {}", e)
793 // Check that the initial channel manager data is persisted as expected.
794 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
795 check_persisted_data!(nodes[0].node, filepath.clone());
798 if !nodes[0].node.get_persistence_condvar_value() { break }
801 // Force-close the channel.
802 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
804 // Check that the force-close updates are persisted.
805 check_persisted_data!(nodes[0].node, filepath.clone());
807 if !nodes[0].node.get_persistence_condvar_value() { break }
810 // Check network graph is persisted
811 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
812 check_persisted_data!(nodes[0].network_graph, filepath.clone());
814 // Check scorer is persisted
815 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
816 check_persisted_data!(nodes[0].scorer, filepath.clone());
818 assert!(bg_processor.stop().is_ok());
822 fn test_timer_tick_called() {
823 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
824 // `FRESHNESS_TIMER`.
825 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
826 let data_dir = nodes[0].persister.get_data_dir();
827 let persister = Arc::new(Persister::new(data_dir));
828 let event_handler = |_: &_| {};
829 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
831 let log_entries = nodes[0].logger.lines.lock().unwrap();
832 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
833 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
834 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
835 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
840 assert!(bg_processor.stop().is_ok());
844 fn test_channel_manager_persist_error() {
845 // Test that if we encounter an error during manager persistence, the thread panics.
846 let nodes = create_nodes(2, "test_persist_error".to_string());
847 open_channel!(nodes[0], nodes[1], 100000);
849 let data_dir = nodes[0].persister.get_data_dir();
850 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
851 let event_handler = |_: &_| {};
852 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
853 match bg_processor.join() {
854 Ok(_) => panic!("Expected error persisting manager"),
856 assert_eq!(e.kind(), std::io::ErrorKind::Other);
857 assert_eq!(e.get_ref().unwrap().to_string(), "test");
863 fn test_network_graph_persist_error() {
864 // Test that if we encounter an error during network graph persistence, an error gets returned.
865 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
866 let data_dir = nodes[0].persister.get_data_dir();
867 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
868 let event_handler = |_: &_| {};
869 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
871 match bg_processor.stop() {
872 Ok(_) => panic!("Expected error persisting network graph"),
874 assert_eq!(e.kind(), std::io::ErrorKind::Other);
875 assert_eq!(e.get_ref().unwrap().to_string(), "test");
881 fn test_scorer_persist_error() {
882 // Test that if we encounter an error during scorer persistence, an error gets returned.
883 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
884 let data_dir = nodes[0].persister.get_data_dir();
885 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
886 let event_handler = |_: &_| {};
887 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
889 match bg_processor.stop() {
890 Ok(_) => panic!("Expected error persisting scorer"),
892 assert_eq!(e.kind(), std::io::ErrorKind::Other);
893 assert_eq!(e.get_ref().unwrap().to_string(), "test");
899 fn test_background_event_handling() {
900 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
901 let channel_value = 100000;
902 let data_dir = nodes[0].persister.get_data_dir();
903 let persister = Arc::new(Persister::new(data_dir.clone()));
905 // Set up a background event handler for FundingGenerationReady events.
906 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
907 let event_handler = move |event: &Event| {
908 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
910 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
912 // Open a channel and check that the FundingGenerationReady event was handled.
913 begin_open_channel!(nodes[0], nodes[1], channel_value);
914 let (temporary_channel_id, funding_tx) = receiver
915 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
916 .expect("FundingGenerationReady not handled within deadline");
917 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
919 // Confirm the funding transaction.
920 confirm_transaction(&mut nodes[0], &funding_tx);
921 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
922 confirm_transaction(&mut nodes[1], &funding_tx);
923 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
924 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
925 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
926 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
927 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
929 assert!(bg_processor.stop().is_ok());
931 // Set up a background event handler for SpendableOutputs events.
932 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
933 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
934 let persister = Arc::new(Persister::new(data_dir));
935 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
937 // Force close the channel and check that the SpendableOutputs event was handled.
938 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
939 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
940 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
942 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
943 .expect("SpendableOutputs not handled within deadline");
945 Event::SpendableOutputs { .. } => {},
946 Event::ChannelClosed { .. } => {},
947 _ => panic!("Unexpected event: {:?}", event),
950 assert!(bg_processor.stop().is_ok());
954 fn test_scorer_persistence() {
955 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
956 let data_dir = nodes[0].persister.get_data_dir();
957 let persister = Arc::new(Persister::new(data_dir));
958 let event_handler = |_: &_| {};
959 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
962 let log_entries = nodes[0].logger.lines.lock().unwrap();
963 let expected_log = "Persisting scorer".to_string();
964 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
969 assert!(bg_processor.stop().is_ok());
973 fn test_not_pruning_network_graph_until_graph_sync_completion() {
974 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
975 let data_dir = nodes[0].persister.get_data_dir();
976 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
977 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
978 let network_graph = nodes[0].network_graph.clone();
979 let features = ChannelFeatures::empty();
980 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
981 .expect("Failed to update channel from partial announcement");
982 let original_graph_description = network_graph.to_string();
983 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
984 assert_eq!(network_graph.read_only().channels().len(), 1);
986 let event_handler = |_: &_| {};
987 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
990 let log_entries = nodes[0].logger.lines.lock().unwrap();
991 let expected_log_a = "Assessing prunability of network graph".to_string();
992 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
993 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
994 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
999 let initialization_input = vec![
1000 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1001 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1002 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1003 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1004 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1005 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1006 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1007 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1008 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1009 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1010 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1011 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1012 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1014 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1016 // this should have added two channels
1017 assert_eq!(network_graph.read_only().channels().len(), 3);
1020 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1021 .expect("Network graph not pruned within deadline");
1023 background_processor.stop().unwrap();
1025 // all channels should now be pruned
1026 assert_eq!(network_graph.read_only().channels().len(), 0);
1030 fn test_invoice_payer() {
1031 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1032 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1033 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1035 // Initiate the background processors to watch each node.
1036 let data_dir = nodes[0].persister.get_data_dir();
1037 let persister = Arc::new(Persister::new(data_dir));
1038 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
1039 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1040 let event_handler = Arc::clone(&invoice_payer);
1041 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1042 assert!(bg_processor.stop().is_ok());