1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 /// writing it to disk/backups by invoking the callback given to it at startup.
46 /// [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 /// at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
64 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
65 pub struct BackgroundProcessor {
// Shared flag read by the background thread's main loop each iteration
// (with `Ordering::Acquire`); `stop`/`stop_and_join_thread` set it to
// `true` with `Ordering::Release` to request a clean shutdown.
66 stop_thread: Arc<AtomicBool>,
// Handle to the spawned processing thread. Wrapped in `Option` so that
// `join`/`stop` can `take()` it exactly once; the inner `Result` carries
// any `std::io::Error` the thread hit while persisting state.
// NOTE(review): the struct's closing brace is not visible in this excerpt.
67 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Seconds between `ChannelManager::timer_tick_occurred` calls (60s in
// production, 1s under test so tests run quickly).
// NOTE(review): the `#[cfg(not(test))]` / `#[cfg(test)]` gating attributes
// for this pair appear to be missing from this excerpt — confirm upstream,
// as two same-named consts cannot coexist unconditionally.
71 const FRESHNESS_TIMER: u64 = 60;
73 const FRESHNESS_TIMER: u64 = 1;
// Seconds between `PeerManager::timer_tick_occurred` calls in optimized,
// non-test builds.
75 #[cfg(all(not(test), not(debug_assertions)))]
76 const PING_TIMER: u64 = 10;
77 /// Signature operations take a lot longer without compiler optimisations.
78 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
79 /// timeout is reached.
80 #[cfg(all(not(test), debug_assertions))]
81 const PING_TIMER: u64 = 30;
// NOTE(review): this test-build value's `#[cfg(test)]` attribute is not
// visible in this excerpt.
83 const PING_TIMER: u64 = 1;
85 /// Prune the network graph of stale entries hourly.
86 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Seconds between scorer persistence attempts (30s in production, 1s in
// tests). NOTE(review): the cfg attributes for this pair are also missing
// from this excerpt.
89 const SCORER_PERSIST_TIMER: u64 = 30;
91 const SCORER_PERSIST_TIMER: u64 = 1;
// Delay before the *first* network-graph prune after startup, so
// short-lived clients still prune at least once; subsequent prunes use
// NETWORK_PRUNE_TIMER. NOTE(review): cfg attributes missing here as well.
94 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
96 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
100 P: Deref<Target = P2PGossipSync<G, A, L>>,
101 R: Deref<Target = RapidGossipSync<G, L>>,
102 G: Deref<Target = NetworkGraph<L>>,
106 where A::Target: chain::Access, L::Target: Logger {
107 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
109 /// Rapid gossip sync from a trusted server.
116 P: Deref<Target = P2PGossipSync<G, A, L>>,
117 R: Deref<Target = RapidGossipSync<G, L>>,
118 G: Deref<Target = NetworkGraph<L>>,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
/// Returns the [`NetworkGraph`] backing this gossip sync, if one exists.
/// Both the P2P and rapid-sync variants expose their graph; `None` (no
/// gossip sync configured) yields no graph.
// NOTE(review): the `match self {` opener and the closing braces for this
// method are not visible in this excerpt.
123 fn network_graph(&self) -> Option<&G> {
125 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127 GossipSync::None => None,
/// Returns the [`NetworkGraph`], but only when it is safe to prune it.
/// For rapid gossip sync the graph must not be pruned until the initial
/// sync completes (a partially-synced graph would lose channels that the
/// pending sync is about to reference), hence the
/// `is_initial_sync_complete()` gate on the `Rapid` arm.
// NOTE(review): the `match self {` opener, the `else`/closing braces of the
// `Rapid` arm, and the method's closing braces are not visible here.
131 fn prunable_network_graph(&self) -> Option<&G> {
133 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134 GossipSync::Rapid(gossip_sync) => {
135 if gossip_sync.is_initial_sync_complete() {
136 Some(gossip_sync.network_graph())
141 GossipSync::None => None,
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 A::Target: chain::Access,
153 /// Initializes a new [`GossipSync::P2P`] variant.
154 pub fn p2p(gossip_sync: P) -> Self {
155 GossipSync::P2P(gossip_sync)
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
165 &'a (dyn chain::Access + Send + Sync),
171 /// Initializes a new [`GossipSync::Rapid`] variant.
172 pub fn rapid(gossip_sync: R) -> Self {
173 GossipSync::Rapid(gossip_sync)
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
180 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183 &'a (dyn chain::Access + Send + Sync),
189 /// Initializes a new [`GossipSync::None`] variant.
190 pub fn none() -> Self {
195 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
196 struct DecoratingEventHandler<
199 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
200 RGS: Deref<Target = RapidGossipSync<G, L>>,
201 G: Deref<Target = NetworkGraph<L>>,
205 where A::Target: chain::Access, L::Target: Logger {
207 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
213 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
214 RGS: Deref<Target = RapidGossipSync<G, L>>,
215 G: Deref<Target = NetworkGraph<L>>,
218 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
219 where A::Target: chain::Access, L::Target: Logger {
// Dispatch order matters: the network graph sees the event first (so it
// can, e.g., react to routing-relevant events) and only then is the event
// forwarded to the user-provided handler.
// NOTE(review): the closing brace of the `if let` block (original line 223)
// and of the function are not visible in this excerpt.
220 fn handle_event(&self, event: &Event) {
221 if let Some(network_graph) = self.gossip_sync.network_graph() {
222 network_graph.handle_event(event);
224 self.event_handler.handle_event(event);
228 macro_rules! define_run_body {
229 ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
230 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
231 $loop_exit_check: expr, $await: expr)
233 let event_handler = DecoratingEventHandler {
234 event_handler: $event_handler,
235 gossip_sync: &$gossip_sync,
238 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
239 $channel_manager.timer_tick_occurred();
241 let mut last_freshness_call = Instant::now();
242 let mut last_ping_call = Instant::now();
243 let mut last_prune_call = Instant::now();
244 let mut last_scorer_persist_call = Instant::now();
245 let mut have_pruned = false;
248 $channel_manager.process_pending_events(&event_handler);
249 $chain_monitor.process_pending_events(&event_handler);
251 // Note that the PeerManager::process_events may block on ChannelManager's locks,
252 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
253 // we want to ensure we get into `persist_manager` as quickly as we can, especially
254 // without running the normal event processing above and handing events to users.
256 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
257 // processing a message effectively at any point during this loop. In order to
258 // minimize the time between such processing completing and persisting the updated
259 // ChannelManager, we want to minimize methods blocking on a ChannelManager
260 // generally, and as a fallback place such blocking only immediately before
262 $peer_manager.process_events();
264 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
265 // see `await_start`'s use below.
266 let await_start = Instant::now();
267 let updates_available = $await;
268 let await_time = await_start.elapsed();
270 if updates_available {
271 log_trace!($logger, "Persisting ChannelManager...");
272 $persister.persist_manager(&*$channel_manager)?;
273 log_trace!($logger, "Done persisting ChannelManager.");
275 // Exit the loop if the background processor was requested to stop.
276 if $loop_exit_check {
277 log_trace!($logger, "Terminating background processor.");
280 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
281 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
282 $channel_manager.timer_tick_occurred();
283 last_freshness_call = Instant::now();
285 if await_time > Duration::from_secs(1) {
286 // On various platforms, we may be starved of CPU cycles for several reasons.
287 // E.g. on iOS, if we've been in the background, we will be entirely paused.
288 // Similarly, if we're on a desktop platform and the device has been asleep, we
289 // may not get any cycles.
290 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
291 // full second, at which point we assume sockets may have been killed (they
292 // appear to be at least on some platforms, even if it has only been a second).
293 // Note that we have to take care to not get here just because user event
294 // processing was slow at the top of the loop. For example, the sample client
295 // may call Bitcoin Core RPCs during event handling, which very often takes
296 // more than a handful of seconds to complete, and shouldn't disconnect all our
298 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
299 $peer_manager.disconnect_all_peers();
300 last_ping_call = Instant::now();
301 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
302 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
303 $peer_manager.timer_tick_occurred();
304 last_ping_call = Instant::now();
307 // Note that we want to run a graph prune once not long after startup before
308 // falling back to our usual hourly prunes. This avoids short-lived clients never
309 // pruning their network graph. We run once 60 seconds after startup before
310 // continuing our normal cadence.
311 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
312 // The network graph must not be pruned while rapid sync completion is pending
313 log_trace!($logger, "Assessing prunability of network graph");
314 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
315 network_graph.remove_stale_channels_and_tracking();
317 if let Err(e) = $persister.persist_graph(network_graph) {
318 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
321 last_prune_call = Instant::now();
324 log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
328 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
329 if let Some(ref scorer) = $scorer {
330 log_trace!($logger, "Persisting scorer");
331 if let Err(e) = $persister.persist_scorer(&scorer) {
332 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
335 last_scorer_persist_call = Instant::now();
339 // After we exit, ensure we persist the ChannelManager one final time - this avoids
340 // some races where users quit while channel updates were in-flight, with
341 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
342 $persister.persist_manager(&*$channel_manager)?;
344 // Persist Scorer on exit
345 if let Some(ref scorer) = $scorer {
346 $persister.persist_scorer(&scorer)?;
349 // Persist NetworkGraph on exit
350 if let Some(network_graph) = $gossip_sync.network_graph() {
351 $persister.persist_graph(network_graph)?;
358 /// Processes background events in a future.
360 /// `sleeper` should return a future which completes in the given amount of time and returns a
361 /// boolean indicating whether the background processing should continue. Once `sleeper` returns a
362 /// future which outputs false, the loop will exit and this function's future will complete.
364 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
365 #[cfg(feature = "futures")]
366 pub async fn process_events_async<
368 Signer: 'static + Sign,
369 CA: 'static + Deref + Send + Sync,
370 CF: 'static + Deref + Send + Sync,
371 CW: 'static + Deref + Send + Sync,
372 T: 'static + Deref + Send + Sync,
373 K: 'static + Deref + Send + Sync,
374 F: 'static + Deref + Send + Sync,
375 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
376 L: 'static + Deref + Send + Sync,
377 P: 'static + Deref + Send + Sync,
378 Descriptor: 'static + SocketDescriptor + Send + Sync,
379 CMH: 'static + Deref + Send + Sync,
380 RMH: 'static + Deref + Send + Sync,
381 OMH: 'static + Deref + Send + Sync,
382 EH: 'static + EventHandler + Send,
383 PS: 'static + Deref + Send,
384 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
385 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
386 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
387 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
388 UMH: 'static + Deref + Send + Sync,
389 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
390 S: 'static + Deref<Target = SC> + Send + Sync,
391 SC: WriteableScore<'a>,
392 SleepFuture: core::future::Future<Output = bool>,
393 Sleeper: Fn(Duration) -> SleepFuture
395 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
396 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
398 ) -> Result<(), std::io::Error>
400 CA::Target: 'static + chain::Access,
401 CF::Target: 'static + chain::Filter,
402 CW::Target: 'static + chain::Watch<Signer>,
403 T::Target: 'static + BroadcasterInterface,
404 K::Target: 'static + KeysInterface<Signer = Signer>,
405 F::Target: 'static + FeeEstimator,
406 L::Target: 'static + Logger,
407 P::Target: 'static + Persist<Signer>,
408 CMH::Target: 'static + ChannelMessageHandler,
409 OMH::Target: 'static + OnionMessageHandler,
410 RMH::Target: 'static + RoutingMessageHandler,
411 UMH::Target: 'static + CustomMessageHandler,
412 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
414 let mut should_continue = true;
415 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
416 gossip_sync, peer_manager, logger, scorer, should_continue, {
418 _ = channel_manager.get_persistable_update_future().fuse() => true,
419 cont = sleeper(Duration::from_millis(100)).fuse() => {
420 should_continue = cont;
427 impl BackgroundProcessor {
428 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
431 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
432 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
433 /// either [`join`] or [`stop`].
435 /// # Data Persistence
437 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
438 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
439 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
440 /// provided implementation.
442 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
443 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
444 /// See the `lightning-persister` crate for LDK's provided implementation.
446 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
447 /// error or call [`join`] and handle any error that may arise. For the latter case,
448 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
452 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
453 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
454 /// functionality implemented by other handlers.
455 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
457 /// # Rapid Gossip Sync
459 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
460 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
461 /// until the [`RapidGossipSync`] instance completes its first sync.
463 /// [top-level documentation]: BackgroundProcessor
464 /// [`join`]: Self::join
465 /// [`stop`]: Self::stop
466 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
467 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
468 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
469 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
470 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
471 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
474 Signer: 'static + Sign,
475 CA: 'static + Deref + Send + Sync,
476 CF: 'static + Deref + Send + Sync,
477 CW: 'static + Deref + Send + Sync,
478 T: 'static + Deref + Send + Sync,
479 K: 'static + Deref + Send + Sync,
480 F: 'static + Deref + Send + Sync,
481 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
482 L: 'static + Deref + Send + Sync,
483 P: 'static + Deref + Send + Sync,
484 Descriptor: 'static + SocketDescriptor + Send + Sync,
485 CMH: 'static + Deref + Send + Sync,
486 OMH: 'static + Deref + Send + Sync,
487 RMH: 'static + Deref + Send + Sync,
488 EH: 'static + EventHandler + Send,
489 PS: 'static + Deref + Send,
490 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
491 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
492 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
493 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
494 UMH: 'static + Deref + Send + Sync,
495 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
496 S: 'static + Deref<Target = SC> + Send + Sync,
497 SC: WriteableScore<'a>,
499 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
500 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
503 CA::Target: 'static + chain::Access,
504 CF::Target: 'static + chain::Filter,
505 CW::Target: 'static + chain::Watch<Signer>,
506 T::Target: 'static + BroadcasterInterface,
507 K::Target: 'static + KeysInterface<Signer = Signer>,
508 F::Target: 'static + FeeEstimator,
509 L::Target: 'static + Logger,
510 P::Target: 'static + Persist<Signer>,
511 CMH::Target: 'static + ChannelMessageHandler,
512 OMH::Target: 'static + OnionMessageHandler,
513 RMH::Target: 'static + RoutingMessageHandler,
514 UMH::Target: 'static + CustomMessageHandler,
515 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
517 let stop_thread = Arc::new(AtomicBool::new(false));
518 let stop_thread_clone = stop_thread.clone();
519 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
520 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
521 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
522 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
524 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
527 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
528 /// [`ChannelManager`].
532 /// This function panics if the background thread has panicked such as while persisting or
535 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
536 pub fn join(mut self) -> Result<(), std::io::Error> {
537 assert!(self.thread_handle.is_some());
541 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
542 /// [`ChannelManager`].
546 /// This function panics if the background thread has panicked such as while persisting or
549 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
550 pub fn stop(mut self) -> Result<(), std::io::Error> {
551 assert!(self.thread_handle.is_some());
552 self.stop_and_join_thread()
// Signal the background thread to exit (Release pairs with the loop's
// Acquire load of `stop_thread`), then join it to surface any persistence
// error. NOTE(review): the trailing `self.join_thread()` call and closing
// brace are not visible in this excerpt.
555 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
556 self.stop_thread.store(true, Ordering::Release);
// Join the background thread at most once: `take()` empties
// `thread_handle`, and `handle.join().unwrap()` panics if the thread
// itself panicked (propagating the panic to the caller) while otherwise
// returning the thread's own `Result` from persistence.
// NOTE(review): the `None => ...` arm and closing braces are not visible
// in this excerpt.
560 fn join_thread(&mut self) -> Result<(), std::io::Error> {
561 match self.thread_handle.take() {
562 Some(handle) => handle.join().unwrap(),
// Dropping the processor stops and joins the background thread. The
// `unwrap()` means a persistence error during this final shutdown (or a
// panicked thread) aborts via panic rather than being silently dropped —
// callers wanting the error should use `stop()`/`join()` instead.
// NOTE(review): the `fn drop(&mut self) {` line (original 569) and closing
// braces are not visible in this excerpt.
568 impl Drop for BackgroundProcessor {
570 self.stop_and_join_thread().unwrap();
576 use bitcoin::blockdata::block::BlockHeader;
577 use bitcoin::blockdata::constants::genesis_block;
578 use bitcoin::blockdata::locktime::PackedLockTime;
579 use bitcoin::blockdata::transaction::{Transaction, TxOut};
580 use bitcoin::network::constants::Network;
581 use lightning::chain::{BestBlock, Confirm, chainmonitor};
582 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
583 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
584 use lightning::chain::transaction::OutPoint;
585 use lightning::get_event_msg;
586 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
587 use lightning::ln::features::ChannelFeatures;
588 use lightning::ln::msgs::{ChannelMessageHandler, Init};
589 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
590 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
591 use lightning::util::config::UserConfig;
592 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
593 use lightning::util::ser::Writeable;
594 use lightning::util::test_utils;
595 use lightning::util::persist::KVStorePersister;
596 use lightning_invoice::payment::{InvoicePayer, Retry};
597 use lightning_invoice::utils::DefaultRouter;
598 use lightning_persister::FilesystemPersister;
600 use std::path::PathBuf;
601 use std::sync::{Arc, Mutex};
602 use std::sync::mpsc::SyncSender;
603 use std::time::Duration;
604 use bitcoin::hashes::Hash;
605 use bitcoin::TxMerkleNode;
606 use lightning::routing::scoring::{FixedPenaltyScorer};
607 use lightning_rapid_gossip_sync::RapidGossipSync;
608 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
610 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
612 #[derive(Clone, Hash, PartialEq, Eq)]
613 struct TestDescriptor{}
614 impl SocketDescriptor for TestDescriptor {
615 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
619 fn disconnect_socket(&mut self) {}
622 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
624 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
625 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
628 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
629 p2p_gossip_sync: PGS,
630 rapid_gossip_sync: RGS,
631 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
632 chain_monitor: Arc<ChainMonitor>,
633 persister: Arc<FilesystemPersister>,
634 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
635 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
636 logger: Arc<test_utils::TestLogger>,
637 best_block: BestBlock,
638 scorer: Arc<Mutex<FixedPenaltyScorer>>,
642 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
643 GossipSync::P2P(self.p2p_gossip_sync.clone())
646 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
647 GossipSync::Rapid(self.rapid_gossip_sync.clone())
650 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
657 let data_dir = self.persister.get_data_dir();
658 match fs::remove_dir_all(data_dir.clone()) {
659 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
666 graph_error: Option<(std::io::ErrorKind, &'static str)>,
667 graph_persistence_notifier: Option<SyncSender<()>>,
668 manager_error: Option<(std::io::ErrorKind, &'static str)>,
669 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
670 filesystem_persister: FilesystemPersister,
674 fn new(data_dir: String) -> Self {
675 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
676 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
679 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
680 Self { graph_error: Some((error, message)), ..self }
683 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
684 Self { graph_persistence_notifier: Some(sender), ..self }
687 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
688 Self { manager_error: Some((error, message)), ..self }
691 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
692 Self { scorer_error: Some((error, message)), ..self }
696 impl KVStorePersister for Persister {
697 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
698 if key == "manager" {
699 if let Some((error, message)) = self.manager_error {
700 return Err(std::io::Error::new(error, message))
704 if key == "network_graph" {
705 if let Some(sender) = &self.graph_persistence_notifier {
706 sender.send(()).unwrap();
709 if let Some((error, message)) = self.graph_error {
710 return Err(std::io::Error::new(error, message))
715 if let Some((error, message)) = self.scorer_error {
716 return Err(std::io::Error::new(error, message))
720 self.filesystem_persister.persist(key, object)
724 fn get_full_filepath(filepath: String, filename: String) -> String {
725 let mut path = PathBuf::from(filepath);
727 path.to_str().unwrap().to_string()
730 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
731 let mut nodes = Vec::new();
732 for i in 0..num_nodes {
733 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
734 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
735 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
736 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
737 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
738 let seed = [i as u8; 32];
739 let network = Network::Testnet;
740 let genesis_block = genesis_block(network);
741 let now = Duration::from_secs(genesis_block.header.time as u64);
742 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
743 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
744 let best_block = BestBlock::from_genesis(network);
745 let params = ChainParameters { network, best_block };
746 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
747 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
748 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
749 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
750 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
751 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
752 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
753 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
757 for i in 0..num_nodes {
758 for j in (i+1)..num_nodes {
759 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
760 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
767 macro_rules! open_channel {
768 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
769 begin_open_channel!($node_a, $node_b, $channel_value);
770 let events = $node_a.node.get_and_clear_pending_events();
771 assert_eq!(events.len(), 1);
772 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
773 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
778 macro_rules! begin_open_channel {
779 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
780 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
781 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
782 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Matches a FundingGenerationReady event, validates its channel value and
// user_channel_id, and builds a one-output funding transaction paying the
// requested amount to the event's output script. Evaluates to
// (temporary_channel_id, funding_tx). The enclosing `match $event {` line
// and some closing lines are elided in this view.
786 macro_rules! handle_funding_generation_ready {
787 ($event: expr, $channel_value: expr) => {{
789 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
790 assert_eq!(channel_value_satoshis, $channel_value);
// 42 is the user_channel_id passed to create_channel in begin_open_channel!.
791 assert_eq!(user_channel_id, 42);
// Minimal funding tx: no inputs (fine for these tests — it is never
// validated against the chain), single output to the funding script.
793 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
794 value: channel_value_satoshis, script_pubkey: output_script.clone(),
796 (temporary_channel_id, tx)
798 _ => panic!("Unexpected event"),
// Completes channel establishment after funding-tx generation: hands the
// funding transaction to the initiator, then relays funding_created and
// funding_signed between the two nodes. Closing macro lines are elided in
// this view.
803 macro_rules! end_open_channel {
804 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
805 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
806 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
807 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Confirms `tx` on `node` and then mines it to `depth` confirmations,
// notifying both the ChannelManager and the ChainMonitor. Several interior
// lines (presumably the loop that connects the additional `depth - 1`
// blocks) are elided in this view — TODO confirm.
811 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
813 let prev_blockhash = node.best_block.block_hash();
814 let height = node.best_block.height() + 1;
// Dummy header: `time` is (ab)used to carry the height so each block is
// unique; bits/nonce are arbitrary since PoW is not validated here.
815 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
816 let txdata = vec![(0, tx)];
817 node.best_block = BestBlock::new(header.block_hash(), height);
// First block: the transaction itself confirms...
820 node.node.transactions_confirmed(&header, &txdata, height);
821 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
// ...subsequent/last block(s): only the best-block tip advances.
824 node.node.best_block_updated(&header, height);
825 node.chain_monitor.best_block_updated(&header, height);
/// Confirms `tx` with enough depth (ANTI_REORG_DELAY) that the monitor
/// considers it irrevocably confirmed.
831 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
832 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
836 fn test_background_processor() {
837 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
838 // updates. Also test that when new updates are available, the manager signals that it needs
839 // re-persistence and is successfully re-persisted.
840 let nodes = create_nodes(2, "test_background_processor".to_string());
842 // Go through the channel creation process so that each node has something to persist. Since
843 // open_channel consumes events, it must complete before starting BackgroundProcessor to
844 // avoid a race with processing events.
845 let tx = open_channel!(nodes[0], nodes[1], 100000);
847 // Initiate the background processors to watch each node.
848 let data_dir = nodes[0].persister.get_data_dir();
849 let persister = Arc::new(Persister::new(data_dir));
850 let event_handler = |_: &_| {};
851 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes $node and compares the bytes the background processor wrote to
// $filepath against that expected serialization. The surrounding retry loop
// and error arms are elided in this view — presumably it retries until the
// on-disk copy catches up (TODO confirm).
853 macro_rules! check_persisted_data {
854 ($node: expr, $filepath: expr) => {
855 let mut expected_bytes = Vec::new();
857 expected_bytes.clear();
858 match $node.write(&mut expected_bytes) {
860 match std::fs::read($filepath) {
862 if bytes == expected_bytes {
871 Err(e) => panic!("Unexpected error: {}", e)
877 // Check that the initial channel manager data is persisted as expected.
878 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
879 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait (in an elided loop) for the manager to signal it needs persistence.
882 if !nodes[0].node.get_persistence_condvar_value() { break }
885 // Force-close the channel.
886 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
888 // Check that the force-close updates are persisted.
889 check_persisted_data!(nodes[0].node, filepath.clone());
891 if !nodes[0].node.get_persistence_condvar_value() { break }
894 // Check network graph is persisted
895 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
896 check_persisted_data!(nodes[0].network_graph, filepath.clone());
898 // Check scorer is persisted
899 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
900 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Clean shutdown must succeed (stop() surfaces any persistence error).
902 assert!(bg_processor.stop().is_ok());
906 fn test_timer_tick_called() {
907 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
908 // `FRESHNESS_TIMER`.
909 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
910 let data_dir = nodes[0].persister.get_data_dir();
911 let persister = Arc::new(Persister::new(data_dir));
912 let event_handler = |_: &_| {};
913 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the test logger (the enclosing loop is elided in this view) until
// both timer_tick_occurred log lines have been emitted by the background
// processor thread.
915 let log_entries = nodes[0].logger.lines.lock().unwrap();
916 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
917 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
918 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
919 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
924 assert!(bg_processor.stop().is_ok());
928 fn test_channel_manager_persist_error() {
929 // Test that if we encounter an error during manager persistence, the thread panics.
930 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel first so there is manager state to (fail to) persist.
931 open_channel!(nodes[0], nodes[1], 100000);
933 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail every ChannelManager write with Other/"test".
934 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
935 let event_handler = |_: &_| {};
936 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// join() should surface the injected persistence error verbatim.
937 match bg_processor.join() {
938 Ok(_) => panic!("Expected error persisting manager"),
940 assert_eq!(e.kind(), std::io::ErrorKind::Other);
941 assert_eq!(e.get_ref().unwrap().to_string(), "test");
947 fn test_network_graph_persist_error() {
948 // Test that if we encounter an error during network graph persistence, an error gets returned.
949 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
950 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail every NetworkGraph write with Other/"test".
951 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
952 let event_handler = |_: &_| {};
// p2p_gossip_sync is used here (unlike the manager/scorer error tests) so
// the graph actually gets persisted — and thus hits the injected error.
953 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike the manager case, a graph persistence failure is reported from
// stop() rather than panicking the background thread.
955 match bg_processor.stop() {
956 Ok(_) => panic!("Expected error persisting network graph"),
958 assert_eq!(e.kind(), std::io::ErrorKind::Other);
959 assert_eq!(e.get_ref().unwrap().to_string(), "test");
965 fn test_scorer_persist_error() {
966 // Test that if we encounter an error during scorer persistence, an error gets returned.
967 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
968 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail every scorer write with Other/"test".
969 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
970 let event_handler = |_: &_| {};
971 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Like the graph case, the scorer persistence error surfaces from stop().
973 match bg_processor.stop() {
974 Ok(_) => panic!("Expected error persisting scorer"),
976 assert_eq!(e.kind(), std::io::ErrorKind::Other);
977 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that events raised while the BackgroundProcessor is running are
// delivered to the user-provided event handler: first FundingGenerationReady
// during channel open, then SpendableOutputs after a force-close matures.
983 fn test_background_event_handling() {
984 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
985 let channel_value = 100000;
986 let data_dir = nodes[0].persister.get_data_dir();
987 let persister = Arc::new(Persister::new(data_dir.clone()));
989 // Set up a background event handler for FundingGenerationReady events.
// The handler forwards (temporary_channel_id, funding_tx) over an mpsc
// channel so the test thread can finish the open; ChannelReady is ignored.
990 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
991 let event_handler = move |event: &Event| match event {
992 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
993 Event::ChannelReady { .. } => {},
994 _ => panic!("Unexpected event: {:?}", event),
997 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
999 // Open a channel and check that the FundingGenerationReady event was handled.
1000 begin_open_channel!(nodes[0], nodes[1], channel_value);
1001 let (temporary_channel_id, funding_tx) = receiver
1002 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1003 .expect("FundingGenerationReady not handled within deadline");
1004 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1006 // Confirm the funding transaction.
1007 confirm_transaction(&mut nodes[0], &funding_tx);
1008 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1009 confirm_transaction(&mut nodes[1], &funding_tx);
1010 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready both ways; each side then emits a channel_update
// which we drain but do not inspect.
1011 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1012 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1013 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1014 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1016 assert!(bg_processor.stop().is_ok());
1018 // Set up a background event handler for SpendableOutputs events.
// Restart the processor with a new handler; ChannelReady/ChannelClosed may
// still arrive from the earlier activity and are deliberately ignored.
1019 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1020 let event_handler = move |event: &Event| match event {
1021 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1022 Event::ChannelReady { .. } => {},
1023 Event::ChannelClosed { .. } => {},
1024 _ => panic!("Unexpected event: {:?}", event),
1026 let persister = Arc::new(Persister::new(data_dir));
1027 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1029 // Force close the channel and check that the SpendableOutputs event was handled.
1030 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1031 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Confirm the commitment tx deep enough (BREAKDOWN_TIMEOUT) for the
// to_self output to become spendable.
1032 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1034 let event = receiver
1035 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1036 .expect("Events not handled within deadline");
1038 Event::SpendableOutputs { .. } => {},
1039 _ => panic!("Unexpected event: {:?}", event),
1042 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer, by
// watching the test logger for the "Persisting scorer" line.
1046 fn test_scorer_persistence() {
1047 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1048 let data_dir = nodes[0].persister.get_data_dir();
1049 let persister = Arc::new(Persister::new(data_dir));
1050 let event_handler = |_: &_| {};
1051 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the logger (enclosing loop elided in this view) until the scorer
// persistence log line appears.
1054 let log_entries = nodes[0].logger.lines.lock().unwrap();
1055 let expected_log = "Persisting scorer".to_string();
1056 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1061 assert!(bg_processor.stop().is_ok());
// Verifies that while a rapid gossip sync is pending, the background
// processor does NOT prune the network graph — and that once the RGS data
// is applied, pruning proceeds (removing the stale test channels).
1065 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1066 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1067 let data_dir = nodes[0].persister.get_data_dir();
// The persister notifies this channel each time the graph is persisted,
// which doubles as a "prune pass completed" signal below.
1068 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1069 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1070 let network_graph = nodes[0].network_graph.clone();
1071 let features = ChannelFeatures::empty();
// Seed the graph with a partial (unannounced-update) channel that would be
// prunable if pruning ran.
1072 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1073 .expect("Failed to update channel from partial announcement");
1074 let original_graph_description = network_graph.to_string();
1075 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1076 assert_eq!(network_graph.read_only().channels().len(), 1);
1078 let event_handler = |_: &_| {};
// rapid_gossip_sync() marks a sync as pending, which should suppress pruning.
1079 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the logger (enclosing loop elided in this view) until the processor
// has assessed — and declined — pruning.
1082 let log_entries = nodes[0].logger.lines.lock().unwrap();
1083 let expected_log_a = "Assessing prunability of network graph".to_string();
1084 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1085 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1086 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Canned rapid-gossip-sync snapshot (binary RGS format, "LDK\x01" magic)
// containing two channel announcements.
1091 let initialization_input = vec![
1092 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1093 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1094 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1095 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1096 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1097 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1098 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1099 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1100 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1101 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1102 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1103 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1104 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Applying the snapshot completes the rapid gossip sync, unblocking pruning.
1106 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1108 // this should have added two channels
1109 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for the first post-sync persist/prune pass (receiver side of the
// persistence notifier; the `receiver` call line is elided in this view).
1112 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1113 .expect("Network graph not pruned within deadline");
1115 background_processor.stop().unwrap();
1117 // all channels should now be pruned
1118 assert_eq!(network_graph.read_only().channels().len(), 0);
1122 fn test_invoice_payer() {
1123 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1124 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1125 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1127 // Initiate the background processors to watch each node.
1128 let data_dir = nodes[0].persister.get_data_dir();
1129 let persister = Arc::new(Persister::new(data_dir));
1130 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1131 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1132 let event_handler = Arc::clone(&invoice_payer);
1133 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1134 assert!(bg_processor.stop().is_ok());