1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures::{select, future::FutureExt};
/// `BackgroundProcessor` runs the chores that (1) must happen periodically for Rust-Lightning to
/// keep working properly and (2) either can or should happen in the background. It is
/// responsible for:
/// * Handing [`Event`]s to a user-provided [`EventHandler`].
/// * Watching for the [`ChannelManager`] needing to be re-persisted to disk and, when it does,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// [`PeerManager::process_events`] is also called periodically, but relying on that alone may
/// result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	/// Shutdown flag shared with the background thread: stored as `true` when a stop is
	/// requested and polled by the processing loop as its exit check.
	stop_thread: Arc<AtomicBool>,
	/// Handle of the spawned background thread. It is `take`n when the thread is joined, so it
	/// is `None` once a join has happened.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
/// Seconds that must elapse between the loop's calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
/// Shortened `ChannelManager` tick interval (in seconds) so tests do not have to wait.
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Seconds that must elapse between the loop's calls to `PeerManager::timer_tick_occurred`.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
/// Shortened `PeerManager` tick interval (in seconds) so tests do not have to wait.
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Seconds that must elapse between periodic scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
/// Shortened scorer persistence interval (in seconds) so tests do not have to wait.
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Seconds after startup before the first network graph prune; later prunes use
/// `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
/// Shortened first-prune delay (in seconds) so tests do not have to wait.
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
100 P: Deref<Target = P2PGossipSync<G, A, L>>,
101 R: Deref<Target = RapidGossipSync<G, L>>,
102 G: Deref<Target = NetworkGraph<L>>,
106 where A::Target: chain::Access, L::Target: Logger {
107 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
109 /// Rapid gossip sync from a trusted server.
116 P: Deref<Target = P2PGossipSync<G, A, L>>,
117 R: Deref<Target = RapidGossipSync<G, L>>,
118 G: Deref<Target = NetworkGraph<L>>,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123 fn network_graph(&self) -> Option<&G> {
125 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127 GossipSync::None => None,
131 fn prunable_network_graph(&self) -> Option<&G> {
133 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134 GossipSync::Rapid(gossip_sync) => {
135 if gossip_sync.is_initial_sync_complete() {
136 Some(gossip_sync.network_graph())
141 GossipSync::None => None,
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 A::Target: chain::Access,
153 /// Initializes a new [`GossipSync::P2P`] variant.
154 pub fn p2p(gossip_sync: P) -> Self {
155 GossipSync::P2P(gossip_sync)
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
165 &'a (dyn chain::Access + Send + Sync),
171 /// Initializes a new [`GossipSync::Rapid`] variant.
172 pub fn rapid(gossip_sync: R) -> Self {
173 GossipSync::Rapid(gossip_sync)
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
180 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183 &'a (dyn chain::Access + Send + Sync),
189 /// Initializes a new [`GossipSync::None`] variant.
190 pub fn none() -> Self {
195 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
196 struct DecoratingEventHandler<
199 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
200 RGS: Deref<Target = RapidGossipSync<G, L>>,
201 G: Deref<Target = NetworkGraph<L>>,
205 where A::Target: chain::Access, L::Target: Logger {
207 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
213 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
214 RGS: Deref<Target = RapidGossipSync<G, L>>,
215 G: Deref<Target = NetworkGraph<L>>,
218 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
219 where A::Target: chain::Access, L::Target: Logger {
220 fn handle_event(&self, event: &Event) {
221 if let Some(network_graph) = self.gossip_sync.network_graph() {
222 network_graph.handle_event(event);
224 self.event_handler.handle_event(event);
// Expands to the body shared by the threaded and the async background processors: it wraps the
// event handler, runs the processing loop until `$loop_exit_check` evaluates to true, then
// performs the final persistence and evaluates to `Ok(())`.
//
// `$await` must evaluate to a `bool` that is true when the ChannelManager needs persisting.
// Persistence failures of the ChannelManager are propagated with `?`, so the macro has to be
// expanded inside something returning `Result<(), std::io::Error>`.
macro_rules! define_run_body {
	($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		let decorated_handler = DecoratingEventHandler {
			event_handler: $event_handler,
			gossip_sync: &$gossip_sync,
		};

		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Timestamps of the last time each periodic task ran.
		let mut freshness_ticked_at = Instant::now();
		let mut ping_ticked_at = Instant::now();
		let mut pruned_at = Instant::now();
		let mut scorer_persisted_at = Instant::now();
		let mut have_pruned = false;

		loop {
			$channel_manager.process_pending_events(&decorated_handler);
			$chain_monitor.process_pending_events(&decorated_handler);

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see the use of `waited_for` below.
			let wait_started = Instant::now();
			let needs_persist = $await;
			let waited_for = wait_started.elapsed();

			if needs_persist {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if freshness_ticked_at.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				freshness_ticked_at = Instant::now();
			}
			if waited_for > Duration::from_secs(1) {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				ping_ticked_at = Instant::now();
			} else if ping_ticked_at.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				ping_ticked_at = Instant::now();
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
			if pruned_at.elapsed().as_secs() > prune_timer {
				// The network graph must not be pruned while rapid sync completion is pending
				log_trace!($logger, "Assessing prunability of network graph");
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					network_graph.remove_stale_channels_and_tracking();

					// A failed graph write is only logged; it does not stop the loop.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					pruned_at = Instant::now();
					have_pruned = true;
				} else {
					log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
				}
			}

			if scorer_persisted_at.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// A failed scorer write is only logged; it does not stop the loop.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				scorer_persisted_at = Instant::now();
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
/// future which outputs false, the loop will exit and this function's future will complete.
///
/// The returned future resolves to `Err` if persisting the [`ChannelManager`] fails, and also if
/// the final scorer or network graph write performed on exit fails.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	Signer: 'static + Sign,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	// NOTE(review): `CUMH` is not referenced by any other bound or parameter visible here; it is
	// kept so the generic parameter list stays unchanged for existing callers.
	CUMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EH: 'static + EventHandler + Send,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
	// Latest answer from `sleeper`: `true` means "keep running". It starts as `true` so that a
	// persistable update arriving before the first sleep completes does not stop the loop.
	let mut should_continue = true;
	// The macro's exit check means "stop when true", so the flag has to be negated here.
	// Passing `should_continue` directly made the loop exit on its first pass and keep running
	// after `sleeper` asked it to stop.
	define_run_body!(persister, event_handler, chain_monitor, channel_manager,
		gossip_sync, peer_manager, logger, scorer, !should_continue, {
			select! {
				// The ChannelManager signalled an update: report that persistence is needed.
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				// The 100ms sleep finished first: record whether to keep going, nothing to persist.
				cont = sleeper(Duration::from_millis(100)).fuse() => {
					should_continue = cont;
					false
				}
			}
		})
}
428 impl BackgroundProcessor {
429 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
432 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
433 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
434 /// either [`join`] or [`stop`].
436 /// # Data Persistence
438 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
439 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
440 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
441 /// provided implementation.
443 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
444 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
445 /// See the `lightning-persister` crate for LDK's provided implementation.
447 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
448 /// error or call [`join`] and handle any error that may arise. For the latter case,
449 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
453 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
454 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
455 /// functionality implemented by other handlers.
456 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
458 /// # Rapid Gossip Sync
460 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
461 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
462 /// until the [`RapidGossipSync`] instance completes its first sync.
464 /// [top-level documentation]: BackgroundProcessor
465 /// [`join`]: Self::join
466 /// [`stop`]: Self::stop
467 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
468 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
469 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
470 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
471 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
472 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
475 Signer: 'static + Sign,
476 CA: 'static + Deref + Send + Sync,
477 CF: 'static + Deref + Send + Sync,
478 CW: 'static + Deref + Send + Sync,
479 T: 'static + Deref + Send + Sync,
480 K: 'static + Deref + Send + Sync,
481 F: 'static + Deref + Send + Sync,
482 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
483 L: 'static + Deref + Send + Sync,
484 P: 'static + Deref + Send + Sync,
485 Descriptor: 'static + SocketDescriptor + Send + Sync,
486 CMH: 'static + Deref + Send + Sync,
487 OMH: 'static + Deref + Send + Sync,
488 RMH: 'static + Deref + Send + Sync,
489 EH: 'static + EventHandler + Send,
490 PS: 'static + Deref + Send,
491 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
492 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
493 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
494 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
495 UMH: 'static + Deref + Send + Sync,
496 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
497 S: 'static + Deref<Target = SC> + Send + Sync,
498 SC: WriteableScore<'a>,
500 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
501 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
504 CA::Target: 'static + chain::Access,
505 CF::Target: 'static + chain::Filter,
506 CW::Target: 'static + chain::Watch<Signer>,
507 T::Target: 'static + BroadcasterInterface,
508 K::Target: 'static + KeysInterface<Signer = Signer>,
509 F::Target: 'static + FeeEstimator,
510 L::Target: 'static + Logger,
511 P::Target: 'static + Persist<Signer>,
512 CMH::Target: 'static + ChannelMessageHandler,
513 OMH::Target: 'static + OnionMessageHandler,
514 RMH::Target: 'static + RoutingMessageHandler,
515 UMH::Target: 'static + CustomMessageHandler,
516 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
518 let stop_thread = Arc::new(AtomicBool::new(false));
519 let stop_thread_clone = stop_thread.clone();
520 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
521 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
522 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
523 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
525 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
528 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
529 /// [`ChannelManager`].
533 /// This function panics if the background thread has panicked such as while persisting or
536 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
537 pub fn join(mut self) -> Result<(), std::io::Error> {
538 assert!(self.thread_handle.is_some());
542 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
543 /// [`ChannelManager`].
547 /// This function panics if the background thread has panicked such as while persisting or
550 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
551 pub fn stop(mut self) -> Result<(), std::io::Error> {
552 assert!(self.thread_handle.is_some());
553 self.stop_and_join_thread()
556 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
557 self.stop_thread.store(true, Ordering::Release);
561 fn join_thread(&mut self) -> Result<(), std::io::Error> {
562 match self.thread_handle.take() {
563 Some(handle) => handle.join().unwrap(),
569 impl Drop for BackgroundProcessor {
571 self.stop_and_join_thread().unwrap();
577 use bitcoin::blockdata::block::BlockHeader;
578 use bitcoin::blockdata::constants::genesis_block;
579 use bitcoin::blockdata::locktime::PackedLockTime;
580 use bitcoin::blockdata::transaction::{Transaction, TxOut};
581 use bitcoin::network::constants::Network;
582 use lightning::chain::{BestBlock, Confirm, chainmonitor};
583 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
584 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
585 use lightning::chain::transaction::OutPoint;
586 use lightning::get_event_msg;
587 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
588 use lightning::ln::features::ChannelFeatures;
589 use lightning::ln::msgs::{ChannelMessageHandler, Init};
590 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
591 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
592 use lightning::util::config::UserConfig;
593 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
594 use lightning::util::ser::Writeable;
595 use lightning::util::test_utils;
596 use lightning::util::persist::KVStorePersister;
597 use lightning_invoice::payment::{InvoicePayer, Retry};
598 use lightning_invoice::utils::DefaultRouter;
599 use lightning_persister::FilesystemPersister;
601 use std::path::PathBuf;
602 use std::sync::{Arc, Mutex};
603 use std::sync::mpsc::SyncSender;
604 use std::time::Duration;
605 use bitcoin::hashes::Hash;
606 use bitcoin::TxMerkleNode;
607 use lightning::routing::scoring::{FixedPenaltyScorer};
608 use lightning_rapid_gossip_sync::RapidGossipSync;
609 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
611 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
613 #[derive(Clone, Eq, Hash, PartialEq)]
614 struct TestDescriptor{}
615 impl SocketDescriptor for TestDescriptor {
616 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
620 fn disconnect_socket(&mut self) {}
623 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
625 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
626 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
629 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
630 p2p_gossip_sync: PGS,
631 rapid_gossip_sync: RGS,
632 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
633 chain_monitor: Arc<ChainMonitor>,
634 persister: Arc<FilesystemPersister>,
635 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
636 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
637 logger: Arc<test_utils::TestLogger>,
638 best_block: BestBlock,
639 scorer: Arc<Mutex<FixedPenaltyScorer>>,
643 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
644 GossipSync::P2P(self.p2p_gossip_sync.clone())
647 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
648 GossipSync::Rapid(self.rapid_gossip_sync.clone())
651 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
658 let data_dir = self.persister.get_data_dir();
659 match fs::remove_dir_all(data_dir.clone()) {
660 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
667 graph_error: Option<(std::io::ErrorKind, &'static str)>,
668 graph_persistence_notifier: Option<SyncSender<()>>,
669 manager_error: Option<(std::io::ErrorKind, &'static str)>,
670 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
671 filesystem_persister: FilesystemPersister,
675 fn new(data_dir: String) -> Self {
676 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
677 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
680 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
681 Self { graph_error: Some((error, message)), ..self }
684 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
685 Self { graph_persistence_notifier: Some(sender), ..self }
688 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
689 Self { manager_error: Some((error, message)), ..self }
692 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
693 Self { scorer_error: Some((error, message)), ..self }
697 impl KVStorePersister for Persister {
698 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
699 if key == "manager" {
700 if let Some((error, message)) = self.manager_error {
701 return Err(std::io::Error::new(error, message))
705 if key == "network_graph" {
706 if let Some(sender) = &self.graph_persistence_notifier {
707 sender.send(()).unwrap();
710 if let Some((error, message)) = self.graph_error {
711 return Err(std::io::Error::new(error, message))
716 if let Some((error, message)) = self.scorer_error {
717 return Err(std::io::Error::new(error, message))
721 self.filesystem_persister.persist(key, object)
725 fn get_full_filepath(filepath: String, filename: String) -> String {
726 let mut path = PathBuf::from(filepath);
728 path.to_str().unwrap().to_string()
731 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
732 let mut nodes = Vec::new();
733 for i in 0..num_nodes {
734 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
735 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
736 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
737 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
738 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
739 let seed = [i as u8; 32];
740 let network = Network::Testnet;
741 let genesis_block = genesis_block(network);
742 let now = Duration::from_secs(genesis_block.header.time as u64);
743 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
744 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
745 let best_block = BestBlock::from_genesis(network);
746 let params = ChainParameters { network, best_block };
747 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
748 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
749 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
750 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
751 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
752 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
753 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
754 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
758 for i in 0..num_nodes {
759 for j in (i+1)..num_nodes {
760 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
761 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
768 macro_rules! open_channel {
769 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
770 begin_open_channel!($node_a, $node_b, $channel_value);
771 let events = $node_a.node.get_and_clear_pending_events();
772 assert_eq!(events.len(), 1);
773 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
774 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
779 macro_rules! begin_open_channel {
780 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
781 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
782 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
783 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Consumes an `Event::FundingGenerationReady`, checks it against `$channel_value` and the
// expected `user_channel_id` of 42, and yields `(temporary_channel_id, tx)`. The `tx` built here
// has no inputs and pays `channel_value_satoshis` to the event's `output_script`. Any other
// event panics with "Unexpected event".
787 macro_rules! handle_funding_generation_ready {
788 ($event: expr, $channel_value: expr) => {{
790 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
791 assert_eq!(channel_value_satoshis, $channel_value);
792 assert_eq!(user_channel_id, 42);
// Hand-built funding transaction: empty input list, output script taken from the event.
794 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
795 value: channel_value_satoshis, script_pubkey: output_script.clone(),
797 (temporary_channel_id, tx)
799 _ => panic!("Unexpected event"),
// Completes a channel open started with `begin_open_channel!`: gives `$tx` to `$node_a` as the
// funding transaction for `$temporary_channel_id`, then delivers `SendFundingCreated` to
// `$node_b` and the answering `SendFundingSigned` back to `$node_a`.
804 macro_rules! end_open_channel {
805 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
// `$tx` is cloned so that callers can keep using their transaction afterwards (e.g. to
// confirm it on-chain later in the test).
806 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
807 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
808 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Test helper that confirms `tx` on `node`: builds a block header on top of `node.best_block`,
// advances `node.best_block` to it, and notifies both the channel manager (`node.node`) and the
// chain monitor of the confirmed transaction and of the new best block.
// NOTE(review): `depth` is presumably the number of blocks to connect so that `tx` ends up
// `depth` confirmations deep — confirm against the loop that drives this body.
812 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
// The new block extends the current tip by one.
814 let prev_blockhash = node.best_block.block_hash();
815 let height = node.best_block.height() + 1;
// Only `prev_blockhash` and `time` (set to the height) vary; the remaining header fields are
// fixed constants.
816 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
817 let txdata = vec![(0, tx)];
818 node.best_block = BestBlock::new(header.block_hash(), height);
// Report the transaction as confirmed to both listeners.
821 node.node.transactions_confirmed(&header, &txdata, height);
822 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
// Report the new chain tip to both listeners.
825 node.node.best_block_updated(&header, height);
826 node.chain_monitor.best_block_updated(&header, height);
// Convenience wrapper around `confirm_transaction_depth` that passes `ANTI_REORG_DELAY` as the
// depth.
832 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
833 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
837 fn test_background_processor() {
838 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
839 // updates. Also test that when new updates are available, the manager signals that it needs
840 // re-persistence and is successfully re-persisted.
841 let nodes = create_nodes(2, "test_background_processor".to_string());
843 // Go through the channel creation process so that each node has something to persist. Since
844 // open_channel consumes events, it must complete before starting BackgroundProcessor to
845 // avoid a race with processing events.
846 let tx = open_channel!(nodes[0], nodes[1], 100000);
848 // Start a background processor watching nodes[0] only; nodes[1] is just the channel peer.
849 let data_dir = nodes[0].persister.get_data_dir();
850 let persister = Arc::new(Persister::new(data_dir));
851 let event_handler = |_: &_| {};
852 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes `$node` into an in-memory buffer and compares it with the bytes read from
// `$filepath`; any error other than the handled cases panics.
// NOTE(review): the buffer is cleared before each write, so the comparison is presumably
// retried until the file on disk matches — confirm against the enclosing loop.
854 macro_rules! check_persisted_data {
855 ($node: expr, $filepath: expr) => {
856 let mut expected_bytes = Vec::new();
858 expected_bytes.clear();
859 match $node.write(&mut expected_bytes) {
861 match std::fs::read($filepath) {
863 if bytes == expected_bytes {
872 Err(e) => panic!("Unexpected error: {}", e)
878 // Check that the initial channel manager data is persisted as expected.
879 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
880 check_persisted_data!(nodes[0].node, filepath.clone());
// NOTE(review): presumably waits until the manager's persistence flag has been cleared —
// confirm the semantics of `get_persistence_condvar_value`.
883 if !nodes[0].node.get_persistence_condvar_value() { break }
886 // Force-close the channel.
887 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
889 // Check that the force-close updates are persisted.
890 check_persisted_data!(nodes[0].node, filepath.clone());
892 if !nodes[0].node.get_persistence_condvar_value() { break }
895 // Check network graph is persisted
896 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
897 check_persisted_data!(nodes[0].network_graph, filepath.clone());
899 // Check scorer is persisted
900 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
901 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Stopping must succeed, i.e. no persistence error was reported by the processor.
903 assert!(bg_processor.stop().is_ok());
907 fn test_timer_tick_called() {
908 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
909 // `FRESHNESS_TIMER`.
910 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
911 let data_dir = nodes[0].persister.get_data_dir();
912 let persister = Arc::new(Persister::new(data_dir));
913 let event_handler = |_: &_| {};
914 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The timer ticks are observed indirectly: look for both log lines, recorded under the
// `lightning_background_processor` module, in the test logger's captured output.
916 let log_entries = nodes[0].logger.lines.lock().unwrap();
917 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
918 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
919 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
920 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
925 assert!(bg_processor.stop().is_ok());
929 fn test_channel_manager_persist_error() {
930 // Test that if we encounter an error during manager persistence, `join` returns that error.
931 let nodes = create_nodes(2, "test_persist_error".to_string());
// Opening a channel gives the manager something that needs persisting.
932 open_channel!(nodes[0], nodes[1], 100000);
934 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured to fail manager persistence with `ErrorKind::Other` / "test".
935 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
936 let event_handler = |_: &_| {};
937 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` is used rather than `stop`: the processor is expected to end on its own with the
// injected error, which must come back with the same kind and message.
938 match bg_processor.join() {
939 Ok(_) => panic!("Expected error persisting manager"),
941 assert_eq!(e.kind(), std::io::ErrorKind::Other);
942 assert_eq!(e.get_ref().unwrap().to_string(), "test");
948 fn test_network_graph_persist_error() {
949 // Test that if we encounter an error during network graph persistence, an error gets returned.
950 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
951 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured to fail network graph persistence with `ErrorKind::Other` /
// "test".
952 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
953 let event_handler = |_: &_| {};
954 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The injected error must surface from `stop` with the same kind and message.
956 match bg_processor.stop() {
957 Ok(_) => panic!("Expected error persisting network graph"),
959 assert_eq!(e.kind(), std::io::ErrorKind::Other);
960 assert_eq!(e.get_ref().unwrap().to_string(), "test");
966 fn test_scorer_persist_error() {
967 // Test that if we encounter an error during scorer persistence, an error gets returned.
968 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
969 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured to fail scorer persistence with `ErrorKind::Other` / "test".
970 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
971 let event_handler = |_: &_| {};
972 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The injected error must surface from `stop` with the same kind and message.
974 match bg_processor.stop() {
975 Ok(_) => panic!("Expected error persisting scorer"),
977 assert_eq!(e.kind(), std::io::ErrorKind::Other);
978 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Checks that events generated by nodes[0] reach the user-supplied event handler while a
// `BackgroundProcessor` is running. Two processors are run one after the other: the first
// handles the funding event of a channel open, the second handles the events that follow a
// force-close. Each handler forwards what it saw over a channel so the test thread can assert
// on it.
984 fn test_background_event_handling() {
985 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
986 let channel_value = 100000;
987 let data_dir = nodes[0].persister.get_data_dir();
988 let persister = Arc::new(Persister::new(data_dir.clone()));
990 // Set up a background event handler for FundingGenerationReady events.
991 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
992 let event_handler = move |event: &Event| {
993 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
995 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
997 // Open a channel and check that the FundingGenerationReady event was handled.
998 begin_open_channel!(nodes[0], nodes[1], channel_value);
// The funding details arrive from the handler; give up after `EVENT_DEADLINE` seconds.
999 let (temporary_channel_id, funding_tx) = receiver
1000 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1001 .expect("FundingGenerationReady not handled within deadline");
1002 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1004 // Confirm the funding transaction.
1005 confirm_transaction(&mut nodes[0], &funding_tx);
1006 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1007 confirm_transaction(&mut nodes[1], &funding_tx);
1008 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange the `channel_ready` messages by hand; the resulting channel updates are fetched
// but not used further.
1009 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1010 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1011 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1012 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before starting a second one with a different handler.
1014 assert!(bg_processor.stop().is_ok());
1016 // Set up a background event handler for SpendableOutputs events.
1017 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1018 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
1019 let persister = Arc::new(Persister::new(data_dir));
1020 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1022 // Force close the channel and check that the SpendableOutputs event was handled.
1023 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the most recently broadcast transaction from the test broadcaster and confirm it.
1024 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1025 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1026 let event = receiver
1027 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1028 .expect("SpendableOutputs not handled within deadline");
// The first event forwarded by the handler may be `ChannelClosed` rather than
// `SpendableOutputs`; both are accepted, anything else fails the test.
1030 Event::SpendableOutputs { .. } => {},
1031 Event::ChannelClosed { .. } => {},
1032 _ => panic!("Unexpected event: {:?}", event),
1035 assert!(bg_processor.stop().is_ok());
// Checks that a running `BackgroundProcessor` persists the scorer. Persistence is detected
// through the test logger: the "Persisting scorer" line must be recorded under the
// `lightning_background_processor` module.
1039 fn test_scorer_persistence() {
1040 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1041 let data_dir = nodes[0].persister.get_data_dir();
1042 let persister = Arc::new(Persister::new(data_dir));
1043 let event_handler = |_: &_| {};
1044 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Look up the expected line in the logger's captured output.
1047 let log_entries = nodes[0].logger.lines.lock().unwrap();
1048 let expected_log = "Persisting scorer".to_string();
1049 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1054 assert!(bg_processor.stop().is_ok());
// Checks how network graph pruning interacts with rapid gossip sync: while the sync is
// pending the processor logs that it is not pruning, and once a rapid gossip sync update has
// been applied the graph is persisted and ends up with no channels left.
1058 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1059 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1060 let data_dir = nodes[0].persister.get_data_dir();
// The persister signals over `sender` when the network graph is persisted, letting the test
// block on `receiver` further down.
1061 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1062 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1063 let network_graph = nodes[0].network_graph.clone();
1064 let features = ChannelFeatures::empty();
// Seed the graph with one channel (short channel id 42) between the two test nodes.
1065 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1066 .expect("Failed to update channel from partial announcement");
1067 let original_graph_description = network_graph.to_string();
1068 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1069 assert_eq!(network_graph.read_only().channels().len(), 1);
1071 let event_handler = |_: &_| {};
1072 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Before any rapid gossip sync data has been applied, the processor is expected to log both
// that it assessed the graph and that it declined to prune it.
1075 let log_entries = nodes[0].logger.lines.lock().unwrap();
1076 let expected_log_a = "Assessing prunability of network graph".to_string();
1077 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1078 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1079 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Raw bytes passed to `rapid_gossip_sync.update_network_graph` below.
1084 let initialization_input = vec![
1085 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1086 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1087 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1088 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1089 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1090 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1091 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1092 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1093 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1094 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1095 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1096 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1097 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1099 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1101 // this should have added two channels
1102 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for the graph-persistence notification, allowing up to five times
// `FIRST_NETWORK_PRUNE_TIMER` seconds.
1105 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1106 .expect("Network graph not pruned within deadline");
1108 background_processor.stop().unwrap();
1110 // all channels should now be pruned
1111 assert_eq!(network_graph.read_only().channels().len(), 0);
// Checks that an `InvoicePayer` wrapped in an `Arc` can be passed as the event handler to
// `BackgroundProcessor::start`, and that the processor then stops without error.
1115 fn test_invoice_payer() {
1116 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1117 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1118 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1120 // Start a background processor watching nodes[0], with the invoice payer as event handler.
1121 let data_dir = nodes[0].persister.get_data_dir();
1122 let persister = Arc::new(Persister::new(data_dir));
1123 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
// The payer's own inner event handler is a no-op closure; payments are retried up to
// `Retry::Attempts(2)`.
1124 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1125 let event_handler = Arc::clone(&invoice_payer);
1126 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1127 assert!(bg_processor.stop().is_ok());