1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 /// writing it to disk/backups by invoking the callback given to it at startup.
46 /// [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 /// at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	/// Shared flag polled by the background thread on each loop iteration; set to `true` by
	/// `stop`/`Drop` to request a clean shutdown.
	stop_thread: Arc<AtomicBool>,
	/// Handle of the spawned background thread, carrying any persistence error out of the
	/// thread. `Option` so `join_thread` can `take()` it and join at most once.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// NOTE(review): each duplicated const name below is presumably gated by `#[cfg(test)]` /
// `#[cfg(not(test))]` attributes not visible in this chunk — the 1-second values keep test
// runs fast while the longer values apply to real builds. TODO confirm against full file.
const FRESHNESS_TIMER: u64 = 60;
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Seconds between scorer persistence attempts in the run loop.
const SCORER_PERSIST_TIMER: u64 = 30;
const SCORER_PERSIST_TIMER: u64 = 1;

// Seconds after startup before the *first* network-graph prune (earlier than the hourly
// cadence so short-lived clients still prune; see the comment in `define_run_body!`).
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// NOTE(review): the `pub enum GossipSync<...>` header line is not visible in this chunk; the
// lines below are its generic parameters, bounds, and per-variant docs.
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	/// Rapid gossip sync from a trusted server.
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	/// Returns the [`NetworkGraph`] backing this sync, if the variant carries one.
	fn network_graph(&self) -> Option<&G> {
		GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::None => None,

	/// Returns the graph only when it is safe to prune it: with rapid sync, pruning must not
	/// run until the initial sync completes, so `Rapid` yields `Some` only after that point.
	fn prunable_network_graph(&self) -> Option<&G> {
		GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::Rapid(gossip_sync) => {
			if gossip_sync.is_initial_sync_complete() {
				Some(gossip_sync.network_graph())
		GossipSync::None => None,
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
	A::Target: chain::Access,
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	// The P2P type parameter is fixed to a concrete reference type since this constructor
	// never produces a `P2P` variant.
	&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
	&'a (dyn chain::Access + Send + Sync),
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
	// Both sync type parameters are fixed to concrete reference types since `None` carries
	// neither.
	&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
	&RapidGossipSync<&'a NetworkGraph<L>, L>,
	&'a (dyn chain::Access + Send + Sync),
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
/// Applies a gossip-relevant `network_update` (learned from a failed payment's onion error)
/// to the given [`NetworkGraph`] before the user's event handler sees the event.
fn handle_network_graph_update<L: Deref>(
	network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
	// Only `PaymentPathFailed` events can carry a network update.
	if let Event::PaymentPathFailed { ref network_update, .. } = event {
		if let Some(network_update) = network_update {
			network_graph.handle_network_update(&network_update);
/// Expands to the shared main loop used by both the threaded (`BackgroundProcessor::start`)
/// and async (`process_events_async`) drivers. `$await` must evaluate to `true` when the
/// ChannelManager needs re-persisting, and `$loop_exit_check` to `true` when the loop should
/// terminate.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Timestamps of the last run of each periodic task; compared against the *_TIMER
		// consts to decide when each task fires again.
		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		// Whether the accelerated first prune after startup has already happened.
		let mut have_pruned = false;

		$process_channel_manager_events;
		$process_chain_monitor_events;

		// Note that the PeerManager::process_events may block on ChannelManager's locks,
		// hence it comes last here. When the ChannelManager finishes whatever it's doing,
		// we want to ensure we get into `persist_manager` as quickly as we can, especially
		// without running the normal event processing above and handing events to users.
		//
		// Specifically, on an *extremely* slow machine, we may see ChannelManager start
		// processing a message effectively at any point during this loop. In order to
		// minimize the time between such processing completing and persisting the updated
		// ChannelManager, we want to minimize methods blocking on a ChannelManager
		// generally, and as a fallback place such blocking only immediately before
		$peer_manager.process_events();

		// We wait up to 100ms, but track how long it takes to detect being put to sleep,
		// see `await_start`'s use below.
		let await_start = Instant::now();
		let updates_available = $await;
		let await_time = await_start.elapsed();

		if updates_available {
			log_trace!($logger, "Persisting ChannelManager...");
			// `?` propagates persistence failure out of the loop, terminating the processor.
			$persister.persist_manager(&*$channel_manager)?;
			log_trace!($logger, "Done persisting ChannelManager.");

		// Exit the loop if the background processor was requested to stop.
		if $loop_exit_check {
			log_trace!($logger, "Terminating background processor.");

		if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
			log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
			$channel_manager.timer_tick_occurred();
			last_freshness_call = Instant::now();

		if await_time > Duration::from_secs(1) {
			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
			log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
			$peer_manager.disconnect_all_peers();
			last_ping_call = Instant::now();
		} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
			log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
			$peer_manager.timer_tick_occurred();
			last_ping_call = Instant::now();

		// Note that we want to run a graph prune once not long after startup before
		// falling back to our usual hourly prunes. This avoids short-lived clients never
		// pruning their network graph. We run once 60 seconds after startup before
		// continuing our normal cadence.
		if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
			// The network graph must not be pruned while rapid sync completion is pending
			log_trace!($logger, "Assessing prunability of network graph");
			if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
				network_graph.remove_stale_channels_and_tracking();

				// Graph-persistence failure is logged but non-fatal, unlike manager persistence.
				if let Err(e) = $persister.persist_graph(network_graph) {
					log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)

				last_prune_call = Instant::now();
				log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");

		if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
			if let Some(ref scorer) = $scorer {
				log_trace!($logger, "Persisting scorer");
				// Like the graph, scorer-persistence failure is logged but non-fatal here.
				if let Err(e) = $persister.persist_scorer(&scorer) {
					log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)

			last_scorer_persist_call = Instant::now();

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	Signer: 'static + Sign,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
	// Must start `false`: it is only flipped to `true` when `sleeper`'s future resolves to
	// `true`, signalling that the caller wants background processing to terminate. (The
	// previous initialization to `true` made the run body's exit check fire on the very
	// first iteration, shutting the processor down immediately.)
	let mut should_break = false;
	// Wrap the user's async event handler so gossip-relevant payment failures also update
	// the NetworkGraph before the user's handler sees the event.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Race the "ChannelManager needs persistence" notification against the caller's
			// sleeper. `select_biased!` prefers the persistence future when both are ready,
			// so pending persistence is never starved by the exit signal.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		})
}
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	/// # Data Persistence
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	/// # Rapid Gossip Sync
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	// NOTE(review): the `pub fn start<` line itself is not visible in this chunk; the lines
	// below are its generic parameters, arguments, and where-clause.
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
		// Shared stop flag: stored by `stop`/`Drop` (Release), loaded in the loop (Acquire).
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		// All processing happens on this dedicated thread; its Result is surfaced via
		// `join`/`stop`.
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Decorate the user's handler so gossip-relevant payment failures also update
			// the NetworkGraph before the user sees the event.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				event_handler.handle_event(event);
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				// Blocks up to 100ms waiting for a persistable update; returns whether one
				// arrived — this doubles as the loop's pacing sleep.
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		// `join` consumes self, so the handle must still be present (taken only once).
		assert!(self.thread_handle.is_some());
	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		// `stop` consumes self, so the handle must still be present (taken only once).
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	// Signals the background thread to exit, then joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		// Release ordering pairs with the Acquire load in the run loop's exit check.
		self.stop_thread.store(true, Ordering::Release);
	// Joins the background thread if it has not been joined yet, returning its Result.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		// `take()` ensures we join at most once; subsequent calls see `None`.
		match self.thread_handle.take() {
			// `unwrap` re-raises any panic that occurred on the background thread.
			Some(handle) => handle.join().unwrap(),
impl Drop for BackgroundProcessor {
		// Stop and join on drop; `unwrap` because Drop cannot return the error — this
		// panics if the thread panicked or final persistence failed.
		self.stop_and_join_thread().unwrap();
570 use bitcoin::blockdata::block::BlockHeader;
571 use bitcoin::blockdata::constants::genesis_block;
572 use bitcoin::blockdata::locktime::PackedLockTime;
573 use bitcoin::blockdata::transaction::{Transaction, TxOut};
574 use bitcoin::network::constants::Network;
575 use lightning::chain::{BestBlock, Confirm, chainmonitor};
576 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
577 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
578 use lightning::chain::transaction::OutPoint;
579 use lightning::get_event_msg;
580 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
581 use lightning::ln::features::ChannelFeatures;
582 use lightning::ln::msgs::{ChannelMessageHandler, Init};
583 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
584 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
585 use lightning::util::config::UserConfig;
586 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
587 use lightning::util::ser::Writeable;
588 use lightning::util::test_utils;
589 use lightning::util::persist::KVStorePersister;
590 use lightning_invoice::payment::{InvoicePayer, Retry};
591 use lightning_invoice::utils::DefaultRouter;
592 use lightning_persister::FilesystemPersister;
594 use std::path::PathBuf;
595 use std::sync::{Arc, Mutex};
596 use std::sync::mpsc::SyncSender;
597 use std::time::Duration;
598 use bitcoin::hashes::Hash;
599 use bitcoin::TxMerkleNode;
600 use lightning::routing::scoring::{FixedPenaltyScorer};
601 use lightning_rapid_gossip_sync::RapidGossipSync;
602 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
	// Maximum seconds tests wait for an expected event; scales with the freshness timer.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	// Minimal no-op SocketDescriptor so a PeerManager can be constructed in tests.
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {

		fn disconnect_socket(&mut self) {}
	// Concrete test instantiations of the generic types used throughout the processor.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
	// Fields of the test `Node` (the `struct Node {` header is not visible in this chunk):
	// one fully-wired node with every component the BackgroundProcessor needs.
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
		// Helpers returning each GossipSync variant for this node.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())

		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())

		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {

		// Best-effort removal of the on-disk test persister directory — presumably part of
		// a Drop impl whose header is not visible here; TODO confirm.
		let data_dir = self.persister.get_data_dir();
		match fs::remove_dir_all(data_dir.clone()) {
			Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
	// Test Persister wrapping FilesystemPersister: each `*_error` field, when set, makes the
	// corresponding persist call fail; the notifier fires on network-graph persistence.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,

		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }

		// Builder-style setters used by individual tests to inject failures/notifications.
		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
	impl KVStorePersister for Persister {
		// Intercepts writes by key ("manager", "network_graph", "scorer") to inject the
		// configured errors / fire notifications, then delegates to the real filesystem
		// persister.
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))

				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))

			self.filesystem_persister.persist(key, object)
	// Joins `filepath` and `filename` into a single path string.
	// NOTE(review): the `path.push(filename)` step appears to be elided from this view —
	// as shown, `filename` would be unused; TODO confirm against the full file.
	fn get_full_filepath(filepath: String, filename: String) -> String {
		let mut path = PathBuf::from(filepath);
		path.to_str().unwrap().to_string()
	// Builds `num_nodes` fully-wired test nodes (chain monitor, channel manager, gossip
	// syncs, peer manager, scorer), each persisting under `{persist_dir}_persister_{i}`,
	// then connects every pair of nodes as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node key seed.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

		// Connect every distinct pair of nodes to each other (both directions).
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
761 macro_rules! open_channel {
762 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
763 begin_open_channel!($node_a, $node_b, $channel_value);
764 let events = $node_a.node.get_and_clear_pending_events();
765 assert_eq!(events.len(), 1);
766 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
767 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
772 macro_rules! begin_open_channel {
773 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
774 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
775 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
776 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Test helper: consumes a FundingGenerationReady event, sanity-checks it
// (expected channel value; user_channel_id must be the 42 set by
// `begin_open_channel!`), and builds a dummy funding transaction paying the
// requested amount to the requested script. Evaluates to
// `(temporary_channel_id, tx)`.
780 macro_rules! handle_funding_generation_ready {
781 ($event: expr, $channel_value: expr) => {{
783 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
784 assert_eq!(channel_value_satoshis, $channel_value);
785 assert_eq!(user_channel_id, 42);
// Single-output tx with no inputs — sufficient for these tests since the tx
// is only confirmed via the test chain helpers, never validated as real
// Bitcoin consensus data.
787 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
788 value: channel_value_satoshis, script_pubkey: output_script.clone(),
790 (temporary_channel_id, tx)
792 _ => panic!("Unexpected event"),
// Test helper: performs the second half of the channel-open handshake — hands
// the funding transaction to $node_a's ChannelManager, then delivers the
// resulting funding_created/funding_signed messages between the peers.
797 macro_rules! end_open_channel {
798 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
799 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
800 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
801 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Confirms `tx` in a newly-mined dummy block on top of `node`'s current best
// block, notifying both the ChannelManager and the ChainMonitor.
// NOTE(review): the `depth` parameter is presumably consumed by loop/connect
// lines elided from this chunk (burying the tx under `depth` confirmations) —
// confirm against the full file.
805 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
807 let prev_blockhash = node.best_block.block_hash();
808 let height = node.best_block.height() + 1;
// Dummy header: `time` reuses the height so each generated block is distinct;
// merkle_root/bits/nonce are irrelevant since no PoW validation happens here.
809 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
810 let txdata = vec![(0, tx)];
// Track the new tip locally so subsequent calls chain onto this block.
811 node.best_block = BestBlock::new(header.block_hash(), height);
// Both listeners must observe the confirmation and the new best block.
814 node.node.transactions_confirmed(&header, &txdata, height);
815 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
818 node.node.best_block_updated(&header, height);
819 node.chain_monitor.best_block_updated(&header, height);
// Convenience wrapper: confirms `tx` to ANTI_REORG_DELAY depth, the depth at
// which rust-lightning treats a confirmation as irreversible.
825 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
826 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
830 fn test_background_processor() {
831 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
832 // updates. Also test that when new updates are available, the manager signals that it needs
833 // re-persistence and is successfully re-persisted.
834 let nodes = create_nodes(2, "test_background_processor".to_string());
836 // Go through the channel creation process so that each node has something to persist. Since
837 // open_channel consumes events, it must complete before starting BackgroundProcessor to
838 // avoid a race with processing events.
839 let tx = open_channel!(nodes[0], nodes[1], 100000);
841 // Initiate the background processors to watch each node.
842 let data_dir = nodes[0].persister.get_data_dir();
843 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: this test only checks persistence, not event delivery.
844 let event_handler = |_: _| {};
845 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Polls (loop scaffolding elided from this chunk) until the bytes persisted
// at $filepath match $node's current serialization, panicking on I/O errors.
847 macro_rules! check_persisted_data {
848 ($node: expr, $filepath: expr) => {
849 let mut expected_bytes = Vec::new();
// Re-serialize on every poll iteration in case the node's state changed.
851 expected_bytes.clear();
852 match $node.write(&mut expected_bytes) {
854 match std::fs::read($filepath) {
856 if bytes == expected_bytes {
865 Err(e) => panic!("Unexpected error: {}", e)
871 // Check that the initial channel manager data is persisted as expected.
872 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
873 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait (enclosing loop elided here) until the background processor has
// observed and cleared the manager's "needs persistence" flag.
876 if !nodes[0].node.get_persistence_condvar_value() { break }
879 // Force-close the channel.
880 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
882 // Check that the force-close updates are persisted.
883 check_persisted_data!(nodes[0].node, filepath.clone());
885 if !nodes[0].node.get_persistence_condvar_value() { break }
888 // Check network graph is persisted
889 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
890 check_persisted_data!(nodes[0].network_graph, filepath.clone());
892 // Check scorer is persisted
893 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
894 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Stopping must succeed cleanly (no persistence errors surfaced on join).
896 assert!(bg_processor.stop().is_ok());
900 fn test_timer_tick_called() {
901 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
902 // `FRESHNESS_TIMER`.
903 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
904 let data_dir = nodes[0].persister.get_data_dir();
905 let persister = Arc::new(Persister::new(data_dir));
906 let event_handler = |_: _| {};
907 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the test logger (enclosing wait loop elided from this chunk) for the
// two log lines the background processor emits when each timer tick fires.
909 let log_entries = nodes[0].logger.lines.lock().unwrap();
910 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
911 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
912 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
913 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
918 assert!(bg_processor.stop().is_ok());
922 fn test_channel_manager_persist_error() {
923 // Test that if we encounter an error during manager persistence, the thread panics.
924 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel so the manager has a persistence-triggering update pending.
925 open_channel!(nodes[0], nodes[1], 100000);
927 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail every ChannelManager write with a test error.
928 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
929 let event_handler = |_: _| {};
930 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` must surface the injected manager-persistence error verbatim.
931 match bg_processor.join() {
932 Ok(_) => panic!("Expected error persisting manager"),
934 assert_eq!(e.kind(), std::io::ErrorKind::Other);
935 assert_eq!(e.get_ref().unwrap().to_string(), "test");
941 fn test_network_graph_persist_error() {
942 // Test that if we encounter an error during network graph persistence, an error gets returned.
943 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
944 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail network-graph writes with a test error.
945 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
946 let event_handler = |_: _| {};
947 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike the manager-persistence case, graph errors are reported via `stop`
// rather than panicking the background thread.
949 match bg_processor.stop() {
950 Ok(_) => panic!("Expected error persisting network graph"),
952 assert_eq!(e.kind(), std::io::ErrorKind::Other);
953 assert_eq!(e.get_ref().unwrap().to_string(), "test");
959 fn test_scorer_persist_error() {
960 // Test that if we encounter an error during scorer persistence, an error gets returned.
961 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
962 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail scorer writes with a test error.
963 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
964 let event_handler = |_: _| {};
965 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Scorer-persistence failures are surfaced through `stop`, mirroring the
// network-graph error path above.
967 match bg_processor.stop() {
968 Ok(_) => panic!("Expected error persisting scorer"),
970 assert_eq!(e.kind(), std::io::ErrorKind::Other);
971 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// End-to-end check that the BackgroundProcessor actually delivers events
// (FundingGenerationReady, then SpendableOutputs) to a user-provided handler.
977 fn test_background_event_handling() {
978 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
979 let channel_value = 100000;
980 let data_dir = nodes[0].persister.get_data_dir();
981 let persister = Arc::new(Persister::new(data_dir.clone()));
983 // Set up a background event handler for FundingGenerationReady events.
// The handler runs on the background thread; results cross back to the test
// thread via a bounded (capacity 1) mpsc channel.
984 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
985 let event_handler = move |event: Event| match event {
986 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
// ChannelReady may also fire during setup; it is expected and ignored.
987 Event::ChannelReady { .. } => {},
988 _ => panic!("Unexpected event: {:?}", event),
991 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
993 // Open a channel and check that the FundingGenerationReady event was handled.
994 begin_open_channel!(nodes[0], nodes[1], channel_value);
995 let (temporary_channel_id, funding_tx) = receiver
996 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
997 .expect("FundingGenerationReady not handled within deadline");
998 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1000 // Confirm the funding transaction.
1001 confirm_transaction(&mut nodes[0], &funding_tx);
1002 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1003 confirm_transaction(&mut nodes[1], &funding_tx);
1004 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready both ways so the channel becomes usable; the
// resulting channel_update messages are drained but unused.
1005 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1006 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1007 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1008 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1010 assert!(bg_processor.stop().is_ok());
1012 // Set up a background event handler for SpendableOutputs events.
// Fresh channel/handler pair for the second phase of the test.
1013 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1014 let event_handler = move |event: Event| match event {
1015 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1016 Event::ChannelReady { .. } => {},
1017 Event::ChannelClosed { .. } => {},
1018 _ => panic!("Unexpected event: {:?}", event),
1020 let persister = Arc::new(Persister::new(data_dir));
1021 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1023 // Force close the channel and check that the SpendableOutputs event was handled.
1024 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Confirm the broadcast commitment tx deep enough (BREAKDOWN_TIMEOUT) for
// the monitor to surface the spendable outputs.
1025 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1026 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1028 let event = receiver
1029 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1030 .expect("Events not handled within deadline");
1032 Event::SpendableOutputs { .. } => {},
1033 _ => panic!("Unexpected event: {:?}", event),
1036 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer, by
// watching the test logger for the "Persisting scorer" log line.
1040 fn test_scorer_persistence() {
1041 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1042 let data_dir = nodes[0].persister.get_data_dir();
1043 let persister = Arc::new(Persister::new(data_dir));
1044 let event_handler = |_: _| {};
1045 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the logger (enclosing wait loop elided from this chunk) until the
// expected log entry appears.
1048 let log_entries = nodes[0].logger.lines.lock().unwrap();
1049 let expected_log = "Persisting scorer".to_string();
1050 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1055 assert!(bg_processor.stop().is_ok());
// Verifies that while a rapid gossip sync is pending the background processor
// does NOT prune the network graph, and that once the RGS snapshot is applied
// pruning proceeds and removes the stale channels.
1059 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1060 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1061 let data_dir = nodes[0].persister.get_data_dir();
// `sender` notifies the test thread each time the graph is persisted (which
// in this configuration happens after a prune pass).
1062 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1063 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1064 let network_graph = nodes[0].network_graph.clone();
1065 let features = ChannelFeatures::empty();
// Seed the graph with a partial (unsigned) channel announcement — a channel
// with no recent updates, i.e. a prime candidate for pruning.
1066 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1067 .expect("Failed to update channel from partial announcement");
1068 let original_graph_description = network_graph.to_string();
1069 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1070 assert_eq!(network_graph.read_only().channels().len(), 1);
1072 let event_handler = |_: _| {};
1073 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// First, confirm via the logs that a prune pass was considered but skipped
// because the rapid gossip sync has not completed yet (wait loop elided).
1076 let log_entries = nodes[0].logger.lines.lock().unwrap();
1077 let expected_log_a = "Assessing prunability of network graph".to_string();
1078 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1079 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1080 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Raw rapid-gossip-sync snapshot bytes ("LDK" magic + version 1); applying
// them completes the sync, unblocking pruning.
1085 let initialization_input = vec![
1086 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1087 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1088 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1089 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1090 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1091 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1092 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1093 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1094 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1095 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1096 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1097 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1098 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1100 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1102 // this should have added two channels
1103 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for the post-sync prune/persist pass to be signalled.
1106 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1107 .expect("Network graph not pruned within deadline");
1109 background_processor.stop().unwrap();
1111 // all channels should now be pruned
1112 assert_eq!(network_graph.read_only().channels().len(), 0);
// Smoke test: an `InvoicePayer` (which implements EventHandler) can be used
// directly as the BackgroundProcessor's event handler, and the processor
// starts and stops cleanly with it.
1116 fn test_invoice_payer() {
1117 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1118 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1119 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1121 // Initiate the background processors to watch each node.
1122 let data_dir = nodes[0].persister.get_data_dir();
1123 let persister = Arc::new(Persister::new(data_dir));
1124 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
// No-op inner event handler; up to 2 payment retries.
1125 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1126 let event_handler = Arc::clone(&invoice_payer);
1127 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1128 assert!(bg_processor.stop().is_ok());