1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::KeysInterface;
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 /// writing it to disk/backups by invoking the callback given to it at startup.
46 /// [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 /// at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
// NOTE: the run loop additionally persists the scorer on its own timer when a scorer is
// supplied, and persists the manager, scorer and network graph one final time on exit.
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
64 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
65 pub struct BackgroundProcessor {
// Stop flag shared with the background thread: `stop_and_join_thread` stores `true` and the
// thread reads it as its loop-exit check.
66 stop_thread: Arc<AtomicBool>,
// Handle of the spawned thread; `join_thread` takes it out, leaving `None` once joined.
67 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
/// Seconds that must elapse between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
/// Shortened freshness interval so tests do not have to wait a full minute.
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Seconds that must elapse between calls to `PeerManager::timer_tick_occurred` in optimised
/// builds.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
/// Shortened ping interval for tests.
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Seconds that must elapse between periodic scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
/// Shortened scorer persistence interval for tests.
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Seconds after startup before the first network graph prune; later prunes use
/// `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
/// Shortened first-prune delay for tests.
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// NOTE: besides the two sync flavours, a `None` variant is matched elsewhere in this file
// for the case where no gossip sync is in use.
// `P` and `R` point at the two sync implementations and `G` at the `NetworkGraph` they use.
100 P: Deref<Target = P2PGossipSync<G, A, L>>,
101 R: Deref<Target = RapidGossipSync<G, L>>,
102 G: Deref<Target = NetworkGraph<L>>,
// `A` must dereference to a `chain::Access` implementation and `L` to a `Logger`.
106 where A::Target: chain::Access, L::Target: Logger {
107 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
109 /// Rapid gossip sync from a trusted server.
// Private accessors used by the background loop to reach the network graph, if any.
116 P: Deref<Target = P2PGossipSync<G, A, L>>,
117 R: Deref<Target = RapidGossipSync<G, L>>,
118 G: Deref<Target = NetworkGraph<L>>,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
// Returns the network graph held by whichever sync variant is in use, or `None` when no
// gossip sync was supplied.
123 fn network_graph(&self) -> Option<&G> {
125 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127 GossipSync::None => None,
// Like `network_graph`, but for the rapid variant the graph is only handed out once
// `is_initial_sync_complete()` reports true, so pruning cannot run before that.
131 fn prunable_network_graph(&self) -> Option<&G> {
133 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134 GossipSync::Rapid(gossip_sync) => {
135 if gossip_sync.is_initial_sync_complete() {
136 Some(gossip_sync.network_graph())
141 GossipSync::None => None,
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
// The unused rapid-sync type parameter is fixed to `&RapidGossipSync<G, L>` so callers of
// `p2p` only need to supply the P2P pointer type.
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 A::Target: chain::Access,
153 /// Initializes a new [`GossipSync::P2P`] variant.
// Wraps `gossip_sync` unchanged.
154 pub fn p2p(gossip_sync: P) -> Self {
155 GossipSync::P2P(gossip_sync)
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
// The unused P2P type parameter and the chain-access parameter are fixed to reference
// types here so callers of `rapid` only need to supply the rapid-sync pointer type.
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
165 &'a (dyn chain::Access + Send + Sync),
171 /// Initializes a new [`GossipSync::Rapid`] variant.
// Wraps `gossip_sync` unchanged.
172 pub fn rapid(gossip_sync: R) -> Self {
173 GossipSync::Rapid(gossip_sync)
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
// Every pointer type parameter is fixed to a plain reference type for this constructor.
180 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183 &'a (dyn chain::Access + Send + Sync),
189 /// Initializes a new [`GossipSync::None`] variant.
// Takes no arguments; used when the caller has no gossip sync to hand over.
190 pub fn none() -> Self {
195 fn handle_network_graph_update<L: Deref>(
196 network_graph: &NetworkGraph<L>, event: &Event
197 ) where L::Target: Logger {
198 if let Event::PaymentPathFailed { ref network_update, .. } = event {
199 if let Some(network_update) = network_update {
200 network_graph.handle_network_update(&network_update);
// Shared body of the background loop. It is expanded both by the threaded
// `BackgroundProcessor::start` and by the async `process_events_async`, which differ only in
// the expressions they pass for event processing, the exit check and the wait step.
//
// `$process_chain_monitor_events` / `$process_channel_manager_events` are evaluated once per
// iteration; `$await` must evaluate to a `bool` telling whether the manager needs persisting;
// `$loop_exit_check` is evaluated after `$await` on every iteration.
// Persistence errors for the manager are propagated with `?`, so the expansion site must
// return `Result<_, std::io::Error>`-compatible values.
205 macro_rules! define_run_body {
206 ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
207 $channel_manager: ident, $process_channel_manager_events: expr,
208 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
209 $loop_exit_check: expr, $await: expr)
211 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
212 $channel_manager.timer_tick_occurred();
// Each periodic task keeps its own "last run" timestamp, all starting at loop entry.
214 let mut last_freshness_call = Instant::now();
215 let mut last_ping_call = Instant::now();
216 let mut last_prune_call = Instant::now();
217 let mut last_scorer_persist_call = Instant::now();
// Selects between the short first-prune delay and the regular prune interval below.
218 let mut have_pruned = false;
221 $process_channel_manager_events;
222 $process_chain_monitor_events;
224 // Note that the PeerManager::process_events may block on ChannelManager's locks,
225 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
226 // we want to ensure we get into `persist_manager` as quickly as we can, especially
227 // without running the normal event processing above and handing events to users.
229 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
230 // processing a message effectively at any point during this loop. In order to
231 // minimize the time between such processing completing and persisting the updated
232 // ChannelManager, we want to minimize methods blocking on a ChannelManager
233 // generally, and as a fallback place such blocking only immediately before
235 $peer_manager.process_events();
237 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
238 // see `await_start`'s use below.
239 let await_start = Instant::now();
240 let updates_available = $await;
241 let await_time = await_start.elapsed();
// A failed manager persist aborts the whole loop via `?`.
243 if updates_available {
244 log_trace!($logger, "Persisting ChannelManager...");
245 $persister.persist_manager(&*$channel_manager)?;
246 log_trace!($logger, "Done persisting ChannelManager.");
248 // Exit the loop if the background processor was requested to stop.
249 if $loop_exit_check {
250 log_trace!($logger, "Terminating background processor.");
// Timers compare whole elapsed seconds with `>`, so a task fires only once strictly more
// than its interval has passed.
253 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
254 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
255 $channel_manager.timer_tick_occurred();
256 last_freshness_call = Instant::now();
258 if await_time > Duration::from_secs(1) {
259 // On various platforms, we may be starved of CPU cycles for several reasons.
260 // E.g. on iOS, if we've been in the background, we will be entirely paused.
261 // Similarly, if we're on a desktop platform and the device has been asleep, we
262 // may not get any cycles.
263 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
264 // full second, at which point we assume sockets may have been killed (they
265 // appear to be at least on some platforms, even if it has only been a second).
266 // Note that we have to take care to not get here just because user event
267 // processing was slow at the top of the loop. For example, the sample client
268 // may call Bitcoin Core RPCs during event handling, which very often takes
269 // more than a handful of seconds to complete, and shouldn't disconnect all our
271 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
272 $peer_manager.disconnect_all_peers();
// Disconnecting also restarts the ping timer.
273 last_ping_call = Instant::now();
274 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
275 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
276 $peer_manager.timer_tick_occurred();
277 last_ping_call = Instant::now();
280 // Note that we want to run a graph prune once not long after startup before
281 // falling back to our usual hourly prunes. This avoids short-lived clients never
282 // pruning their network graph. We run once 60 seconds after startup before
283 // continuing our normal cadence.
284 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
285 // The network graph must not be pruned while rapid sync completion is pending
286 log_trace!($logger, "Assessing prunability of network graph");
287 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
288 network_graph.remove_stale_channels_and_tracking();
// A failed graph persist is only logged; it does not stop the loop.
290 if let Err(e) = $persister.persist_graph(network_graph) {
291 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
294 last_prune_call = Instant::now();
297 log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
301 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
302 if let Some(ref scorer) = $scorer {
303 log_trace!($logger, "Persisting scorer");
// A failed periodic scorer persist is only logged as well.
304 if let Err(e) = $persister.persist_scorer(&scorer) {
305 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
308 last_scorer_persist_call = Instant::now();
312 // After we exit, ensure we persist the ChannelManager one final time - this avoids
313 // some races where users quit while channel updates were in-flight, with
314 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
315 $persister.persist_manager(&*$channel_manager)?;
// Unlike the periodic attempts above, the exit-time scorer and graph persists propagate
// their errors with `?`.
317 // Persist Scorer on exit
318 if let Some(ref scorer) = $scorer {
319 $persister.persist_scorer(&scorer)?;
322 // Persist NetworkGraph on exit
323 if let Some(network_graph) = $gossip_sync.network_graph() {
324 $persister.persist_graph(network_graph)?;
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// A pending `ChannelManager` persistence request on its own never terminates the loop; only
/// a `true` produced by `sleeper` does.
///
/// # Errors
///
/// Returns the error produced by persisting the `ChannelManager` inside the loop, or by the
/// final manager, scorer or network graph persistence performed on exit.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
{
	// Must start out `false`: the exit check runs after every wait, including waits that end
	// because the ChannelManager asked to be persisted. Starting at `true` made the loop
	// terminate on the first such wake-up, before `sleeper` had ever requested an exit.
	let mut should_break = false;
	// Wrap the user's handler so that network updates from failed payments are applied to the
	// network graph (when there is one) before the event is handed over.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Biased so that a pending persistence request wins over the sleep timer.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					// Only the sleeper's output decides whether the loop should stop.
					should_break = exit;
					false
				}
			}
		})
}
412 impl BackgroundProcessor {
413 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
416 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
417 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
418 /// either [`join`] or [`stop`].
420 /// # Data Persistence
422 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
423 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
424 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
425 /// provided implementation.
427 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
428 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
429 /// See the `lightning-persister` crate for LDK's provided implementation.
431 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
432 /// error or call [`join`] and handle any error that may arise. For the latter case,
433 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
437 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
438 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
439 /// functionality implemented by other handlers.
440 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
442 /// # Rapid Gossip Sync
444 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
445 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
446 /// until the [`RapidGossipSync`] instance completes its first sync.
448 /// [top-level documentation]: BackgroundProcessor
449 /// [`join`]: Self::join
450 /// [`stop`]: Self::stop
451 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
452 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
453 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
454 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
455 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
456 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
// Every argument is moved into the spawned thread, hence the `'static + Send` bounds below.
459 CA: 'static + Deref + Send + Sync,
460 CF: 'static + Deref + Send + Sync,
461 CW: 'static + Deref + Send + Sync,
462 T: 'static + Deref + Send + Sync,
463 K: 'static + Deref + Send + Sync,
464 F: 'static + Deref + Send + Sync,
465 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
466 L: 'static + Deref + Send + Sync,
467 P: 'static + Deref + Send + Sync,
468 Descriptor: 'static + SocketDescriptor + Send + Sync,
469 CMH: 'static + Deref + Send + Sync,
470 OMH: 'static + Deref + Send + Sync,
471 RMH: 'static + Deref + Send + Sync,
472 EH: 'static + EventHandler + Send,
473 PS: 'static + Deref + Send,
474 M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
475 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
476 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
477 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
478 UMH: 'static + Deref + Send + Sync,
479 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
480 S: 'static + Deref<Target = SC> + Send + Sync,
481 SC: WriteableScore<'a>,
483 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
484 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
487 CA::Target: 'static + chain::Access,
488 CF::Target: 'static + chain::Filter,
489 CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
490 T::Target: 'static + BroadcasterInterface,
491 K::Target: 'static + KeysInterface,
492 F::Target: 'static + FeeEstimator,
493 L::Target: 'static + Logger,
494 P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
495 CMH::Target: 'static + ChannelMessageHandler,
496 OMH::Target: 'static + OnionMessageHandler,
497 RMH::Target: 'static + RoutingMessageHandler,
498 UMH::Target: 'static + CustomMessageHandler,
499 PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
// The stop flag is shared: the original moves into the thread closure, the clone is kept in
// the returned `BackgroundProcessor`.
501 let stop_thread = Arc::new(AtomicBool::new(false));
502 let stop_thread_clone = stop_thread.clone();
503 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
// Decorate the user's handler: apply payment-failure network updates to the graph (when a
// graph exists) before passing the event on.
504 let event_handler = |event| {
505 let network_graph = gossip_sync.network_graph();
506 if let Some(network_graph) = network_graph {
507 handle_network_graph_update(network_graph, &event)
509 event_handler.handle_event(event);
// Exit check: the shared stop flag. Wait step: block up to 100ms for a persistable update.
511 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
512 channel_manager, channel_manager.process_pending_events(&event_handler),
513 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
514 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
516 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
519 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
520 /// [`ChannelManager`].
524 /// This function panics if the background thread has panicked such as while persisting or
527 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
// Consumes `self`; the assertion checks the thread has not already been joined.
528 pub fn join(mut self) -> Result<(), std::io::Error> {
529 assert!(self.thread_handle.is_some());
533 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
534 /// [`ChannelManager`].
538 /// This function panics if the background thread has panicked such as while persisting or
541 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
// Consumes `self`, sets the stop flag and then joins the thread.
542 pub fn stop(mut self) -> Result<(), std::io::Error> {
543 assert!(self.thread_handle.is_some());
544 self.stop_and_join_thread()
// Signals the thread to stop; the `Release` store pairs with the `Acquire` load in the loop.
547 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
548 self.stop_thread.store(true, Ordering::Release);
// Takes the handle out of `self` so a later call finds `None`; `unwrap` re-raises a panic
// from the background thread.
552 fn join_thread(&mut self) -> Result<(), std::io::Error> {
553 match self.thread_handle.take() {
554 Some(handle) => handle.join().unwrap(),
// Dropping the processor sets the stop flag and joins the background thread.
560 impl Drop for BackgroundProcessor {
// NOTE: `unwrap` turns an error returned by the thread (e.g. a failed persist) into a panic
// during drop.
562 self.stop_and_join_thread().unwrap();
568 use bitcoin::blockdata::block::BlockHeader;
569 use bitcoin::blockdata::constants::genesis_block;
570 use bitcoin::blockdata::locktime::PackedLockTime;
571 use bitcoin::blockdata::transaction::{Transaction, TxOut};
572 use bitcoin::network::constants::Network;
573 use lightning::chain::{BestBlock, Confirm, chainmonitor};
574 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
575 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
576 use lightning::chain::transaction::OutPoint;
577 use lightning::get_event_msg;
578 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
579 use lightning::ln::features::ChannelFeatures;
580 use lightning::ln::msgs::{ChannelMessageHandler, Init};
581 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
582 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
583 use lightning::util::config::UserConfig;
584 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
585 use lightning::util::ser::Writeable;
586 use lightning::util::test_utils;
587 use lightning::util::persist::KVStorePersister;
588 use lightning_invoice::payment::{InvoicePayer, Retry};
589 use lightning_invoice::utils::DefaultRouter;
590 use lightning_persister::FilesystemPersister;
592 use std::path::PathBuf;
593 use std::sync::{Arc, Mutex};
594 use std::sync::mpsc::SyncSender;
595 use std::time::Duration;
596 use bitcoin::hashes::Hash;
597 use bitcoin::TxMerkleNode;
598 use lightning::routing::scoring::{FixedPenaltyScorer};
599 use lightning_rapid_gossip_sync::RapidGossipSync;
600 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
// Five freshness-timer periods; presumably the number of seconds a test waits for an
// expected event before giving up (TODO confirm against its use further down the file).
602 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
// Stateless socket descriptor stand-in used to instantiate `PeerManager` in tests.
604 #[derive(Clone, Hash, PartialEq, Eq)]
605 struct TestDescriptor{}
606 impl SocketDescriptor for TestDescriptor {
// Both parameters are ignored (underscore-prefixed).
607 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
// Nothing to tear down.
611 fn disconnect_socket(&mut self) {}
// Concrete chain monitor used by the test nodes: test chain source, broadcaster, fee
// estimator and logger, persisted through `FilesystemPersister`.
614 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
// Shorthands for the two gossip-sync pointer types used by the test nodes.
616 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
617 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// Fields of the per-node test fixture; `create_nodes` fills in every one of them.
// The channel manager for this node.
620 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
// Both gossip-sync flavours are built over the same `network_graph`.
621 p2p_gossip_sync: PGS,
622 rapid_gossip_sync: RGS,
623 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
624 chain_monitor: Arc<ChainMonitor>,
// Persister handed to the chain monitor; its data directory is removed when the node drops.
625 persister: Arc<FilesystemPersister>,
626 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
627 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
628 logger: Arc<test_utils::TestLogger>,
// Tip of the simulated chain; advanced by `confirm_transaction_depth`.
629 best_block: BestBlock,
630 scorer: Arc<Mutex<FixedPenaltyScorer>>,
// Helpers that wrap this node's gossip-sync handles in the `GossipSync` enum with the
// concrete test types, one helper per variant.
// Clones the `Arc` to the P2P gossip sync.
634 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
635 GossipSync::P2P(self.p2p_gossip_sync.clone())
// Clones the `Arc` to the rapid gossip sync.
638 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
639 GossipSync::Rapid(self.rapid_gossip_sync.clone())
// Same return type as the two helpers above, for the no-gossip-sync case.
642 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
// Best-effort cleanup of the node's persister directory; a failure is printed, not raised.
649 let data_dir = self.persister.get_data_dir();
650 match fs::remove_dir_all(data_dir.clone()) {
651 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Fields of the test persister, which can be configured to fail for specific keys.
// When set, persisting the "network_graph" key returns this error.
658 graph_error: Option<(std::io::ErrorKind, &'static str)>,
// When set, a unit message is sent on this channel each time the graph is persisted.
659 graph_persistence_notifier: Option<SyncSender<()>>,
// When set, persisting the "manager" key returns this error.
660 manager_error: Option<(std::io::ErrorKind, &'static str)>,
// When set, scorer persistence returns this error.
661 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
// Real persister that handles every write for which no error is injected.
662 filesystem_persister: FilesystemPersister,
// Creates a persister writing under `data_dir` with no injected errors and no notifier.
666 fn new(data_dir: String) -> Self {
667 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
668 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
// Builder-style setters: each consumes `self` and returns it with one field replaced.
671 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
672 Self { graph_error: Some((error, message)), ..self }
675 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
676 Self { graph_persistence_notifier: Some(sender), ..self }
679 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
680 Self { manager_error: Some((error, message)), ..self }
683 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
684 Self { scorer_error: Some((error, message)), ..self }
// Routes every write through the injected-failure checks before delegating to the
// filesystem persister.
688 impl KVStorePersister for Persister {
689 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
// Injected failure for the channel manager.
690 if key == "manager" {
691 if let Some((error, message)) = self.manager_error {
692 return Err(std::io::Error::new(error, message))
// For the network graph the notifier fires first, so observers are told about the attempt
// even when an error is injected afterwards.
696 if key == "network_graph" {
697 if let Some(sender) = &self.graph_persistence_notifier {
698 sender.send(()).unwrap();
701 if let Some((error, message)) = self.graph_error {
702 return Err(std::io::Error::new(error, message))
// Injected failure for the scorer; presumably guarded by a key check on a line not shown
// here (TODO confirm).
707 if let Some((error, message)) = self.scorer_error {
708 return Err(std::io::Error::new(error, message))
// No injected failure applied: perform the real write.
712 self.filesystem_persister.persist(key, object)
/// Joins `filepath` and `filename` into one path string using the platform's separator.
///
/// Both inputs are `String`s, so the joined path is always valid UTF-8 and the conversion back
/// to a string cannot fail.
fn get_full_filepath(filepath: String, filename: String) -> String {
	let mut path = PathBuf::from(filepath);
	// Append the file name; without this step the directory alone was returned and
	// `filename` was silently ignored.
	path.push(filename);
	path.to_str().unwrap().to_string()
}
// Builds `num_nodes` fully wired test nodes and then marks every pair of them as connected
// peers. Each node persists under "<persist_dir>_persister_<index>".
722 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
723 let mut nodes = Vec::new();
724 for i in 0..num_nodes {
725 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
726 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
727 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
728 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
729 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
// The seed is the node index repeated, so every node gets distinct, deterministic keys.
730 let seed = [i as u8; 32];
731 let network = Network::Testnet;
732 let genesis_block = genesis_block(network);
// The genesis block timestamp stands in for the current time when creating the keys manager.
733 let now = Duration::from_secs(genesis_block.header.time as u64);
734 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
735 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
736 let best_block = BestBlock::from_genesis(network);
737 let params = ChainParameters { network, best_block };
738 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
// Both gossip syncs share the one network graph created here.
739 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
740 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
741 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
742 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
743 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
744 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
745 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Connect each unordered pair once, notifying both sides of the connection.
749 for i in 0..num_nodes {
750 for j in (i+1)..num_nodes {
751 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
752 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
// Runs the whole channel-open handshake between two test nodes by chaining the three
// helper macros: begin, funding-generation handling, and end.
759 macro_rules! open_channel {
760 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
761 begin_open_channel!($node_a, $node_b, $channel_value);
// Exactly one pending event is expected on node A at this point.
762 let events = $node_a.node.get_and_clear_pending_events();
763 assert_eq!(events.len(), 1);
764 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
765 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of the handshake: node A creates the channel, then the open_channel and
// accept_channel messages are delivered by hand between the two nodes.
770 macro_rules! begin_open_channel {
771 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
// The literal 42 passed here is the user channel id later asserted by
// `handle_funding_generation_ready!`.
772 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
773 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
774 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Checks a `FundingGenerationReady` event and builds a funding transaction for it.
// Evaluates to `(temporary_channel_id, tx)`; any other event panics.
778 macro_rules! handle_funding_generation_ready {
779 ($event: expr, $channel_value: expr) => {{
781 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
782 assert_eq!(channel_value_satoshis, $channel_value);
783 assert_eq!(user_channel_id, 42);
// Funding transaction with no inputs and an output paying the channel value to the
// requested script.
785 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
786 value: channel_value_satoshis, script_pubkey: output_script.clone(),
788 (temporary_channel_id, tx)
790 _ => panic!("Unexpected event"),
// Completes the funding handshake: gives a clone of `$tx` to `$node_a` via
// `funding_transaction_generated`, then hands `SendFundingCreated` to `$node_b` and the
// `SendFundingSigned` reply back to `$node_a`.
795 macro_rules! end_open_channel {
796 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
797 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
798 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
799 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Advances `node`'s view of the chain: builds a header on top of `node.best_block` at the next
// height, records it as the new best block, and notifies both `node.node` and
// `node.chain_monitor` about the confirmed `tx` and about the new tip.
// NOTE(review): presumably `depth` is the number of blocks mined on top of `tx` — confirm
// against the loop bounds.
803 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
805 let prev_blockhash = node.best_block.block_hash();
806 let height = node.best_block.height() + 1;
// Dummy header: only `prev_blockhash` links it to the previous tip; `time` reuses the height.
807 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
// `tx` is reported at index 0 within the block.
808 let txdata = vec![(0, tx)];
809 node.best_block = BestBlock::new(header.block_hash(), height);
812 node.node.transactions_confirmed(&header, &txdata, height);
813 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
816 node.node.best_block_updated(&header, height);
817 node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` on `node`, using `ANTI_REORG_DELAY` as the confirmation depth.
823 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
824 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
828 fn test_background_processor() {
829 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
830 // updates. Also test that when new updates are available, the manager signals that it needs
831 // re-persistence and is successfully re-persisted.
832 let nodes = create_nodes(2, "test_background_processor".to_string());
834 // Go through the channel creation process so that each node has something to persist. Since
835 // open_channel consumes events, it must complete before starting BackgroundProcessor to
836 // avoid a race with processing events.
837 let tx = open_channel!(nodes[0], nodes[1], 100000);
839 // Start a background processor watching nodes[0], persisting into that node's data dir.
840 let data_dir = nodes[0].persister.get_data_dir();
841 let persister = Arc::new(Persister::new(data_dir));
842 let event_handler = |_: _| {};
843 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes `$node` via `write` into `expected_bytes` and compares that with the bytes read
// from `$filepath`.
// NOTE(review): presumably retried until the on-disk bytes match — confirm against the
// surrounding loop.
845 macro_rules! check_persisted_data {
846 ($node: expr, $filepath: expr) => {
847 let mut expected_bytes = Vec::new();
849 expected_bytes.clear();
850 match $node.write(&mut expected_bytes) {
852 match std::fs::read($filepath) {
854 if bytes == expected_bytes {
863 Err(e) => panic!("Unexpected error: {}", e)
869 // Check that the initial channel manager data is persisted as expected.
870 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
871 check_persisted_data!(nodes[0].node, filepath.clone());
// NOTE(review): presumably waits for the manager's persistence flag to clear — confirm.
874 if !nodes[0].node.get_persistence_condvar_value() { break }
877 // Force-close the channel.
878 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
880 // Check that the force-close updates are persisted.
881 check_persisted_data!(nodes[0].node, filepath.clone());
// NOTE(review): presumably waits for the manager's persistence flag to clear — confirm.
883 if !nodes[0].node.get_persistence_condvar_value() { break }
886 // Check network graph is persisted
887 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
888 check_persisted_data!(nodes[0].network_graph, filepath.clone());
890 // Check scorer is persisted
891 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
892 check_persisted_data!(nodes[0].scorer, filepath.clone());
894 assert!(bg_processor.stop().is_ok());
898 fn test_timer_tick_called() {
899 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
900 // `FRESHNESS_TIMER`.
901 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
902 let data_dir = nodes[0].persister.get_data_dir();
903 let persister = Arc::new(Persister::new(data_dir));
904 let event_handler = |_: _| {};
905 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The ticks are observed through the test logger: both messages must have been logged under
// the `lightning_background_processor` module.
907 let log_entries = nodes[0].logger.lines.lock().unwrap();
908 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
909 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
910 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
911 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
916 assert!(bg_processor.stop().is_ok());
920 fn test_channel_manager_persist_error() {
921 // Test that if we encounter an error during manager persistence, joining the background
// processor returns that error.
922 let nodes = create_nodes(2, "test_persist_error".to_string());
923 open_channel!(nodes[0], nodes[1], 100000);
925 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured to fail manager persistence with `ErrorKind::Other` / "test".
926 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
927 let event_handler = |_: _| {};
928 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` (rather than `stop`) is used here; it must yield the injected error unchanged.
929 match bg_processor.join() {
930 Ok(_) => panic!("Expected error persisting manager"),
932 assert_eq!(e.kind(), std::io::ErrorKind::Other);
933 assert_eq!(e.get_ref().unwrap().to_string(), "test");
939 fn test_network_graph_persist_error() {
940 // Test that if we encounter an error during network graph persistence, an error gets returned.
941 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
942 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured to fail graph persistence with `ErrorKind::Other` / "test".
943 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
944 let event_handler = |_: _| {};
945 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error unchanged.
947 match bg_processor.stop() {
948 Ok(_) => panic!("Expected error persisting network graph"),
950 assert_eq!(e.kind(), std::io::ErrorKind::Other);
951 assert_eq!(e.get_ref().unwrap().to_string(), "test");
957 fn test_scorer_persist_error() {
958 // Test that if we encounter an error during scorer persistence, an error gets returned.
959 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
960 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured to fail scorer persistence with `ErrorKind::Other` / "test".
961 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
962 let event_handler = |_: _| {};
963 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error unchanged.
965 match bg_processor.stop() {
966 Ok(_) => panic!("Expected error persisting scorer"),
968 assert_eq!(e.kind(), std::io::ErrorKind::Other);
969 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Tests that events are delivered to the handler given to `BackgroundProcessor::start`: first
// a `FundingGenerationReady` while opening a channel, then, with a second processor and handler,
// a `SpendableOutputs` after force-closing that channel.
975 fn test_background_event_handling() {
976 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
977 let channel_value = 100000;
978 let data_dir = nodes[0].persister.get_data_dir();
979 let persister = Arc::new(Persister::new(data_dir.clone()));
981 // Set up a background event handler for FundingGenerationReady events.
// The handler forwards `(temporary_channel_id, tx)` to this test through the channel.
982 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
983 let event_handler = move |event: Event| match event {
984 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
985 Event::ChannelReady { .. } => {},
986 _ => panic!("Unexpected event: {:?}", event),
989 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
991 // Open a channel and check that the FundingGenerationReady event was handled.
992 begin_open_channel!(nodes[0], nodes[1], channel_value);
993 let (temporary_channel_id, funding_tx) = receiver
994 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
995 .expect("FundingGenerationReady not handled within deadline");
996 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
998 // Confirm the funding transaction.
999 confirm_transaction(&mut nodes[0], &funding_tx);
1000 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1001 confirm_transaction(&mut nodes[1], &funding_tx);
1002 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange the `channel_ready` messages; the resulting channel updates are fetched but unused.
1003 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1004 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1005 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1006 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1008 assert!(bg_processor.stop().is_ok());
1010 // Set up a background event handler for SpendableOutputs events.
1011 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1012 let event_handler = move |event: Event| match event {
1013 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1014 Event::ChannelReady { .. } => {},
1015 Event::ChannelClosed { .. } => {},
1016 _ => panic!("Unexpected event: {:?}", event),
1018 let persister = Arc::new(Persister::new(data_dir));
1019 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1021 // Force close the channel and check that the SpendableOutputs event was handled.
1022 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the most recently broadcast transaction and confirm it with `BREAKDOWN_TIMEOUT` as depth.
// NOTE(review): presumably that depth is what makes the outputs spendable — confirm.
1023 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1024 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1026 let event = receiver
1027 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1028 .expect("Events not handled within deadline");
1030 Event::SpendableOutputs { .. } => {},
1031 _ => panic!("Unexpected event: {:?}", event),
1034 assert!(bg_processor.stop().is_ok());
// Tests that a background processor started with a scorer logs "Persisting scorer".
1038 fn test_scorer_persistence() {
1039 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1040 let data_dir = nodes[0].persister.get_data_dir();
1041 let persister = Arc::new(Persister::new(data_dir));
1042 let event_handler = |_: _| {};
1043 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Persistence is observed through the test logger rather than by reading the file back.
1046 let log_entries = nodes[0].logger.lines.lock().unwrap();
1047 let expected_log = "Persisting scorer".to_string();
1048 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1053 assert!(bg_processor.stop().is_ok());
// Tests that a processor started with a rapid gossip sync logs that it is not pruning the
// network graph, and that after `update_network_graph` has been applied and the processor
// stopped, the graph holds no channels. The wait between those steps is bounded by five times
// `FIRST_NETWORK_PRUNE_TIMER` seconds.
1057 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1058 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1059 let data_dir = nodes[0].persister.get_data_dir();
// The persister is given the sending half of a channel as its graph persistence notifier.
1060 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1061 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1062 let network_graph = nodes[0].network_graph.clone();
1063 let features = ChannelFeatures::empty();
// Seed the graph with a single channel (id 42) between the two test nodes.
1064 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1065 .expect("Failed to update channel from partial announcement");
1066 let original_graph_description = network_graph.to_string();
1067 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1068 assert_eq!(network_graph.read_only().channels().len(), 1);
1070 let event_handler = |_: _| {};
1071 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Both log lines must be present: the graph was assessed, and pruning was declined.
1074 let log_entries = nodes[0].logger.lines.lock().unwrap();
1075 let expected_log_a = "Assessing prunability of network graph".to_string();
1076 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1077 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1078 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Raw input for `RapidGossipSync::update_network_graph`; its first three bytes are ASCII "LDK".
1083 let initialization_input = vec![
1084 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1085 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1086 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1087 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1088 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1089 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1090 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1091 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1092 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1093 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1094 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1095 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1096 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1098 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1100 // this should have added two channels
1101 assert_eq!(network_graph.read_only().channels().len(), 3);
1104 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1105 .expect("Network graph not pruned within deadline");
1107 background_processor.stop().unwrap();
1109 // all channels should now be pruned
1110 assert_eq!(network_graph.read_only().channels().len(), 0);
// Tests that an `Arc<InvoicePayer>` can be passed to `BackgroundProcessor::start` as the event
// handler, and that the processor then stops cleanly.
1114 fn test_invoice_payer() {
1115 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1116 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1117 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1119 // Start a background processor for nodes[0], using an InvoicePayer as its event handler.
1120 let data_dir = nodes[0].persister.get_data_dir();
1121 let persister = Arc::new(Persister::new(data_dir));
1122 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
// The payer wraps a no-op inner event handler and is set up with `Retry::Attempts(2)`.
1123 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1124 let event_handler = Arc::clone(&invoice_payer);
1125 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1126 assert!(bg_processor.stop().is_ok());