1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::KeysInterface;
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 /// writing it to disk/backups by invoking the callback given to it at startup.
46 /// [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 /// at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
64 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
65 pub struct BackgroundProcessor {
/// Shared stop flag. `stop_and_join_thread` stores `true` into it with `Ordering::Release`;
/// the background thread spawned by `start` holds another handle to the same flag and reads it
/// with `load(Ordering::Acquire)` as the run loop's exit check.
66 stop_thread: Arc<AtomicBool>,
/// Handle of the spawned background thread. `join_thread` removes it with `take()`, so the
/// field is `None` once the thread has been joined. The joined value is the
/// `Result<(), std::io::Error>` produced by the run loop.
67 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Seconds between `ChannelManager::timer_tick_occurred` calls: the run loop fires the tick once
// `last_freshness_call.elapsed().as_secs()` exceeds this value.
71 const FRESHNESS_TIMER: u64 = 60;
// NOTE(review): second definition of the same constant; the attributes that select between the
// two are not visible here — presumably this is the `#[cfg(test)]` value. Confirm.
73 const FRESHNESS_TIMER: u64 = 1;
// Seconds between `PeerManager::timer_tick_occurred` calls (compared against
// `last_ping_call.elapsed().as_secs()` in the run loop). This value applies to non-test builds
// without debug assertions.
75 #[cfg(all(not(test), not(debug_assertions)))]
76 const PING_TIMER: u64 = 10;
77 /// Signature operations take a lot longer without compiler optimisations.
78 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
79 /// timeout is reached.
80 #[cfg(all(not(test), debug_assertions))]
81 const PING_TIMER: u64 = 30;
// NOTE(review): the attribute guarding this definition is not visible — presumably `#[cfg(test)]`.
83 const PING_TIMER: u64 = 1;
85 /// Prune the network graph of stale entries hourly.
// Used by the run loop for every prune after the first one (i.e. once `have_pruned` is true).
86 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Seconds between scorer persists: the run loop persists the scorer once
// `last_scorer_persist_call.elapsed().as_secs()` exceeds this value.
89 const SCORER_PERSIST_TIMER: u64 = 30;
// NOTE(review): duplicate definition; selecting attribute not visible — presumably the test value.
91 const SCORER_PERSIST_TIMER: u64 = 1;
// Seconds to wait before the first network graph prune; the run loop uses this threshold
// instead of `NETWORK_PRUNE_TIMER` while `have_pruned` is still false.
94 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
// NOTE(review): duplicate definition; selecting attribute not visible — presumably the test value.
96 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
100 P: Deref<Target = P2PGossipSync<G, A, L>>,
101 R: Deref<Target = RapidGossipSync<G, L>>,
102 G: Deref<Target = NetworkGraph<L>>,
106 where A::Target: chain::Access, L::Target: Logger {
107 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
109 /// Rapid gossip sync from a trusted server.
116 P: Deref<Target = P2PGossipSync<G, A, L>>,
117 R: Deref<Target = RapidGossipSync<G, L>>,
118 G: Deref<Target = NetworkGraph<L>>,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
/// Returns the network graph of the wrapped gossip sync: `Some(..)` for both the `P2P` and
/// the `Rapid` variant, `None` for `GossipSync::None`.
///
/// Unlike `prunable_network_graph`, this does not depend on the rapid sync's progress; the run
/// body uses it for the final graph persist on exit.
123 fn network_graph(&self) -> Option<&G> {
125 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127 GossipSync::None => None,
/// Returns the network graph only if it may currently be pruned.
///
/// * `P2P`: always `Some(..)`.
/// * `Rapid`: `Some(..)` when `is_initial_sync_complete()` reports true.
/// * `None`: always `None`.
///
/// The run body calls this before `remove_stale_channels_and_tracking`.
131 fn prunable_network_graph(&self) -> Option<&G> {
133 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134 GossipSync::Rapid(gossip_sync) => {
135 if gossip_sync.is_initial_sync_complete() {
136 Some(gossip_sync.network_graph())
// NOTE(review): the branch taken while the initial sync is still pending is not visible here;
// presumably it yields `None` — confirm.
141 GossipSync::None => None,
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 A::Target: chain::Access,
153 /// Initializes a new [`GossipSync::P2P`] variant.
154 pub fn p2p(gossip_sync: P) -> Self {
155 GossipSync::P2P(gossip_sync)
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
165 &'a (dyn chain::Access + Send + Sync),
171 /// Initializes a new [`GossipSync::Rapid`] variant.
172 pub fn rapid(gossip_sync: R) -> Self {
173 GossipSync::Rapid(gossip_sync)
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
180 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183 &'a (dyn chain::Access + Send + Sync),
189 /// Initializes a new [`GossipSync::None`] variant.
190 pub fn none() -> Self {
/// Feeds the routing information carried by a failed payment back into the network graph.
///
/// Only `Event::PaymentPathFailed` is inspected, and only when its `network_update` is `Some`;
/// in that case the update is passed to `NetworkGraph::handle_network_update`. No other branch
/// is shown for the remaining events, so they appear to leave the graph untouched.
195 fn handle_network_graph_update<L: Deref>(
196 network_graph: &NetworkGraph<L>, event: &Event
197 ) where L::Target: Logger {
198 if let Event::PaymentPathFailed { ref network_update, .. } = event {
199 if let Some(network_update) = network_update {
200 network_graph.handle_network_update(&network_update);
// Shared body of the background loop, expanded by both `process_events_async` and
// `BackgroundProcessor::start`.
//
// Arguments:
// * `$persister`, `$chain_monitor`, `$channel_manager`, `$gossip_sync`, `$peer_manager`,
//   `$logger`, `$scorer`: identifiers of the values the loop operates on.
// * `$process_chain_monitor_events` / `$process_channel_manager_events`: expressions that hand
//   pending events to the event handler; they are evaluated before
//   `$peer_manager.process_events()`.
// * `$await`: expression that waits for a ChannelManager update; when it evaluates to `true`
//   the manager is persisted.
// * `$loop_exit_check`: expression evaluated after `$await` (and after the persist it may
//   trigger) to decide whether to stop.
//
// Error handling: a `persist_manager` failure is propagated with `?`. Inside the periodic part,
// `persist_graph` and `persist_scorer` failures are only logged. The final persists of manager,
// scorer and graph after the loop all propagate their errors with `?`.
205 macro_rules! define_run_body {
206 ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
207 $channel_manager: ident, $process_channel_manager_events: expr,
208 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
209 $loop_exit_check: expr, $await: expr)
// One tick is issued immediately at startup, before any timer has elapsed.
211 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
212 $channel_manager.timer_tick_occurred();
// Time of the last run of each periodic task; all of them start at "now".
214 let mut last_freshness_call = Instant::now();
215 let mut last_ping_call = Instant::now();
216 let mut last_prune_call = Instant::now();
217 let mut last_scorer_persist_call = Instant::now();
// Selects between FIRST_NETWORK_PRUNE_TIMER (false) and NETWORK_PRUNE_TIMER (true) below.
218 let mut have_pruned = false;
221 $process_channel_manager_events;
222 $process_chain_monitor_events;
224 // Note that the PeerManager::process_events may block on ChannelManager's locks,
225 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
226 // we want to ensure we get into `persist_manager` as quickly as we can, especially
227 // without running the normal event processing above and handing events to users.
229 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
230 // processing a message effectively at any point during this loop. In order to
231 // minimize the time between such processing completing and persisting the updated
232 // ChannelManager, we want to minimize methods blocking on a ChannelManager
233 // generally, and as a fallback place such blocking only immediately before
235 $peer_manager.process_events();
237 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
238 // see `await_start`'s use below.
239 let await_start = Instant::now();
240 let updates_available = $await;
241 let await_time = await_start.elapsed();
// `$await` reported a pending ChannelManager update; a persist failure aborts via `?`.
243 if updates_available {
244 log_trace!($logger, "Persisting ChannelManager...");
245 $persister.persist_manager(&*$channel_manager)?;
246 log_trace!($logger, "Done persisting ChannelManager.");
248 // Exit the loop if the background processor was requested to stop.
249 if $loop_exit_check {
250 log_trace!($logger, "Terminating background processor.");
// `as_secs()` truncates to whole seconds and the comparison is strict, so the tick fires only
// once at least FRESHNESS_TIMER + 1 full seconds have elapsed. The other timers below use the
// same pattern.
253 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
254 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
255 $channel_manager.timer_tick_occurred();
256 last_freshness_call = Instant::now();
258 if await_time > Duration::from_secs(1) {
259 // On various platforms, we may be starved of CPU cycles for several reasons.
260 // E.g. on iOS, if we've been in the background, we will be entirely paused.
261 // Similarly, if we're on a desktop platform and the device has been asleep, we
262 // may not get any cycles.
263 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
264 // full second, at which point we assume sockets may have been killed (they
265 // appear to be at least on some platforms, even if it has only been a second).
266 // Note that we have to take care to not get here just because user event
267 // processing was slow at the top of the loop. For example, the sample client
268 // may call Bitcoin Core RPCs during event handling, which very often takes
269 // more than a handful of seconds to complete, and shouldn't disconnect all our
271 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
272 $peer_manager.disconnect_all_peers();
// The ping timer is restarted here as well, so the disconnect counts as the last ping call.
273 last_ping_call = Instant::now();
274 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
275 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
276 $peer_manager.timer_tick_occurred();
277 last_ping_call = Instant::now();
280 // Note that we want to run a graph prune once not long after startup before
281 // falling back to our usual hourly prunes. This avoids short-lived clients never
282 // pruning their network graph. We run once 60 seconds after startup before
283 // continuing our normal cadence.
284 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
285 // The network graph must not be pruned while rapid sync completion is pending
286 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
287 log_trace!($logger, "Pruning and persisting network graph.");
288 network_graph.remove_stale_channels_and_tracking();
// A graph persist failure inside the loop is logged, not propagated.
290 if let Err(e) = $persister.persist_graph(network_graph) {
291 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
294 last_prune_call = Instant::now();
299 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
300 if let Some(ref scorer) = $scorer {
301 log_trace!($logger, "Persisting scorer");
// A scorer persist failure inside the loop is logged, not propagated.
302 if let Err(e) = $persister.persist_scorer(&scorer) {
303 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
306 last_scorer_persist_call = Instant::now();
310 // After we exit, ensure we persist the ChannelManager one final time - this avoids
311 // some races where users quit while channel updates were in-flight, with
312 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
313 $persister.persist_manager(&*$channel_manager)?;
// Unlike the periodic persists above, failures of the shutdown persists below are returned to
// the caller via `?`.
315 // Persist Scorer on exit
316 if let Some(ref scorer) = $scorer {
317 $persister.persist_scorer(&scorer)?;
320 // Persist NetworkGraph on exit
321 if let Some(network_graph) = $gossip_sync.network_graph() {
322 $persister.persist_graph(network_graph)?;
329 /// Processes background events in a future.
331 /// `sleeper` should return a future which completes in the given amount of time and returns a
332 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
333 /// future which outputs true, the loop will exit and this function's future will complete.
335 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
336 #[cfg(feature = "futures")]
337 pub async fn process_events_async<
339 CA: 'static + Deref + Send + Sync,
340 CF: 'static + Deref + Send + Sync,
341 CW: 'static + Deref + Send + Sync,
342 T: 'static + Deref + Send + Sync,
343 K: 'static + Deref + Send + Sync,
344 F: 'static + Deref + Send + Sync,
345 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
346 L: 'static + Deref + Send + Sync,
347 P: 'static + Deref + Send + Sync,
348 Descriptor: 'static + SocketDescriptor + Send + Sync,
349 CMH: 'static + Deref + Send + Sync,
350 RMH: 'static + Deref + Send + Sync,
351 OMH: 'static + Deref + Send + Sync,
352 EventHandlerFuture: core::future::Future<Output = ()>,
353 EventHandler: Fn(Event) -> EventHandlerFuture,
354 PS: 'static + Deref + Send,
355 M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
356 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
357 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
358 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
359 UMH: 'static + Deref + Send + Sync,
360 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
361 S: 'static + Deref<Target = SC> + Send + Sync,
362 SC: WriteableScore<'a>,
363 SleepFuture: core::future::Future<Output = bool>,
364 Sleeper: Fn(Duration) -> SleepFuture
366 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
367 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
369 ) -> Result<(), std::io::Error>
371 CA::Target: 'static + chain::Access,
372 CF::Target: 'static + chain::Filter,
373 CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
374 T::Target: 'static + BroadcasterInterface,
375 K::Target: 'static + KeysInterface,
376 F::Target: 'static + FeeEstimator,
377 L::Target: 'static + Logger,
378 P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
379 CMH::Target: 'static + ChannelMessageHandler,
380 OMH::Target: 'static + OnionMessageHandler,
381 RMH::Target: 'static + RoutingMessageHandler,
382 UMH::Target: 'static + CustomMessageHandler,
383 PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
// NOTE(review): `should_break` starts out `true` and is what the run body evaluates as its
// loop-exit check right after the wait expression below. The arm that yields `true`
// (persistable update) never touches it, so unless the sleeper arm has already overwritten it
// — that assignment is not visible here — a persistable update arriving before the first
// sleeper completion would also terminate the loop. Confirm whether the initial value should
// be `false`.
385 let mut should_break = true;
// Wraps the user's handler: the network graph (if the gossip sync provides one) is updated from
// the event first, then the event is handed to `event_handler` and its future awaited.
386 let async_event_handler = |event| {
387 let network_graph = gossip_sync.network_graph();
388 let event_handler = &event_handler;
390 if let Some(network_graph) = network_graph {
391 handle_network_graph_update(network_graph, &event)
393 event_handler(event).await;
// The same `async_event_handler` serves both the chain monitor's and the channel manager's
// pending events.
396 define_run_body!(persister,
397 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
398 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
399 gossip_sync, peer_manager, logger, scorer, should_break, {
// Wait expression: evaluates to `true` (persist the manager) when the persistable-update future
// completes; the other arm asks `sleeper` for a 100ms delay and receives its exit flag.
401 _ = channel_manager.get_persistable_update_future().fuse() => true,
402 exit = sleeper(Duration::from_millis(100)).fuse() => {
410 impl BackgroundProcessor {
411 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
414 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
415 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
416 /// either [`join`] or [`stop`].
418 /// # Data Persistence
420 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
421 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
422 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
423 /// provided implementation.
425 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
426 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
427 /// See the `lightning-persister` crate for LDK's provided implementation.
429 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
430 /// error or call [`join`] and handle any error that may arise. For the latter case,
431 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
435 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
436 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
437 /// functionality implemented by other handlers.
438 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
440 /// # Rapid Gossip Sync
442 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
443 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
444 /// until the [`RapidGossipSync`] instance completes its first sync.
446 /// [top-level documentation]: BackgroundProcessor
447 /// [`join`]: Self::join
448 /// [`stop`]: Self::stop
449 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
450 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
451 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
452 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
453 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
454 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
457 CA: 'static + Deref + Send + Sync,
458 CF: 'static + Deref + Send + Sync,
459 CW: 'static + Deref + Send + Sync,
460 T: 'static + Deref + Send + Sync,
461 K: 'static + Deref + Send + Sync,
462 F: 'static + Deref + Send + Sync,
463 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
464 L: 'static + Deref + Send + Sync,
465 P: 'static + Deref + Send + Sync,
466 Descriptor: 'static + SocketDescriptor + Send + Sync,
467 CMH: 'static + Deref + Send + Sync,
468 OMH: 'static + Deref + Send + Sync,
469 RMH: 'static + Deref + Send + Sync,
470 EH: 'static + EventHandler + Send,
471 PS: 'static + Deref + Send,
472 M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
473 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
474 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
475 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
476 UMH: 'static + Deref + Send + Sync,
477 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
478 S: 'static + Deref<Target = SC> + Send + Sync,
479 SC: WriteableScore<'a>,
481 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
482 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
485 CA::Target: 'static + chain::Access,
486 CF::Target: 'static + chain::Filter,
487 CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
488 T::Target: 'static + BroadcasterInterface,
489 K::Target: 'static + KeysInterface,
490 F::Target: 'static + FeeEstimator,
491 L::Target: 'static + Logger,
492 P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
493 CMH::Target: 'static + ChannelMessageHandler,
494 OMH::Target: 'static + OnionMessageHandler,
495 RMH::Target: 'static + RoutingMessageHandler,
496 UMH::Target: 'static + CustomMessageHandler,
497 PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
499 let stop_thread = Arc::new(AtomicBool::new(false));
500 let stop_thread_clone = stop_thread.clone();
501 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
502 let event_handler = |event| {
503 let network_graph = gossip_sync.network_graph();
504 if let Some(network_graph) = network_graph {
505 handle_network_graph_update(network_graph, &event)
507 event_handler.handle_event(event);
509 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
510 channel_manager, channel_manager.process_pending_events(&event_handler),
511 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
512 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
514 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
517 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
518 /// [`ChannelManager`].
522 /// This function panics if the background thread has panicked such as while persisting or
525 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
526 pub fn join(mut self) -> Result<(), std::io::Error> {
527 assert!(self.thread_handle.is_some());
531 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
532 /// [`ChannelManager`].
536 /// This function panics if the background thread has panicked such as while persisting or
539 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
540 pub fn stop(mut self) -> Result<(), std::io::Error> {
541 assert!(self.thread_handle.is_some());
542 self.stop_and_join_thread()
/// Requests the background loop to stop by storing `true` into the shared stop flag with
/// `Ordering::Release`; the loop observes it through the `Ordering::Acquire` load passed to the
/// run body in `start`.
// NOTE(review): the join step implied by the name is not visible here — presumably this then
// delegates to `join_thread` and returns its result. Confirm.
545 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
546 self.stop_thread.store(true, Ordering::Release);
/// Takes the thread handle out of `self` (leaving `None` behind) and, if there was one, blocks
/// until the background thread finishes.
///
/// `handle.join().unwrap()` panics if the background thread itself panicked; otherwise the
/// thread's own `Result<(), std::io::Error>` becomes this function's result.
// NOTE(review): the arm for an already-taken handle is not visible here.
550 fn join_thread(&mut self) -> Result<(), std::io::Error> {
551 match self.thread_handle.take() {
552 Some(handle) => handle.join().unwrap(),
// Dropping a `BackgroundProcessor` stops the background thread via `stop_and_join_thread`.
558 impl Drop for BackgroundProcessor {
// The `unwrap()` panics if `stop_and_join_thread` returns an `Err`, i.e. an I/O error reported
// by the background thread turns into a panic while dropping.
// NOTE(review): panicking in `drop` during an unwind aborts the process; confirm this is the
// intended behaviour for persistence errors.
560 self.stop_and_join_thread().unwrap();
566 use bitcoin::blockdata::block::BlockHeader;
567 use bitcoin::blockdata::constants::genesis_block;
568 use bitcoin::blockdata::locktime::PackedLockTime;
569 use bitcoin::blockdata::transaction::{Transaction, TxOut};
570 use bitcoin::network::constants::Network;
571 use lightning::chain::{BestBlock, Confirm, chainmonitor};
572 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
573 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
574 use lightning::chain::transaction::OutPoint;
575 use lightning::get_event_msg;
576 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
577 use lightning::ln::features::ChannelFeatures;
578 use lightning::ln::msgs::{ChannelMessageHandler, Init};
579 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
580 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
581 use lightning::routing::router::DefaultRouter;
582 use lightning::util::config::UserConfig;
583 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
584 use lightning::util::ser::Writeable;
585 use lightning::util::test_utils;
586 use lightning::util::persist::KVStorePersister;
587 use lightning_invoice::payment::{InvoicePayer, Retry};
588 use lightning_persister::FilesystemPersister;
590 use std::path::PathBuf;
591 use std::sync::{Arc, Mutex};
592 use std::sync::mpsc::SyncSender;
593 use std::time::Duration;
594 use bitcoin::hashes::Hash;
595 use bitcoin::TxMerkleNode;
596 use lightning::routing::scoring::{FixedPenaltyScorer};
597 use lightning_rapid_gossip_sync::RapidGossipSync;
598 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
600 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
602 #[derive(Clone, Hash, PartialEq, Eq)]
603 struct TestDescriptor{}
604 impl SocketDescriptor for TestDescriptor {
605 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
609 fn disconnect_socket(&mut self) {}
612 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
614 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
615 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
618 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
619 p2p_gossip_sync: PGS,
620 rapid_gossip_sync: RGS,
621 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
622 chain_monitor: Arc<ChainMonitor>,
623 persister: Arc<FilesystemPersister>,
624 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
625 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
626 logger: Arc<test_utils::TestLogger>,
627 best_block: BestBlock,
628 scorer: Arc<Mutex<FixedPenaltyScorer>>,
632 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
633 GossipSync::P2P(self.p2p_gossip_sync.clone())
636 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
637 GossipSync::Rapid(self.rapid_gossip_sync.clone())
640 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
647 let data_dir = self.persister.get_data_dir();
648 match fs::remove_dir_all(data_dir.clone()) {
649 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
656 graph_error: Option<(std::io::ErrorKind, &'static str)>,
657 graph_persistence_notifier: Option<SyncSender<()>>,
658 manager_error: Option<(std::io::ErrorKind, &'static str)>,
659 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
660 filesystem_persister: FilesystemPersister,
664 fn new(data_dir: String) -> Self {
665 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
666 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
669 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
670 Self { graph_error: Some((error, message)), ..self }
673 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
674 Self { graph_persistence_notifier: Some(sender), ..self }
677 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
678 Self { manager_error: Some((error, message)), ..self }
681 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
682 Self { scorer_error: Some((error, message)), ..self }
// Test persister: returns the configured error for the matching key, and otherwise delegates
// the write to the wrapped `FilesystemPersister`.
686 impl KVStorePersister for Persister {
687 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
// Injected failure for ChannelManager persistence.
688 if key == "manager" {
689 if let Some((error, message)) = self.manager_error {
690 return Err(std::io::Error::new(error, message))
694 if key == "network_graph" {
// The notification is sent before the injected-error check, so a persist attempt that is about
// to fail is signalled too. `unwrap()` panics if the receiving side has been dropped.
695 if let Some(sender) = &self.graph_persistence_notifier {
696 sender.send(()).unwrap();
// Injected failure for NetworkGraph persistence.
699 if let Some((error, message)) = self.graph_error {
700 return Err(std::io::Error::new(error, message))
// Injected failure for scorer persistence.
// NOTE(review): the key guard around this check is not visible here — presumably
// `key == "scorer"`. Confirm.
705 if let Some((error, message)) = self.scorer_error {
706 return Err(std::io::Error::new(error, message))
// No injected error applied: perform the real write.
710 self.filesystem_persister.persist(key, object)
714 fn get_full_filepath(filepath: String, filename: String) -> String {
715 let mut path = PathBuf::from(filepath);
717 path.to_str().unwrap().to_string()
720 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
721 let mut nodes = Vec::new();
722 for i in 0..num_nodes {
723 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
724 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
725 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
726 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
727 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
728 let seed = [i as u8; 32];
729 let network = Network::Testnet;
730 let genesis_block = genesis_block(network);
731 let now = Duration::from_secs(genesis_block.header.time as u64);
732 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
733 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
734 let best_block = BestBlock::from_genesis(network);
735 let params = ChainParameters { network, best_block };
736 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
737 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
738 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
739 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
740 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
741 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
742 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
743 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
747 for i in 0..num_nodes {
748 for j in (i+1)..num_nodes {
749 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
750 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
757 macro_rules! open_channel {
758 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
759 begin_open_channel!($node_a, $node_b, $channel_value);
760 let events = $node_a.node.get_and_clear_pending_events();
761 assert_eq!(events.len(), 1);
762 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
763 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
768 macro_rules! begin_open_channel {
769 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
770 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
771 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
772 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Handles an `Event::FundingGenerationReady`: checks its channel value against
// `$channel_value` and its user channel id against 42, builds a funding `Transaction`, and
// yields `(temporary_channel_id, tx)`. Any other event panics with "Unexpected event".
// NOTE(review): the `match` header and the arm/struct closers are not visible in this
// excerpt; `$event` is presumably the matched expression - confirm.
776 macro_rules! handle_funding_generation_ready {
777 ($event: expr, $channel_value: expr) => {{
779 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
780 assert_eq!(channel_value_satoshis, $channel_value);
781 assert_eq!(user_channel_id, 42);
// The funding transaction has no inputs; its output pays the full channel value to the
// script supplied by the event.
783 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
784 value: channel_value_satoshis, script_pubkey: output_script.clone(),
786 (temporary_channel_id, tx)
788 _ => panic!("Unexpected event"),
// Second half of opening a channel: hands a clone of `$tx` to `$node_a` as the funding
// transaction for `$temporary_channel_id`, delivers the resulting `SendFundingCreated`
// message to `$node_b`, then delivers `$node_b`'s `SendFundingSigned` reply to `$node_a`.
793 macro_rules! end_open_channel {
794 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
795 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
796 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
797 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Advances `node`'s view of the chain and notifies both its channel manager (`node.node`)
// and its chain monitor about `tx` being confirmed and about the new best block.
// NOTE(review): the lines that use `depth`, and that decide when each notification below
// fires, are not visible in this excerpt - confirm against the full function.
801 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
// Build a header on top of the node's current best block, one height higher.
803 let prev_blockhash = node.best_block.block_hash();
804 let height = node.best_block.height() + 1;
805 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
// NOTE(review): the `0` paired with `tx` is presumably its position within the block -
// confirm against `transactions_confirmed`.
806 let txdata = vec![(0, tx)];
// Record the new header as the node's best block before sending notifications.
807 node.best_block = BestBlock::new(header.block_hash(), height);
810 node.node.transactions_confirmed(&header, &txdata, height);
811 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
814 node.node.best_block_updated(&header, height);
815 node.chain_monitor.best_block_updated(&header, height);
// Convenience wrapper: confirms `tx` on `node` via `confirm_transaction_depth`, using
// `ANTI_REORG_DELAY` as the depth.
821 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
822 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
826 fn test_background_processor() {
827 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
828 // updates. Also test that when new updates are available, the manager signals that it needs
829 // re-persistence and is successfully re-persisted.
830 let nodes = create_nodes(2, "test_background_processor".to_string());
832 // Go through the channel creation process so that each node has something to persist. Since
833 // open_channel consumes events, it must complete before starting BackgroundProcessor to
834 // avoid a race with processing events.
835 let tx = open_channel!(nodes[0], nodes[1], 100000);
837 // Start a background processor for nodes[0] only, with a no-op event handler.
838 let data_dir = nodes[0].persister.get_data_dir();
839 let persister = Arc::new(Persister::new(data_dir));
840 let event_handler = |_: _| {};
841 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes `$node` into `expected_bytes` via `write` and compares the result with the
// bytes read from `$filepath`; a serialization error fails the test.
// NOTE(review): the loop/retry lines of this macro are not visible in this excerpt. The
// `expected_bytes.clear()` suggests the comparison is retried until the file matches -
// confirm.
843 macro_rules! check_persisted_data {
844 ($node: expr, $filepath: expr) => {
845 let mut expected_bytes = Vec::new();
847 expected_bytes.clear();
848 match $node.write(&mut expected_bytes) {
850 match std::fs::read($filepath) {
852 if bytes == expected_bytes {
861 Err(e) => panic!("Unexpected error: {}", e)
867 // Check that the initial channel manager data is persisted as expected.
868 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
869 check_persisted_data!(nodes[0].node, filepath.clone());
// NOTE(review): the enclosing loop header is not visible in this excerpt; the `break`
// exits once `get_persistence_condvar_value()` returns false.
872 if !nodes[0].node.get_persistence_condvar_value() { break }
875 // Force-close the channel.
876 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
878 // Check that the force-close updates are persisted.
879 check_persisted_data!(nodes[0].node, filepath.clone());
// Same wait as after the first persistence check (loop header elided in this excerpt).
881 if !nodes[0].node.get_persistence_condvar_value() { break }
884 // Check network graph is persisted
885 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
886 check_persisted_data!(nodes[0].network_graph, filepath.clone());
888 // Check scorer is persisted
889 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
890 check_persisted_data!(nodes[0].scorer, filepath.clone());
892 assert!(bg_processor.stop().is_ok());
896 fn test_timer_tick_called() {
897 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
898 // `FRESHNESS_TIMER`.
899 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
900 let data_dir = nodes[0].persister.get_data_dir();
901 let persister = Arc::new(Persister::new(data_dir));
902 let event_handler = |_: _| {};
903 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Look up both timer-tick messages in the test logger's recorded lines, keyed by
// (module name, message); the condition holds once both have been logged.
// NOTE(review): the surrounding loop and the body of the `if` are not visible in this
// excerpt; presumably this polls until both entries exist - confirm.
905 let log_entries = nodes[0].logger.lines.lock().unwrap();
906 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
907 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
908 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
909 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
914 assert!(bg_processor.stop().is_ok());
918 fn test_channel_manager_persist_error() {
919 // Test that if we encounter an error during manager persistence, `join` returns that error.
920 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel first so the manager has state to persist.
921 open_channel!(nodes[0], nodes[1], 100000);
// The persister is configured to fail manager persistence with kind `Other` and message
// "test"; the assertions below check that exactly this error comes back.
923 let data_dir = nodes[0].persister.get_data_dir();
924 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
925 let event_handler = |_: _| {};
926 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` (rather than `stop`) is used here, so the test does not itself request shutdown.
// NOTE(review): the `Err(e)` arm header is not visible in this excerpt.
927 match bg_processor.join() {
928 Ok(_) => panic!("Expected error persisting manager"),
930 assert_eq!(e.kind(), std::io::ErrorKind::Other);
931 assert_eq!(e.get_ref().unwrap().to_string(), "test");
937 fn test_network_graph_persist_error() {
938 // Test that if we encounter an error during network graph persistence, an error gets returned.
939 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
// The persister is configured to fail graph persistence with kind `Other` and message "test".
940 let data_dir = nodes[0].persister.get_data_dir();
941 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
942 let event_handler = |_: _| {};
// This test passes `p2p_gossip_sync()` as the gossip-sync argument.
943 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error.
// NOTE(review): the `Err(e)` arm header is not visible in this excerpt.
945 match bg_processor.stop() {
946 Ok(_) => panic!("Expected error persisting network graph"),
948 assert_eq!(e.kind(), std::io::ErrorKind::Other);
949 assert_eq!(e.get_ref().unwrap().to_string(), "test");
955 fn test_scorer_persist_error() {
956 // Test that if we encounter an error during scorer persistence, an error gets returned.
957 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
// The persister is configured to fail scorer persistence with kind `Other` and message "test".
958 let data_dir = nodes[0].persister.get_data_dir();
959 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
960 let event_handler = |_: _| {};
961 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error.
// NOTE(review): the `Err(e)` arm header is not visible in this excerpt.
963 match bg_processor.stop() {
964 Ok(_) => panic!("Expected error persisting scorer"),
966 assert_eq!(e.kind(), std::io::ErrorKind::Other);
967 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Test that events are delivered to the handler given to `BackgroundProcessor::start`:
// first a `FundingGenerationReady` during channel open, then (with a second processor and
// handler) a `SpendableOutputs` after a force-close.
973 fn test_background_event_handling() {
974 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
975 let channel_value = 100000;
976 let data_dir = nodes[0].persister.get_data_dir();
977 let persister = Arc::new(Persister::new(data_dir.clone()));
979 // Set up a background event handler for FundingGenerationReady events.
// The handler forwards the `(temporary_channel_id, tx)` pair over a bounded channel so the
// test thread can finish the channel open; `ChannelReady` is ignored and any other event
// panics.
980 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
981 let event_handler = move |event: Event| match event {
982 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
983 Event::ChannelReady { .. } => {},
984 _ => panic!("Unexpected event: {:?}", event),
987 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
989 // Open a channel and check that the FundingGenerationReady event was handled.
990 begin_open_channel!(nodes[0], nodes[1], channel_value);
// Wait up to `EVENT_DEADLINE` seconds for the handler to forward the funding details.
991 let (temporary_channel_id, funding_tx) = receiver
992 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
993 .expect("FundingGenerationReady not handled within deadline");
994 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
996 // Confirm the funding transaction.
// Each node confirms the funding transaction and emits `SendChannelReady`; the messages are
// then exchanged, and each side emits a `SendChannelUpdate` which is fetched and discarded.
997 confirm_transaction(&mut nodes[0], &funding_tx);
998 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
999 confirm_transaction(&mut nodes[1], &funding_tx);
1000 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1001 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1002 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1003 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1004 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before starting a second one with a different handler.
1006 assert!(bg_processor.stop().is_ok());
1008 // Set up a background event handler for SpendableOutputs events.
// `SpendableOutputs` events are cloned and forwarded to the test thread; `ChannelReady` and
// `ChannelClosed` are ignored and any other event panics.
1009 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1010 let event_handler = move |event: Event| match event {
1011 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1012 Event::ChannelReady { .. } => {},
1013 Event::ChannelClosed { .. } => {},
1014 _ => panic!("Unexpected event: {:?}", event),
1016 let persister = Arc::new(Persister::new(data_dir));
1017 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1019 // Force close the channel and check that the SpendableOutputs event was handled.
1020 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Pop the last transaction recorded by the test broadcaster (treated here as the commitment
// transaction) and confirm it to a depth of `BREAKDOWN_TIMEOUT`.
1021 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1022 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1024 let event = receiver
1025 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1026 .expect("Events not handled within deadline");
// NOTE(review): the `match` header for the two arms below is not visible in this excerpt;
// presumably it matches on `event` - confirm.
1028 Event::SpendableOutputs { .. } => {},
1029 _ => panic!("Unexpected event: {:?}", event),
1032 assert!(bg_processor.stop().is_ok());
// Test that a running background processor logs "Persisting scorer" when it is given a
// scorer, and that it then stops cleanly.
1036 fn test_scorer_persistence() {
1037 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1038 let data_dir = nodes[0].persister.get_data_dir();
1039 let persister = Arc::new(Persister::new(data_dir));
1040 let event_handler = |_: _| {};
1041 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Look up the expected message in the test logger's recorded lines, keyed by
// (module name, message).
// NOTE(review): the surrounding loop and the body of the `if` are not visible in this
// excerpt; presumably this polls until the entry exists - confirm.
1044 let log_entries = nodes[0].logger.lines.lock().unwrap();
1045 let expected_log = "Persisting scorer".to_string();
1046 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1051 assert!(bg_processor.stop().is_ok());
// Test that a channel added before rapid gossip sync data is applied is still in the graph
// when that data arrives (1 -> 3 channels), and that the graph is empty once a
// graph-persistence notification has been received and the processor has been stopped.
1055 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1056 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1057 let data_dir = nodes[0].persister.get_data_dir();
// The persister is given `sender` as its graph-persistence notifier; the test later waits
// on `receiver`.
1058 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1059 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1060 let network_graph = nodes[0].network_graph.clone();
1061 let features = ChannelFeatures::empty();
// Seed the graph with one channel between the two nodes. The description check below shows
// it is listed under `42`. NOTE(review): the meaning of `53` is not visible here - confirm
// against `add_channel_from_partial_announcement`.
1062 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1063 .expect("Failed to update channel from partial announcement");
1064 let original_graph_description = network_graph.to_string();
1065 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1066 assert_eq!(network_graph.read_only().channels().len(), 1);
1068 let event_handler = |_: _| {};
1069 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The count logged for the ChannelManager timer-tick message is used as a loop counter.
// NOTE(review): the loop header and the rest of this condition are not visible in this
// excerpt.
1072 let log_entries = nodes[0].logger.lines.lock().unwrap();
1073 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1074 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1077 // Wait until the loop has gone around at least twice.
// Raw payload fed to `rapid_gossip_sync.update_network_graph` below. The byte layout is
// not decoded here; the assertion that follows expects it to add two channels.
1082 let initialization_input = vec![
1083 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1084 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1085 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1086 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1087 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1088 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1089 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1090 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1091 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1092 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1093 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1094 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1095 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1097 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1099 // this should have added two channels
1100 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait (up to five times `FIRST_NETWORK_PRUNE_TIMER` seconds) for a notification.
// NOTE(review): the receiver expression that starts this call chain is not visible in this
// excerpt; presumably it is `receiver` from above - confirm.
1103 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1104 .expect("Network graph not pruned within deadline");
1106 background_processor.stop().unwrap();
1108 // all channels should now be pruned
1109 assert_eq!(network_graph.read_only().channels().len(), 0);
// Test that an `Arc<InvoicePayer>` can be passed to `BackgroundProcessor::start` as the
// event handler, and that the processor then stops cleanly.
1113 fn test_invoice_payer() {
// A separate test keys manager is created only to obtain the random seed bytes that are
// handed to the router below.
1114 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1115 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1116 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1118 // Start a background processor for nodes[0], using the invoice payer as its event handler.
1119 let data_dir = nodes[0].persister.get_data_dir();
1120 let persister = Arc::new(Persister::new(data_dir));
1121 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
// The payer wraps nodes[0]'s channel manager and is given a no-op closure and
// `Retry::Attempts(2)`.
1122 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1123 let event_handler = Arc::clone(&invoice_payer);
1124 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1125 assert!(bg_processor.stop().is_ok());