1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures::{select, future::FutureExt};
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 /// writing it to disk/backups by invoking the callback given to it at startup.
46 /// [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 /// at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
50 /// is provided to [`BackgroundProcessor::start`]).
52 /// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be relied
53 /// upon, as doing so may result in high latency.
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
64 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
65 pub struct BackgroundProcessor {
// Shared shutdown flag: the background thread polls this each loop iteration
// and exits once it reads `true` (set via `stop_and_join_thread`).
66 stop_thread: Arc<AtomicBool>,
// Handle to the spawned worker thread. Held in an `Option` so that
// `join`/`stop`/`Drop` can `take()` it and join at most once; the thread's
// return value carries any persistence error encountered.
67 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// NOTE(review): the struct's closing brace is elided from this chunk.
// Timer intervals, in seconds. Each constant appears twice: the pairs are
// selected by `#[cfg(...)]` attributes (test builds use 1s so tests run
// quickly). Several of the cfg attributes are elided from this chunk — the
// visible PING_TIMER gates show the pattern; TODO confirm the elided gates.
71 const FRESHNESS_TIMER: u64 = 60;
73 const FRESHNESS_TIMER: u64 = 1;
75 #[cfg(all(not(test), not(debug_assertions)))]
76 const PING_TIMER: u64 = 10;
77 /// Signature operations take a lot longer without compiler optimisations.
78 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
79 /// timeout is reached.
80 #[cfg(all(not(test), debug_assertions))]
81 const PING_TIMER: u64 = 30;
83 const PING_TIMER: u64 = 1;
85 /// Prune the network graph of stale entries hourly.
86 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// How often the scorer, if one was supplied, is re-persisted to disk.
89 const SCORER_PERSIST_TIMER: u64 = 30;
91 const SCORER_PERSIST_TIMER: u64 = 1;
// The first graph prune runs shortly after startup (rather than waiting a full
// NETWORK_PRUNE_TIMER) so short-lived clients still prune their graph.
94 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
96 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// NOTE(review): the `pub enum GossipSync<...>` declaration line itself is
// elided from this chunk; only its generic bounds and variants are visible.
100 P: Deref<Target = P2PGossipSync<G, A, L>>,
101 R: Deref<Target = RapidGossipSync<G, L>>,
102 G: Deref<Target = NetworkGraph<L>>,
106 where A::Target: chain::Access, L::Target: Logger {
107 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
109 /// Rapid gossip sync from a trusted server.
116 P: Deref<Target = P2PGossipSync<G, A, L>>,
117 R: Deref<Target = RapidGossipSync<G, L>>,
118 G: Deref<Target = NetworkGraph<L>>,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
// Returns the network graph backing this sync, if any (both the P2P and the
// Rapid variants carry one; `None` has none).
123 fn network_graph(&self) -> Option<&G> {
125 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127 GossipSync::None => None,
// Like `network_graph`, but withholds the graph for the Rapid variant until
// its initial sync has completed — pruning a half-synced graph could drop
// channels the pending rapid-sync data would have refreshed.
131 fn prunable_network_graph(&self) -> Option<&G> {
133 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134 GossipSync::Rapid(gossip_sync) => {
135 if gossip_sync.is_initial_sync_complete() {
136 Some(gossip_sync.network_graph())
// NOTE(review): the `else { None }` arm and closing braces are elided here.
141 GossipSync::None => None,
// Convenience constructors. Each `impl` pins the *unused* variant's type
// parameters to concrete reference types so callers need not spell them out.
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 A::Target: chain::Access,
153 /// Initializes a new [`GossipSync::P2P`] variant.
154 pub fn p2p(gossip_sync: P) -> Self {
155 GossipSync::P2P(gossip_sync)
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
165 &'a (dyn chain::Access + Send + Sync),
171 /// Initializes a new [`GossipSync::Rapid`] variant.
172 pub fn rapid(gossip_sync: R) -> Self {
173 GossipSync::Rapid(gossip_sync)
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
180 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183 &'a (dyn chain::Access + Send + Sync),
189 /// Initializes a new [`GossipSync::None`] variant.
190 pub fn none() -> Self {
// NOTE(review): the `GossipSync::None` body line and several impl-header /
// closing-brace lines are elided from this chunk.
195 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
196 struct DecoratingEventHandler<
199 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
200 RGS: Deref<Target = RapidGossipSync<G, L>>,
201 G: Deref<Target = NetworkGraph<L>>,
205 where A::Target: chain::Access, L::Target: Logger {
// Borrowed gossip sync: if it carries a network graph, that graph gets first
// crack at every event before the user's handler runs.
207 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
213 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
214 RGS: Deref<Target = RapidGossipSync<G, L>>,
215 G: Deref<Target = NetworkGraph<L>>,
218 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
219 where A::Target: chain::Access, L::Target: Logger {
// Forward the event to the network graph first (e.g. so it can learn from
// payment failures), then to the user-provided handler.
220 fn handle_event(&self, event: &Event) {
221 if let Some(network_graph) = self.gossip_sync.network_graph() {
222 network_graph.handle_event(event);
224 self.event_handler.handle_event(event);
// Shared main-loop body for both the threaded (`BackgroundProcessor::start`)
// and async (`process_events_async`) drivers. `$loop_exit_check` decides when
// to stop; `$await` blocks up to ~100ms and returns whether a ChannelManager
// re-persist is needed. NOTE(review): the surrounding `loop { ... }` lines and
// several closing braces are elided from this chunk.
228 macro_rules! define_run_body {
229 ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
230 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
231 $loop_exit_check: expr, $await: expr)
233 let event_handler = DecoratingEventHandler {
234 event_handler: $event_handler,
235 gossip_sync: &$gossip_sync,
// Tick once at startup so a long-downtime restart catches up immediately.
238 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
239 $channel_manager.timer_tick_occurred();
241 let mut last_freshness_call = Instant::now();
242 let mut last_ping_call = Instant::now();
243 let mut last_prune_call = Instant::now();
244 let mut last_scorer_persist_call = Instant::now();
245 let mut have_pruned = false;
248 $channel_manager.process_pending_events(&event_handler);
249 $chain_monitor.process_pending_events(&event_handler);
251 // Note that the PeerManager::process_events may block on ChannelManager's locks,
252 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
253 // we want to ensure we get into `persist_manager` as quickly as we can, especially
254 // without running the normal event processing above and handing events to users.
256 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
257 // processing a message effectively at any point during this loop. In order to
258 // minimize the time between such processing completing and persisting the updated
259 // ChannelManager, we want to minimize methods blocking on a ChannelManager
260 // generally, and as a fallback place such blocking only immediately before
262 $peer_manager.process_events();
264 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
265 // see `await_start`'s use below.
266 let await_start = Instant::now();
267 let updates_available = $await;
268 let await_time = await_start.elapsed();
270 if updates_available {
271 log_trace!($logger, "Persisting ChannelManager...");
// `?` propagates persistence errors out of the loop as the driver's result.
272 $persister.persist_manager(&*$channel_manager)?;
273 log_trace!($logger, "Done persisting ChannelManager.");
275 // Exit the loop if the background processor was requested to stop.
276 if $loop_exit_check {
277 log_trace!($logger, "Terminating background processor.");
280 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
281 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
282 $channel_manager.timer_tick_occurred();
283 last_freshness_call = Instant::now();
285 if await_time > Duration::from_secs(1) {
286 // On various platforms, we may be starved of CPU cycles for several reasons.
287 // E.g. on iOS, if we've been in the background, we will be entirely paused.
288 // Similarly, if we're on a desktop platform and the device has been asleep, we
289 // may not get any cycles.
290 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
291 // full second, at which point we assume sockets may have been killed (they
292 // appear to be at least on some platforms, even if it has only been a second).
293 // Note that we have to take care to not get here just because user event
294 // processing was slow at the top of the loop. For example, the sample client
295 // may call Bitcoin Core RPCs during event handling, which very often takes
296 // more than a handful of seconds to complete, and shouldn't disconnect all our
298 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
299 $peer_manager.disconnect_all_peers();
300 last_ping_call = Instant::now();
301 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
302 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
303 $peer_manager.timer_tick_occurred();
304 last_ping_call = Instant::now();
307 // Note that we want to run a graph prune once not long after startup before
308 // falling back to our usual hourly prunes. This avoids short-lived clients never
309 // pruning their network graph. We run once 60 seconds after startup before
310 // continuing our normal cadence.
311 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
312 // The network graph must not be pruned while rapid sync completion is pending
313 log_trace!($logger, "Assessing prunability of network graph")
314 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
315 network_graph.remove_stale_channels();
// Graph/scorer persistence failures are logged, not propagated: a stale graph
// on disk is recoverable, unlike a stale ChannelManager.
317 if let Err(e) = $persister.persist_graph(network_graph) {
318 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
321 last_prune_call = Instant::now();
// NOTE(review): no visible line sets `have_pruned = true`; presumably it is
// assigned in an elided line near here — confirm against the full file.
324 log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
328 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
329 if let Some(ref scorer) = $scorer {
330 log_trace!($logger, "Persisting scorer");
331 if let Err(e) = $persister.persist_scorer(&scorer) {
332 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
335 last_scorer_persist_call = Instant::now();
339 // After we exit, ensure we persist the ChannelManager one final time - this avoids
340 // some races where users quit while channel updates were in-flight, with
341 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
342 $persister.persist_manager(&*$channel_manager)?;
344 // Persist Scorer on exit
345 if let Some(ref scorer) = $scorer {
346 $persister.persist_scorer(&scorer)?;
349 // Persist NetworkGraph on exit
350 if let Some(network_graph) = $gossip_sync.network_graph() {
351 $persister.persist_graph(network_graph)?;
358 /// Processes background events in a future.
360 /// `sleeper` should return a future which completes in the given amount of time and returns a
361 /// boolean indicating whether the background processing should continue. Once `sleeper` returns a
362 /// future which outputs false, the loop will exit and this function's future will complete.
364 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
365 #[cfg(feature = "futures")]
366 pub async fn process_events_async<
368 Signer: 'static + Sign,
369 CA: 'static + Deref + Send + Sync,
370 CF: 'static + Deref + Send + Sync,
371 CW: 'static + Deref + Send + Sync,
372 T: 'static + Deref + Send + Sync,
373 K: 'static + Deref + Send + Sync,
374 F: 'static + Deref + Send + Sync,
375 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
376 L: 'static + Deref + Send + Sync,
377 P: 'static + Deref + Send + Sync,
378 Descriptor: 'static + SocketDescriptor + Send + Sync,
379 CMH: 'static + Deref + Send + Sync,
380 RMH: 'static + Deref + Send + Sync,
381 EH: 'static + EventHandler + Send,
382 PS: 'static + Deref + Send,
383 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
384 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
385 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
386 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
387 UMH: 'static + Deref + Send + Sync,
// NOTE(review): unlike `BackgroundProcessor::start`, whose PeerManager bound
// includes an `OMH` (OnionMessageHandler) type parameter, this bound has no
// onion-message handler — confirm whether the async variant lags behind.
388 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
389 S: 'static + Deref<Target = SC> + Send + Sync,
390 SC: WriteableScore<'a>,
391 SleepFuture: core::future::Future<Output = bool>,
392 Sleeper: Fn(Duration) -> SleepFuture
394 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
395 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
397 ) -> Result<(), std::io::Error>
399 CA::Target: 'static + chain::Access,
400 CF::Target: 'static + chain::Filter,
401 CW::Target: 'static + chain::Watch<Signer>,
402 T::Target: 'static + BroadcasterInterface,
403 K::Target: 'static + KeysInterface<Signer = Signer>,
404 F::Target: 'static + FeeEstimator,
405 L::Target: 'static + Logger,
406 P::Target: 'static + Persist<Signer>,
407 CMH::Target: 'static + ChannelMessageHandler,
408 RMH::Target: 'static + RoutingMessageHandler,
409 UMH::Target: 'static + CustomMessageHandler,
410 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
412 let mut should_continue = true;
// The `$await` expression races the ChannelManager's persistable-update future
// against a 100ms `sleeper` tick; the sleeper's boolean output drives
// `should_continue`, which is the loop-exit check. NOTE(review): the `select!`
// wrapper line and closing braces are elided from this chunk.
413 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
414 gossip_sync, peer_manager, logger, scorer, should_continue, {
416 _ = channel_manager.get_persistable_update_future().fuse() => true,
417 cont = sleeper(Duration::from_millis(100)).fuse() => {
418 should_continue = cont;
425 impl BackgroundProcessor {
426 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
429 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
430 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
431 /// either [`join`] or [`stop`].
433 /// # Data Persistence
435 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
436 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
437 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
438 /// provided implementation.
440 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
441 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
442 /// See the `lightning-persister` crate for LDK's provided implementation.
444 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
445 /// error or call [`join`] and handle any error that may arise. For the latter case,
446 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
450 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
451 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
452 /// functionality implemented by other handlers.
453 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
455 /// # Rapid Gossip Sync
457 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
458 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
459 /// until the [`RapidGossipSync`] instance completes its first sync.
461 /// [top-level documentation]: BackgroundProcessor
462 /// [`join`]: Self::join
463 /// [`stop`]: Self::stop
464 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
465 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
466 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
467 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
468 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
469 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
// NOTE(review): the `pub fn start<...>(` line itself is elided; only the
// generic parameter list below is visible.
472 Signer: 'static + Sign,
473 CA: 'static + Deref + Send + Sync,
474 CF: 'static + Deref + Send + Sync,
475 CW: 'static + Deref + Send + Sync,
476 T: 'static + Deref + Send + Sync,
477 K: 'static + Deref + Send + Sync,
478 F: 'static + Deref + Send + Sync,
479 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
480 L: 'static + Deref + Send + Sync,
481 P: 'static + Deref + Send + Sync,
482 Descriptor: 'static + SocketDescriptor + Send + Sync,
483 CMH: 'static + Deref + Send + Sync,
484 OMH: 'static + Deref + Send + Sync,
485 RMH: 'static + Deref + Send + Sync,
486 EH: 'static + EventHandler + Send,
487 PS: 'static + Deref + Send,
488 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
489 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
490 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
491 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
492 UMH: 'static + Deref + Send + Sync,
493 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
494 S: 'static + Deref<Target = SC> + Send + Sync,
495 SC: WriteableScore<'a>,
497 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
498 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
501 CA::Target: 'static + chain::Access,
502 CF::Target: 'static + chain::Filter,
503 CW::Target: 'static + chain::Watch<Signer>,
504 T::Target: 'static + BroadcasterInterface,
505 K::Target: 'static + KeysInterface<Signer = Signer>,
506 F::Target: 'static + FeeEstimator,
507 L::Target: 'static + Logger,
508 P::Target: 'static + Persist<Signer>,
509 CMH::Target: 'static + ChannelMessageHandler,
510 OMH::Target: 'static + OnionMessageHandler,
511 RMH::Target: 'static + RoutingMessageHandler,
512 UMH::Target: 'static + CustomMessageHandler,
513 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
515 let stop_thread = Arc::new(AtomicBool::new(false));
516 let stop_thread_clone = stop_thread.clone();
// Spawn the worker: the loop exits when `stop_thread` is set, and each
// iteration blocks up to 100ms waiting for a persistable update.
517 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
518 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
519 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
520 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
522 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
525 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
526 /// [`ChannelManager`].
530 /// This function panics if the background thread has panicked such as while persisting or
533 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
534 pub fn join(mut self) -> Result<(), std::io::Error> {
535 assert!(self.thread_handle.is_some());
539 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
540 /// [`ChannelManager`].
544 /// This function panics if the background thread has panicked such as while persisting or
547 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
548 pub fn stop(mut self) -> Result<(), std::io::Error> {
549 assert!(self.thread_handle.is_some());
550 self.stop_and_join_thread()
// Signal the worker to stop (Release pairs with the Acquire load in the run
// body), then join it and surface its result.
553 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
554 self.stop_thread.store(true, Ordering::Release);
// Joins at most once: `take()` empties the handle so repeated calls (e.g.
// `stop` followed by `Drop`) do not double-join. `.unwrap()` propagates a
// worker panic, matching the documented panic behavior above.
558 fn join_thread(&mut self) -> Result<(), std::io::Error> {
559 match self.thread_handle.take() {
560 Some(handle) => handle.join().unwrap(),
566 impl Drop for BackgroundProcessor {
// Dropping stops the thread; any persistence error at this point panics since
// there is no caller left to hand it to.
568 self.stop_and_join_thread().unwrap();
574 use bitcoin::blockdata::block::BlockHeader;
575 use bitcoin::blockdata::constants::genesis_block;
576 use bitcoin::blockdata::locktime::PackedLockTime;
577 use bitcoin::blockdata::transaction::{Transaction, TxOut};
578 use bitcoin::network::constants::Network;
579 use lightning::chain::{BestBlock, Confirm, chainmonitor};
580 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
581 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
582 use lightning::chain::transaction::OutPoint;
583 use lightning::get_event_msg;
584 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
585 use lightning::ln::features::{ChannelFeatures, InitFeatures};
586 use lightning::ln::msgs::{ChannelMessageHandler, Init};
587 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
588 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
589 use lightning::util::config::UserConfig;
590 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
591 use lightning::util::ser::Writeable;
592 use lightning::util::test_utils;
593 use lightning::util::persist::KVStorePersister;
594 use lightning_invoice::payment::{InvoicePayer, Retry};
595 use lightning_invoice::utils::DefaultRouter;
596 use lightning_persister::FilesystemPersister;
598 use std::path::PathBuf;
599 use std::sync::{Arc, Mutex};
600 use std::sync::mpsc::SyncSender;
601 use std::time::Duration;
602 use bitcoin::hashes::Hash;
603 use bitcoin::TxMerkleNode;
604 use lightning::routing::scoring::{FixedPenaltyScorer};
605 use lightning_rapid_gossip_sync::RapidGossipSync;
606 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
608 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
// Minimal no-op socket descriptor so a real PeerManager can be constructed in
// tests without any actual networking.
610 #[derive(Clone, Eq, Hash, PartialEq)]
611 struct TestDescriptor{}
612 impl SocketDescriptor for TestDescriptor {
613 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
617 fn disconnect_socket(&mut self) {}
// Concrete type aliases tying together the test doubles from `test_utils`.
620 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
622 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
623 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// A fully wired test node. NOTE(review): the `struct Node {` header line is
// elided from this chunk; only the fields are visible.
626 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
627 p2p_gossip_sync: PGS,
628 rapid_gossip_sync: RGS,
629 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
630 chain_monitor: Arc<ChainMonitor>,
631 persister: Arc<FilesystemPersister>,
632 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
633 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
634 logger: Arc<test_utils::TestLogger>,
635 best_block: BestBlock,
636 scorer: Arc<Mutex<FixedPenaltyScorer>>,
// Helpers wrapping this node's gossip objects in each GossipSync variant.
640 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
641 GossipSync::P2P(self.p2p_gossip_sync.clone())
644 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
645 GossipSync::Rapid(self.rapid_gossip_sync.clone())
648 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
// Best-effort cleanup of the per-test persister directory; failure is only
// logged since it cannot affect test correctness. Presumably part of a `Drop`
// impl for `Node` — the impl header is elided; confirm against the full file.
655 let data_dir = self.persister.get_data_dir();
656 match fs::remove_dir_all(data_dir.clone()) {
657 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Test persister wrapping FilesystemPersister with injectable failures and a
// notification channel. NOTE(review): the `struct Persister {` header line is
// elided from this chunk.
664 graph_error: Option<(std::io::ErrorKind, &'static str)>,
665 graph_persistence_notifier: Option<SyncSender<()>>,
666 manager_error: Option<(std::io::ErrorKind, &'static str)>,
667 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
668 filesystem_persister: FilesystemPersister,
// Builder-style constructors: `new` makes a non-failing persister; each
// `with_*` clones the rest and arms one failure/notification hook.
672 fn new(data_dir: String) -> Self {
673 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
674 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
677 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
678 Self { graph_error: Some((error, message)), ..self }
681 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
682 Self { graph_persistence_notifier: Some(sender), ..self }
685 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
686 Self { manager_error: Some((error, message)), ..self }
689 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
690 Self { scorer_error: Some((error, message)), ..self }
694 impl KVStorePersister for Persister {
// Dispatch on the well-known keys ("manager", "network_graph", scorer —
// scorer's key check line is elided) to inject the armed error or send the
// notification, otherwise delegate to the real filesystem persister.
695 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
696 if key == "manager" {
697 if let Some((error, message)) = self.manager_error {
698 return Err(std::io::Error::new(error, message))
702 if key == "network_graph" {
703 if let Some(sender) = &self.graph_persistence_notifier {
704 sender.send(()).unwrap();
707 if let Some((error, message)) = self.graph_error {
708 return Err(std::io::Error::new(error, message))
713 if let Some((error, message)) = self.scorer_error {
714 return Err(std::io::Error::new(error, message))
718 self.filesystem_persister.persist(key, object)
// Joins a directory and filename into a single path string via PathBuf.
722 fn get_full_filepath(filepath: String, filename: String) -> String {
723 let mut path = PathBuf::from(filepath);
// NOTE(review): the line that pushes `filename` onto `path` is elided here.
725 path.to_str().unwrap().to_string()
// Builds `num_nodes` fully wired test nodes (each with its own broadcaster,
// fee estimator, chain source, persister, ChannelManager, gossip syncs, peer
// manager and scorer), then connects every pair of them as peers.
728 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
729 let mut nodes = Vec::new();
730 for i in 0..num_nodes {
731 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
732 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
733 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
734 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
// Each node persists into its own directory, keyed by the node index.
735 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
// Deterministic per-node key seed.
736 let seed = [i as u8; 32];
737 let network = Network::Testnet;
738 let genesis_block = genesis_block(network);
739 let now = Duration::from_secs(genesis_block.header.time as u64);
740 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
741 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
742 let best_block = BestBlock::from_genesis(network);
743 let params = ChainParameters { network, best_block };
744 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
745 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
746 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
747 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
748 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
749 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
750 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
751 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Connect every (i, j) pair in both directions so all nodes see each other.
755 for i in 0..num_nodes {
756 for j in (i+1)..num_nodes {
757 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
758 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
765 macro_rules! open_channel {
766 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
767 begin_open_channel!($node_a, $node_b, $channel_value);
768 let events = $node_a.node.get_and_clear_pending_events();
769 assert_eq!(events.len(), 1);
770 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
771 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
776 macro_rules! begin_open_channel {
777 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
778 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
779 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
780 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Test helper: given a FundingGenerationReady `$event`, validates its fields
// against `$channel_value` and builds a dummy funding transaction paying the
// requested output script; evaluates to `(temporary_channel_id, tx)`.
// Panics on any other event variant.
// NOTE(review): the chunk omits the `match $event {` opener (original line
// 786) and the macro's closing lines; visible code is reproduced unchanged.
784 macro_rules! handle_funding_generation_ready {
785 ($event: expr, $channel_value: expr) => {{
787 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
788 assert_eq!(channel_value_satoshis, $channel_value);
// 42 is the user_channel_id passed to create_channel in begin_open_channel!.
789 assert_eq!(user_channel_id, 42);
// Single-output tx with no inputs — sufficient for these tests since the
// funding tx is never actually validated against the chain.
791 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
792 value: channel_value_satoshis, script_pubkey: output_script.clone(),
794 (temporary_channel_id, tx)
796 _ => panic!("Unexpected event"),
// Test helper: completes the channel-open handshake — hands the funding tx to
// `$node_a`, then exchanges funding_created/funding_signed with `$node_b`.
// NOTE(review): this chunk omits the macro's closing lines.
801 macro_rules! end_open_channel {
802 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
803 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
804 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
805 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Test helper: mines dummy blocks on `node`'s view of the chain so that `tx`
// ends up confirmed to `depth` confirmations, notifying both the
// ChannelManager and the ChainMonitor of each block.
// NOTE(review): the chunk omits the surrounding loop/match lines (originals
// 810, 816-817, 820-821) that repeat these notifications up to `depth`;
// visible code is reproduced unchanged.
809 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
811 let prev_blockhash = node.best_block.block_hash();
812 let height = node.best_block.height() + 1;
// `time: height` keeps headers distinct; bits/nonce are arbitrary since the
// header is never PoW-validated in these tests.
813 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
814 let txdata = vec![(0, tx)];
815 node.best_block = BestBlock::new(header.block_hash(), height);
// The block containing `tx`: both listeners are told of the confirmation...
818 node.node.transactions_confirmed(&header, &txdata, height);
819 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
// ...while subsequent (empty) blocks only advance the best-block pointer.
822 node.node.best_block_updated(&header, height);
823 node.chain_monitor.best_block_updated(&header, height);
// Test helper: confirms `tx` to ANTI_REORG_DELAY confirmations — the depth at
// which rust-lightning treats a confirmation as irreversible.
// NOTE(review): the closing brace (original line 831) is outside this chunk.
829 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
830 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
// NOTE(review): the `#[test]` attribute (original line 833) and numerous
// scaffolding lines (loop bodies, match arms, closing braces) are outside
// this chunk; visible code is reproduced unchanged.
834 fn test_background_processor() {
835 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
836 // updates. Also test that when new updates are available, the manager signals that it needs
837 // re-persistence and is successfully re-persisted.
838 let nodes = create_nodes(2, "test_background_processor".to_string());
840 // Go through the channel creation process so that each node has something to persist. Since
841 // open_channel consumes events, it must complete before starting BackgroundProcessor to
842 // avoid a race with processing events.
843 let tx = open_channel!(nodes[0], nodes[1], 100000);
845 // Initiate the background processors to watch each node.
846 let data_dir = nodes[0].persister.get_data_dir();
847 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler — this test only exercises persistence, not events.
848 let event_handler = |_: &_| {};
849 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Compares the on-disk bytes at `$filepath` with a fresh serialization of
// `$node`. NOTE(review): the retry/polling structure around the comparison
// (originals 854, 857, 859-868, 870-874) is outside this chunk.
851 macro_rules! check_persisted_data {
852 ($node: expr, $filepath: expr) => {
853 let mut expected_bytes = Vec::new();
855 expected_bytes.clear();
856 match $node.write(&mut expected_bytes) {
858 match std::fs::read($filepath) {
860 if bytes == expected_bytes {
869 Err(e) => panic!("Unexpected error: {}", e)
875 // Check that the initial channel manager data is persisted as expected.
876 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
877 check_persisted_data!(nodes[0].node, filepath.clone());
// Presumably inside a polling loop (originals 878-882): wait until the
// manager's persistence flag clears — TODO confirm against full source.
880 if !nodes[0].node.get_persistence_condvar_value() { break }
883 // Force-close the channel.
884 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
886 // Check that the force-close updates are persisted.
887 check_persisted_data!(nodes[0].node, filepath.clone());
889 if !nodes[0].node.get_persistence_condvar_value() { break }
892 // Check network graph is persisted
893 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
894 check_persisted_data!(nodes[0].network_graph, filepath.clone());
896 // Check scorer is persisted
897 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
898 check_persisted_data!(nodes[0].scorer, filepath.clone());
// `stop` joins the background thread and surfaces any persistence error.
900 assert!(bg_processor.stop().is_ok());
// NOTE(review): the `#[test]` attribute and the polling-loop scaffolding
// around the log check (originals 912, 918-921) are outside this chunk.
904 fn test_timer_tick_called() {
905 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
906 // `FRESHNESS_TIMER`.
907 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
908 let data_dir = nodes[0].persister.get_data_dir();
909 let persister = Arc::new(Persister::new(data_dir));
910 let event_handler = |_: &_| {};
911 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Detect the ticks indirectly via the test logger: both log lines must show
// up under the crate's log target before the test can finish.
913 let log_entries = nodes[0].logger.lines.lock().unwrap();
914 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
915 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
916 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
917 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
922 assert!(bg_processor.stop().is_ok());
// NOTE(review): `#[test]` attribute and some match-arm/closing lines
// (originals 937, 940-941) are outside this chunk.
926 fn test_channel_manager_persist_error() {
927 // Test that if we encounter an error during manager persistence, the thread panics.
928 let nodes = create_nodes(2, "test_persist_error".to_string());
// A channel must exist so the manager actually has state to persist.
929 open_channel!(nodes[0], nodes[1], 100000);
931 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail manager writes with a synthetic io::Error.
932 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
933 let event_handler = |_: &_| {};
934 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` must surface the injected error rather than returning Ok.
935 match bg_processor.join() {
936 Ok(_) => panic!("Expected error persisting manager"),
938 assert_eq!(e.kind(), std::io::ErrorKind::Other);
939 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// NOTE(review): `#[test]` attribute and some match-arm/closing lines
// (originals 952, 955, 958-959) are outside this chunk.
945 fn test_network_graph_persist_error() {
946 // Test that if we encounter an error during network graph persistence, an error gets returned.
947 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
948 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail network-graph writes with a synthetic io::Error.
949 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
950 let event_handler = |_: &_| {};
// Uses p2p_gossip_sync so a network graph is actually wired in and persisted.
951 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike the manager-persist test, graph errors surface from `stop`, not a panic.
953 match bg_processor.stop() {
954 Ok(_) => panic!("Expected error persisting network graph"),
956 assert_eq!(e.kind(), std::io::ErrorKind::Other);
957 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// NOTE(review): `#[test]` attribute and some match-arm/closing lines
// (originals 970, 973, 976-977) are outside this chunk.
963 fn test_scorer_persist_error() {
964 // Test that if we encounter an error during scorer persistence, an error gets returned.
965 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
966 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail scorer writes with a synthetic io::Error.
967 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
968 let event_handler = |_: &_| {};
969 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The scorer-persist failure should be reported from `stop`.
971 match bg_processor.stop() {
972 Ok(_) => panic!("Expected error persisting scorer"),
974 assert_eq!(e.kind(), std::io::ErrorKind::Other);
975 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that the BackgroundProcessor delivers events (FundingGenerationReady,
// then SpendableOutputs after a force-close) to the user-provided handler.
// NOTE(review): `#[test]` attribute and various closing/scaffolding lines are
// outside this chunk; visible code is reproduced unchanged.
981 fn test_background_event_handling() {
982 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
983 let channel_value = 100000;
984 let data_dir = nodes[0].persister.get_data_dir();
985 let persister = Arc::new(Persister::new(data_dir.clone()));
987 // Set up a background event handler for FundingGenerationReady events.
// Channel capacity 1 is enough: each phase expects exactly one event.
988 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
989 let event_handler = move |event: &Event| {
990 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
992 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
994 // Open a channel and check that the FundingGenerationReady event was handled.
995 begin_open_channel!(nodes[0], nodes[1], channel_value);
996 let (temporary_channel_id, funding_tx) = receiver
997 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
998 .expect("FundingGenerationReady not handled within deadline");
999 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1001 // Confirm the funding transaction.
1002 confirm_transaction(&mut nodes[0], &funding_tx);
1003 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1004 confirm_transaction(&mut nodes[1], &funding_tx);
1005 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready both ways so the channel becomes usable; the
// resulting channel_update messages are drained but unused.
1006 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1007 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1008 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1009 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1011 assert!(bg_processor.stop().is_ok());
1013 // Set up a background event handler for SpendableOutputs events.
// Fresh channel/handler pair; the first pair was moved into the (now stopped)
// processor's closure.
1014 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1015 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
1016 let persister = Arc::new(Persister::new(data_dir));
1017 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1019 // Force close the channel and check that the SpendableOutputs event was handled.
1020 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1021 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// BREAKDOWN_TIMEOUT confirmations mature the to_self output into something
// spendable, which triggers the SpendableOutputs event.
1022 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1023 let event = receiver
1024 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1025 .expect("SpendableOutputs not handled within deadline");
// A ChannelClosed event may also arrive here and is tolerated.
1027 Event::SpendableOutputs { .. } => {},
1028 Event::ChannelClosed { .. } => {},
1029 _ => panic!("Unexpected event: {:?}", event),
1032 assert!(bg_processor.stop().is_ok());
// Verifies (via the test logger) that the BackgroundProcessor periodically
// persists the scorer. NOTE(review): `#[test]` attribute and the polling-loop
// scaffolding around the log check (originals 1042-1043, 1047-1050) are
// outside this chunk.
1036 fn test_scorer_persistence() {
1037 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1038 let data_dir = nodes[0].persister.get_data_dir();
1039 let persister = Arc::new(Persister::new(data_dir));
1040 let event_handler = |_: &_| {};
1041 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Wait for the "Persisting scorer" line to appear under the crate's target.
1044 let log_entries = nodes[0].logger.lines.lock().unwrap();
1045 let expected_log = "Persisting scorer".to_string();
1046 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1051 assert!(bg_processor.stop().is_ok());
// Verifies that with a RapidGossipSync attached, the BackgroundProcessor
// refrains from pruning the network graph until the rapid sync completes,
// then prunes it. NOTE(review): `#[test]` attribute and various scaffolding
// lines (loop bodies, vec terminator, closing braces) are outside this chunk.
1055 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1056 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1057 let data_dir = nodes[0].persister.get_data_dir();
// The persister signals over `sender` each time the graph is persisted,
// which this test uses to detect when pruning has happened.
1058 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1059 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1060 let network_graph = nodes[0].network_graph.clone();
1061 let features = ChannelFeatures::empty();
// Seed one stale partial-announcement channel (scid 42) that pruning would
// normally remove.
1062 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1063 .expect("Failed to update channel from partial announcement");
1064 let original_graph_description = network_graph.to_string();
1065 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1066 assert_eq!(network_graph.read_only().channels().len(), 1);
1068 let event_handler = |_: &_| {};
1069 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// While the rapid sync is still pending, the processor must log that it is
// deliberately NOT pruning. Presumably polled in a loop (lines omitted here).
1072 let log_entries = nodes[0].logger.lines.lock().unwrap();
1073 let expected_log_a = "Assessing prunability of network graph".to_string();
1074 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1075 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1076 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Canned rapid-gossip-sync binary payload ("LDK" magic, version 1) that adds
// two channels when applied below. NOTE(review): the vec's closing lines
// (original 1095) are outside this chunk.
1081 let initialization_input = vec![
1082 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1083 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1084 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1085 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1086 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1087 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1088 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1089 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1090 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1091 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1092 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1093 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1094 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Completing the rapid sync unblocks pruning in the background processor.
1096 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1098 // this should have added two channels
1099 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for the persister's notification that the (pruned) graph was written.
1102 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1103 .expect("Network graph not pruned within deadline");
1105 background_processor.stop().unwrap();
1107 // all channels should now be pruned
1108 assert_eq!(network_graph.read_only().channels().len(), 0);
// Smoke test: an Arc<InvoicePayer> can serve as the BackgroundProcessor's
// event handler (InvoicePayer implements EventHandler) and the processor
// starts and stops cleanly with it. NOTE(review): `#[test]` attribute and
// closing brace are outside this chunk.
1112 fn test_invoice_payer() {
1113 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1114 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1115 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1117 // Initiate the background processors to watch each node.
1118 let data_dir = nodes[0].persister.get_data_dir();
1119 let persister = Arc::new(Persister::new(data_dir));
1120 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
// No-op payment-event closure; up to 2 retry attempts per payment.
1121 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1122 let event_handler = Arc::clone(&invoice_payer);
1123 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1124 assert!(bg_processor.stop().is_ok());