1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 /// writing it to disk/backups by invoking the callback given to it at startup.
46 /// [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 /// at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be relied
/// upon as doing so may result in high latency.
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
64 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
65 pub struct BackgroundProcessor {
66 stop_thread: Arc<AtomicBool>,
67 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
71 const FRESHNESS_TIMER: u64 = 60;
73 const FRESHNESS_TIMER: u64 = 1;
75 #[cfg(all(not(test), not(debug_assertions)))]
76 const PING_TIMER: u64 = 10;
77 /// Signature operations take a lot longer without compiler optimisations.
78 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
79 /// timeout is reached.
80 #[cfg(all(not(test), debug_assertions))]
81 const PING_TIMER: u64 = 30;
83 const PING_TIMER: u64 = 1;
85 /// Prune the network graph of stale entries hourly.
86 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
89 const SCORER_PERSIST_TIMER: u64 = 30;
91 const SCORER_PERSIST_TIMER: u64 = 1;
94 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
96 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
100 P: Deref<Target = P2PGossipSync<G, A, L>>,
101 R: Deref<Target = RapidGossipSync<G, L>>,
102 G: Deref<Target = NetworkGraph<L>>,
106 where A::Target: chain::Access, L::Target: Logger {
107 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
109 /// Rapid gossip sync from a trusted server.
116 P: Deref<Target = P2PGossipSync<G, A, L>>,
117 R: Deref<Target = RapidGossipSync<G, L>>,
118 G: Deref<Target = NetworkGraph<L>>,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
/// Returns the [`NetworkGraph`] backing this gossip sync, if one exists.
fn network_graph(&self) -> Option<&G> {
// Both the P2P and Rapid variants carry a graph; only `None` lacks one.
GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
GossipSync::None => None,
/// Returns the [`NetworkGraph`] only if it is currently safe to prune, i.e. never while a
/// rapid gossip sync's initial download is still in flight (pruning mid-sync could drop
/// channels the sync is about to announce).
fn prunable_network_graph(&self) -> Option<&G> {
// P2P sync is incremental, so its graph is always prunable.
GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
GossipSync::Rapid(gossip_sync) => {
// Rapid sync: only expose the graph once the initial snapshot has been applied.
if gossip_sync.is_initial_sync_complete() {
Some(gossip_sync.network_graph())
GossipSync::None => None,
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 A::Target: chain::Access,
153 /// Initializes a new [`GossipSync::P2P`] variant.
154 pub fn p2p(gossip_sync: P) -> Self {
155 GossipSync::P2P(gossip_sync)
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
165 &'a (dyn chain::Access + Send + Sync),
171 /// Initializes a new [`GossipSync::Rapid`] variant.
172 pub fn rapid(gossip_sync: R) -> Self {
173 GossipSync::Rapid(gossip_sync)
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
180 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183 &'a (dyn chain::Access + Send + Sync),
189 /// Initializes a new [`GossipSync::None`] variant.
190 pub fn none() -> Self {
/// Applies any [`NetworkUpdate`] carried by a failed-payment event to the network graph.
///
/// Only [`Event::PaymentPathFailed`] events are inspected; all other events are ignored.
fn handle_network_graph_update<L: Deref>(
network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
if let Event::PaymentPathFailed { ref network_update, .. } = event {
// `network_update` is optional: a path failure need not implicate a channel/node.
if let Some(network_update) = network_update {
network_graph.handle_network_update(&network_update);
205 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
206 struct DecoratingEventHandler<
209 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
210 RGS: Deref<Target = RapidGossipSync<G, L>>,
211 G: Deref<Target = NetworkGraph<L>>,
215 where A::Target: chain::Access, L::Target: Logger {
217 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
223 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
224 RGS: Deref<Target = RapidGossipSync<G, L>>,
225 G: Deref<Target = NetworkGraph<L>>,
228 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
229 where A::Target: chain::Access, L::Target: Logger {
fn handle_event(&self, event: &Event) {
// First let the gossip sync's graph (if any) learn from payment failures, ...
if let Some(network_graph) = self.gossip_sync.network_graph() {
handle_network_graph_update(network_graph, &event)
// ... then forward the event to the user-provided handler.
self.event_handler.handle_event(event);
238 macro_rules! define_run_body {
239 ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
240 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
241 $loop_exit_check: expr, $await: expr)
243 let event_handler = DecoratingEventHandler {
244 event_handler: $event_handler,
245 gossip_sync: &$gossip_sync,
248 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
249 $channel_manager.timer_tick_occurred();
// Timestamps of the last time each periodic task ran, used below to decide when
// FRESHNESS_TIMER / PING_TIMER / *_PRUNE_TIMER / SCORER_PERSIST_TIMER have elapsed.
let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
// Whether the first (early, FIRST_NETWORK_PRUNE_TIMER) graph prune has happened yet;
// after it has, pruning falls back to the slower NETWORK_PRUNE_TIMER cadence.
let mut have_pruned = false;
258 $channel_manager.process_pending_events(&event_handler);
259 $chain_monitor.process_pending_events(&event_handler);
261 // Note that the PeerManager::process_events may block on ChannelManager's locks,
262 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
263 // we want to ensure we get into `persist_manager` as quickly as we can, especially
264 // without running the normal event processing above and handing events to users.
266 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
267 // processing a message effectively at any point during this loop. In order to
268 // minimize the time between such processing completing and persisting the updated
269 // ChannelManager, we want to minimize methods blocking on a ChannelManager
270 // generally, and as a fallback place such blocking only immediately before
272 $peer_manager.process_events();
274 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
275 // see `await_start`'s use below.
276 let await_start = Instant::now();
277 let updates_available = $await;
278 let await_time = await_start.elapsed();
280 if updates_available {
281 log_trace!($logger, "Persisting ChannelManager...");
282 $persister.persist_manager(&*$channel_manager)?;
283 log_trace!($logger, "Done persisting ChannelManager.");
285 // Exit the loop if the background processor was requested to stop.
286 if $loop_exit_check {
287 log_trace!($logger, "Terminating background processor.");
290 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
291 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
292 $channel_manager.timer_tick_occurred();
293 last_freshness_call = Instant::now();
295 if await_time > Duration::from_secs(1) {
296 // On various platforms, we may be starved of CPU cycles for several reasons.
297 // E.g. on iOS, if we've been in the background, we will be entirely paused.
298 // Similarly, if we're on a desktop platform and the device has been asleep, we
299 // may not get any cycles.
300 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
301 // full second, at which point we assume sockets may have been killed (they
302 // appear to be at least on some platforms, even if it has only been a second).
303 // Note that we have to take care to not get here just because user event
304 // processing was slow at the top of the loop. For example, the sample client
305 // may call Bitcoin Core RPCs during event handling, which very often takes
306 // more than a handful of seconds to complete, and shouldn't disconnect all our
308 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
309 $peer_manager.disconnect_all_peers();
310 last_ping_call = Instant::now();
311 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
312 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
313 $peer_manager.timer_tick_occurred();
314 last_ping_call = Instant::now();
317 // Note that we want to run a graph prune once not long after startup before
318 // falling back to our usual hourly prunes. This avoids short-lived clients never
319 // pruning their network graph. We run once 60 seconds after startup before
320 // continuing our normal cadence.
321 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
322 // The network graph must not be pruned while rapid sync completion is pending
323 log_trace!($logger, "Assessing prunability of network graph");
324 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
325 network_graph.remove_stale_channels_and_tracking();
327 if let Err(e) = $persister.persist_graph(network_graph) {
328 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
331 last_prune_call = Instant::now();
334 log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
338 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
339 if let Some(ref scorer) = $scorer {
340 log_trace!($logger, "Persisting scorer");
341 if let Err(e) = $persister.persist_scorer(&scorer) {
342 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
345 last_scorer_persist_call = Instant::now();
349 // After we exit, ensure we persist the ChannelManager one final time - this avoids
350 // some races where users quit while channel updates were in-flight, with
351 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
352 $persister.persist_manager(&*$channel_manager)?;
354 // Persist Scorer on exit
355 if let Some(ref scorer) = $scorer {
356 $persister.persist_scorer(&scorer)?;
359 // Persist NetworkGraph on exit
360 if let Some(network_graph) = $gossip_sync.network_graph() {
361 $persister.persist_graph(network_graph)?;
368 /// Processes background events in a future.
370 /// `sleeper` should return a future which completes in the given amount of time and returns a
371 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
372 /// future which outputs true, the loop will exit and this function's future will complete.
374 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
375 #[cfg(feature = "futures")]
376 pub async fn process_events_async<
378 Signer: 'static + Sign,
379 CA: 'static + Deref + Send + Sync,
380 CF: 'static + Deref + Send + Sync,
381 CW: 'static + Deref + Send + Sync,
382 T: 'static + Deref + Send + Sync,
383 K: 'static + Deref + Send + Sync,
384 F: 'static + Deref + Send + Sync,
385 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
386 L: 'static + Deref + Send + Sync,
387 P: 'static + Deref + Send + Sync,
388 Descriptor: 'static + SocketDescriptor + Send + Sync,
389 CMH: 'static + Deref + Send + Sync,
390 RMH: 'static + Deref + Send + Sync,
391 OMH: 'static + Deref + Send + Sync,
392 EH: 'static + EventHandler + Send,
393 PS: 'static + Deref + Send,
394 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
395 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
396 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
397 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
398 UMH: 'static + Deref + Send + Sync,
399 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
400 S: 'static + Deref<Target = SC> + Send + Sync,
401 SC: WriteableScore<'a>,
402 SleepFuture: core::future::Future<Output = bool>,
403 Sleeper: Fn(Duration) -> SleepFuture
405 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
406 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
408 ) -> Result<(), std::io::Error>
410 CA::Target: 'static + chain::Access,
411 CF::Target: 'static + chain::Filter,
412 CW::Target: 'static + chain::Watch<Signer>,
413 T::Target: 'static + BroadcasterInterface,
414 K::Target: 'static + KeysInterface<Signer = Signer>,
415 F::Target: 'static + FeeEstimator,
416 L::Target: 'static + Logger,
417 P::Target: 'static + Persist<Signer>,
418 CMH::Target: 'static + ChannelMessageHandler,
419 OMH::Target: 'static + OnionMessageHandler,
420 RMH::Target: 'static + RoutingMessageHandler,
421 UMH::Target: 'static + CustomMessageHandler,
422 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
424 let mut should_break = true;
425 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
426 gossip_sync, peer_manager, logger, scorer, should_break, {
428 _ = channel_manager.get_persistable_update_future().fuse() => true,
429 exit = sleeper(Duration::from_millis(100)).fuse() => {
437 impl BackgroundProcessor {
438 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
441 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
442 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
443 /// either [`join`] or [`stop`].
445 /// # Data Persistence
447 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
448 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
449 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
450 /// provided implementation.
452 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
453 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
454 /// See the `lightning-persister` crate for LDK's provided implementation.
456 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
457 /// error or call [`join`] and handle any error that may arise. For the latter case,
458 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
462 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
463 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
464 /// functionality implemented by other handlers.
465 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
467 /// # Rapid Gossip Sync
469 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
470 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
471 /// until the [`RapidGossipSync`] instance completes its first sync.
473 /// [top-level documentation]: BackgroundProcessor
474 /// [`join`]: Self::join
475 /// [`stop`]: Self::stop
476 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
477 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
478 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
479 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
480 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
481 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
484 Signer: 'static + Sign,
485 CA: 'static + Deref + Send + Sync,
486 CF: 'static + Deref + Send + Sync,
487 CW: 'static + Deref + Send + Sync,
488 T: 'static + Deref + Send + Sync,
489 K: 'static + Deref + Send + Sync,
490 F: 'static + Deref + Send + Sync,
491 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
492 L: 'static + Deref + Send + Sync,
493 P: 'static + Deref + Send + Sync,
494 Descriptor: 'static + SocketDescriptor + Send + Sync,
495 CMH: 'static + Deref + Send + Sync,
496 OMH: 'static + Deref + Send + Sync,
497 RMH: 'static + Deref + Send + Sync,
498 EH: 'static + EventHandler + Send,
499 PS: 'static + Deref + Send,
500 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
501 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
502 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
503 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
504 UMH: 'static + Deref + Send + Sync,
505 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
506 S: 'static + Deref<Target = SC> + Send + Sync,
507 SC: WriteableScore<'a>,
509 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
510 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
513 CA::Target: 'static + chain::Access,
514 CF::Target: 'static + chain::Filter,
515 CW::Target: 'static + chain::Watch<Signer>,
516 T::Target: 'static + BroadcasterInterface,
517 K::Target: 'static + KeysInterface<Signer = Signer>,
518 F::Target: 'static + FeeEstimator,
519 L::Target: 'static + Logger,
520 P::Target: 'static + Persist<Signer>,
521 CMH::Target: 'static + ChannelMessageHandler,
522 OMH::Target: 'static + OnionMessageHandler,
523 RMH::Target: 'static + RoutingMessageHandler,
524 UMH::Target: 'static + CustomMessageHandler,
525 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
527 let stop_thread = Arc::new(AtomicBool::new(false));
528 let stop_thread_clone = stop_thread.clone();
529 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
530 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
531 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
532 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
534 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
537 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
538 /// [`ChannelManager`].
542 /// This function panics if the background thread has panicked such as while persisting or
545 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
546 pub fn join(mut self) -> Result<(), std::io::Error> {
547 assert!(self.thread_handle.is_some());
551 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
552 /// [`ChannelManager`].
556 /// This function panics if the background thread has panicked such as while persisting or
559 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
560 pub fn stop(mut self) -> Result<(), std::io::Error> {
561 assert!(self.thread_handle.is_some());
562 self.stop_and_join_thread()
fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
// Release pairs with the Acquire load in the background thread's loop-exit check,
// ensuring the thread observes the stop request.
self.stop_thread.store(true, Ordering::Release);
fn join_thread(&mut self) -> Result<(), std::io::Error> {
// `take()` makes joining idempotent: a second call (e.g. from Drop after an
// explicit `stop`) finds `None` and does nothing.
match self.thread_handle.take() {
// `unwrap()` re-raises any panic from the background thread, as documented on
// `join`/`stop`.
Some(handle) => handle.join().unwrap(),
578 impl Drop for BackgroundProcessor {
580 self.stop_and_join_thread().unwrap();
586 use bitcoin::blockdata::block::BlockHeader;
587 use bitcoin::blockdata::constants::genesis_block;
588 use bitcoin::blockdata::locktime::PackedLockTime;
589 use bitcoin::blockdata::transaction::{Transaction, TxOut};
590 use bitcoin::network::constants::Network;
591 use lightning::chain::{BestBlock, Confirm, chainmonitor};
592 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
593 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
594 use lightning::chain::transaction::OutPoint;
595 use lightning::get_event_msg;
596 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
597 use lightning::ln::features::ChannelFeatures;
598 use lightning::ln::msgs::{ChannelMessageHandler, Init};
599 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
600 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
601 use lightning::util::config::UserConfig;
602 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
603 use lightning::util::ser::Writeable;
604 use lightning::util::test_utils;
605 use lightning::util::persist::KVStorePersister;
606 use lightning_invoice::payment::{InvoicePayer, Retry};
607 use lightning_invoice::utils::DefaultRouter;
608 use lightning_persister::FilesystemPersister;
610 use std::path::PathBuf;
611 use std::sync::{Arc, Mutex};
612 use std::sync::mpsc::SyncSender;
613 use std::time::Duration;
614 use bitcoin::hashes::Hash;
615 use bitcoin::TxMerkleNode;
616 use lightning::routing::scoring::{FixedPenaltyScorer};
617 use lightning_rapid_gossip_sync::RapidGossipSync;
618 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
// How long (in seconds) tests wait for an expected event before giving up. Scaled off
// FRESHNESS_TIMER (1s under cfg(test)) so test runs stay fast. NOTE(review): exact usage
// sites are outside this view — confirm the unit (seconds) against the waiting code.
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
622 #[derive(Clone, Hash, PartialEq, Eq)]
623 struct TestDescriptor{}
624 impl SocketDescriptor for TestDescriptor {
625 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
629 fn disconnect_socket(&mut self) {}
632 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
634 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
635 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
638 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
639 p2p_gossip_sync: PGS,
640 rapid_gossip_sync: RGS,
641 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
642 chain_monitor: Arc<ChainMonitor>,
643 persister: Arc<FilesystemPersister>,
644 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
645 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
646 logger: Arc<test_utils::TestLogger>,
647 best_block: BestBlock,
648 scorer: Arc<Mutex<FixedPenaltyScorer>>,
652 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
653 GossipSync::P2P(self.p2p_gossip_sync.clone())
656 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
657 GossipSync::Rapid(self.rapid_gossip_sync.clone())
660 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
667 let data_dir = self.persister.get_data_dir();
668 match fs::remove_dir_all(data_dir.clone()) {
669 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
676 graph_error: Option<(std::io::ErrorKind, &'static str)>,
677 graph_persistence_notifier: Option<SyncSender<()>>,
678 manager_error: Option<(std::io::ErrorKind, &'static str)>,
679 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
680 filesystem_persister: FilesystemPersister,
684 fn new(data_dir: String) -> Self {
685 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
686 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
689 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
690 Self { graph_error: Some((error, message)), ..self }
693 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
694 Self { graph_persistence_notifier: Some(sender), ..self }
697 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
698 Self { manager_error: Some((error, message)), ..self }
701 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
702 Self { scorer_error: Some((error, message)), ..self }
706 impl KVStorePersister for Persister {
707 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
708 if key == "manager" {
709 if let Some((error, message)) = self.manager_error {
710 return Err(std::io::Error::new(error, message))
714 if key == "network_graph" {
715 if let Some(sender) = &self.graph_persistence_notifier {
716 sender.send(()).unwrap();
719 if let Some((error, message)) = self.graph_error {
720 return Err(std::io::Error::new(error, message))
725 if let Some((error, message)) = self.scorer_error {
726 return Err(std::io::Error::new(error, message))
730 self.filesystem_persister.persist(key, object)
734 fn get_full_filepath(filepath: String, filename: String) -> String {
735 let mut path = PathBuf::from(filepath);
737 path.to_str().unwrap().to_string()
740 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
741 let mut nodes = Vec::new();
742 for i in 0..num_nodes {
743 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
744 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
745 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
746 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
747 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
748 let seed = [i as u8; 32];
749 let network = Network::Testnet;
750 let genesis_block = genesis_block(network);
751 let now = Duration::from_secs(genesis_block.header.time as u64);
752 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
753 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
754 let best_block = BestBlock::from_genesis(network);
755 let params = ChainParameters { network, best_block };
756 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
757 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
758 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
759 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
760 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
761 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
762 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
763 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
767 for i in 0..num_nodes {
768 for j in (i+1)..num_nodes {
769 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
770 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
777 macro_rules! open_channel {
778 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
779 begin_open_channel!($node_a, $node_b, $channel_value);
780 let events = $node_a.node.get_and_clear_pending_events();
781 assert_eq!(events.len(), 1);
782 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
783 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
788 macro_rules! begin_open_channel {
789 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
790 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
791 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
792 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Test helper: destructures a FundingGenerationReady event, sanity-checks the
// requested channel value and user_channel_id, and builds a minimal one-output
// funding transaction paying the requested script. Evaluates to
// `(temporary_channel_id, funding_tx)`.
796 macro_rules! handle_funding_generation_ready {
797 ($event: expr, $channel_value: expr) => {{
799 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
800 assert_eq!(channel_value_satoshis, $channel_value);
// 42 is the user_channel_id passed to create_channel in begin_open_channel!.
801 assert_eq!(user_channel_id, 42);
// Fake funding tx: no inputs, single output to the channel's funding script.
// Nothing here validates the tx, so inputs/signatures are unnecessary.
803 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
804 value: channel_value_satoshis, script_pubkey: output_script.clone(),
806 (temporary_channel_id, tx)
808 _ => panic!("Unexpected event"),
// Test helper: completes the funding flow started by `begin_open_channel!` —
// hands the funding tx to the initiator, then relays funding_created /
// funding_signed between the two nodes.
813 macro_rules! end_open_channel {
814 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
815 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
816 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
817 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Mines `tx` into a synthetic block on top of `node`'s current best block and
// notifies both the ChannelManager and the ChainMonitor of the confirmation
// and the new chain tip. `depth` presumably drives connection of additional
// empty blocks until the tx has that many confirmations — NOTE(review): the
// depth-handling lines are not visible in this view; confirm against the
// full file.
821 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
823 let prev_blockhash = node.best_block.block_hash();
824 let height = node.best_block.height() + 1;
// Header fields are dummies (nothing validates PoW here); `time` is set to the
// height so each synthetic header at a new height is distinct.
825 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
// The tx is reported at index 0 within the block's txdata.
826 let txdata = vec![(0, tx)];
827 node.best_block = BestBlock::new(header.block_hash(), height);
// Both the manager and the monitor must see the confirmation and new tip.
830 node.node.transactions_confirmed(&header, &txdata, height);
831 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
834 node.node.best_block_updated(&header, height);
835 node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` to ANTI_REORG_DELAY confirmations, the depth at which
// rust-lightning treats on-chain events as final.
841 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
842 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
846 fn test_background_processor() {
847 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
848 // updates. Also test that when new updates are available, the manager signals that it needs
849 // re-persistence and is successfully re-persisted.
850 let nodes = create_nodes(2, "test_background_processor".to_string());
852 // Go through the channel creation process so that each node has something to persist. Since
853 // open_channel consumes events, it must complete before starting BackgroundProcessor to
854 // avoid a race with processing events.
855 let tx = open_channel!(nodes[0], nodes[1], 100000);
857 // Initiate the background processors to watch each node.
858 let data_dir = nodes[0].persister.get_data_dir();
859 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler — this test only cares about persistence behavior.
860 let event_handler = |_: &_| {};
861 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes `$node` and compares against the bytes on disk at `$filepath`,
// presumably retrying until they match (the retry/loop lines are not visible
// here — NOTE(review): confirm against the full file).
863 macro_rules! check_persisted_data {
864 ($node: expr, $filepath: expr) => {
865 let mut expected_bytes = Vec::new();
867 expected_bytes.clear();
868 match $node.write(&mut expected_bytes) {
870 match std::fs::read($filepath) {
872 if bytes == expected_bytes {
881 Err(e) => panic!("Unexpected error: {}", e)
887 // Check that the initial channel manager data is persisted as expected.
888 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
889 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait until the background thread has consumed the manager's pending-
// persistence flag (presumably inside a loop — surrounding lines not visible).
892 if !nodes[0].node.get_persistence_condvar_value() { break }
895 // Force-close the channel.
896 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
898 // Check that the force-close updates are persisted.
899 check_persisted_data!(nodes[0].node, filepath.clone());
901 if !nodes[0].node.get_persistence_condvar_value() { break }
904 // Check network graph is persisted
905 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
906 check_persisted_data!(nodes[0].network_graph, filepath.clone());
908 // Check scorer is persisted
909 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
910 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Clean shutdown of the background thread must succeed.
912 assert!(bg_processor.stop().is_ok());
916 fn test_timer_tick_called() {
917 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
918 // `FRESHNESS_TIMER`.
919 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
920 let data_dir = nodes[0].persister.get_data_dir();
921 let persister = Arc::new(Persister::new(data_dir));
922 let event_handler = |_: &_| {};
923 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the test logger until both timer-tick log lines have been emitted by
// the background thread (presumably inside a loop — surrounding lines are not
// visible in this view).
925 let log_entries = nodes[0].logger.lines.lock().unwrap();
926 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
927 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
928 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
929 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
934 assert!(bg_processor.stop().is_ok());
938 fn test_channel_manager_persist_error() {
939 // Test that if we encounter an error during manager persistence, the thread panics.
940 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel so the manager has pending persistence work that will hit
// the injected error.
941 open_channel!(nodes[0], nodes[1], 100000);
943 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail every ChannelManager write with an io::Error.
944 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
945 let event_handler = |_: &_| {};
946 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// join() should surface the injected error (kind + message) to the caller.
947 match bg_processor.join() {
948 Ok(_) => panic!("Expected error persisting manager"),
950 assert_eq!(e.kind(), std::io::ErrorKind::Other);
951 assert_eq!(e.get_ref().unwrap().to_string(), "test");
957 fn test_network_graph_persist_error() {
958 // Test that if we encounter an error during network graph persistence, an error gets returned.
959 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
960 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail every network-graph write with an io::Error.
961 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
962 let event_handler = |_: &_| {};
// Use p2p gossip sync so the processor actually persists the network graph.
963 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike a manager persist failure, a graph persist failure is reported via
// stop() rather than tearing the thread down mid-run.
965 match bg_processor.stop() {
966 Ok(_) => panic!("Expected error persisting network graph"),
968 assert_eq!(e.kind(), std::io::ErrorKind::Other);
969 assert_eq!(e.get_ref().unwrap().to_string(), "test");
975 fn test_scorer_persist_error() {
976 // Test that if we encounter an error during scorer persistence, an error gets returned.
977 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
978 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail every scorer write with an io::Error.
979 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
980 let event_handler = |_: &_| {};
981 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The injected error should be surfaced to the caller via stop().
983 match bg_processor.stop() {
984 Ok(_) => panic!("Expected error persisting scorer"),
986 assert_eq!(e.kind(), std::io::ErrorKind::Other);
987 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// End-to-end check that BackgroundProcessor forwards events to the
// user-supplied handler: first a FundingGenerationReady during channel open,
// then a SpendableOutputs after a force-close confirms on-chain. A sync
// channel relays each handled event back to the test thread.
993 fn test_background_event_handling() {
994 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
995 let channel_value = 100000;
996 let data_dir = nodes[0].persister.get_data_dir();
997 let persister = Arc::new(Persister::new(data_dir.clone()));
999 // Set up a background event handler for FundingGenerationReady events.
1000 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1001 let event_handler = move |event: &Event| match event {
// Build the funding tx inside the handler and ship (temp_channel_id, tx)
// back to the test thread for the rest of the open flow.
1002 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1003 Event::ChannelReady { .. } => {},
1004 _ => panic!("Unexpected event: {:?}", event),
1007 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1009 // Open a channel and check that the FundingGenerationReady event was handled.
1010 begin_open_channel!(nodes[0], nodes[1], channel_value);
1011 let (temporary_channel_id, funding_tx) = receiver
1012 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1013 .expect("FundingGenerationReady not handled within deadline");
1014 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1016 // Confirm the funding transaction.
1017 confirm_transaction(&mut nodes[0], &funding_tx);
1018 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1019 confirm_transaction(&mut nodes[1], &funding_tx);
1020 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready both ways; each side then emits a channel_update.
1021 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1022 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1023 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1024 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before installing a new handler below.
1026 assert!(bg_processor.stop().is_ok());
1028 // Set up a background event handler for SpendableOutputs events.
1029 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1030 let event_handler = move |event: &Event| match event {
1031 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1032 Event::ChannelReady { .. } => {},
1033 Event::ChannelClosed { .. } => {},
1034 _ => panic!("Unexpected event: {:?}", event),
1036 let persister = Arc::new(Persister::new(data_dir));
1037 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1039 // Force close the channel and check that the SpendableOutputs event was handled.
1040 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1041 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Confirm the commitment tx deep enough (BREAKDOWN_TIMEOUT) for the
// to_self output to become spendable and generate the event.
1042 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1044 let event = receiver
1045 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1046 .expect("Events not handled within deadline");
1048 Event::SpendableOutputs { .. } => {},
1049 _ => panic!("Unexpected event: {:?}", event),
1052 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer, by
// waiting for the "Persisting scorer" log line to appear.
1056 fn test_scorer_persistence() {
1057 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1058 let data_dir = nodes[0].persister.get_data_dir();
1059 let persister = Arc::new(Persister::new(data_dir));
1060 let event_handler = |_: &_| {};
1061 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the test logger for the expected line (presumably inside a loop —
// surrounding lines are not visible in this view).
1064 let log_entries = nodes[0].logger.lines.lock().unwrap();
1065 let expected_log = "Persisting scorer".to_string();
1066 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1071 assert!(bg_processor.stop().is_ok());
// Verifies that with a RapidGossipSync in play the background processor
// refuses to prune the network graph until the rapid sync completes, and
// prunes (removing all stale channels) once it does.
1075 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1076 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1077 let data_dir = nodes[0].persister.get_data_dir();
// The persister signals over this channel each time it persists the graph,
// letting the test await the post-sync prune.
1078 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1079 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1080 let network_graph = nodes[0].network_graph.clone();
1081 let features = ChannelFeatures::empty();
// Seed the graph with one partially-announced channel (scid 42) that should
// survive until pruning is allowed.
1082 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1083 .expect("Failed to update channel from partial announcement");
1084 let original_graph_description = network_graph.to_string();
1085 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1086 assert_eq!(network_graph.read_only().channels().len(), 1);
1088 let event_handler = |_: &_| {};
1089 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Wait for log evidence that the processor considered pruning but declined
// because rapid gossip sync is still pending (presumably polled in a loop —
// surrounding lines are not visible in this view).
1092 let log_entries = nodes[0].logger.lines.lock().unwrap();
1093 let expected_log_a = "Assessing prunability of network graph".to_string();
1094 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1095 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1096 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Canned rapid-gossip-sync binary payload ("LDK" magic, version 1) that adds
// two channel announcements when applied below.
1101 let initialization_input = vec![
1102 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1103 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1104 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1105 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1106 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1107 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1108 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1109 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1110 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1111 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1112 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1113 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1114 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Applying the payload marks the rapid sync complete, unblocking pruning.
1116 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1118 // this should have added two channels
1119 assert_eq!(network_graph.read_only().channels().len(), 3);
// Await the persister's notification that a prune-and-persist occurred.
1122 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1123 .expect("Network graph not pruned within deadline");
1125 background_processor.stop().unwrap();
1127 // all channels should now be pruned
1128 assert_eq!(network_graph.read_only().channels().len(), 0);
// Sanity-check that an InvoicePayer (wrapping a DefaultRouter) can be used as
// the BackgroundProcessor's event handler — the processor starts and stops
// cleanly with it installed.
1132 fn test_invoice_payer() {
1133 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1134 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1135 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1137 // Initiate the background processors to watch each node.
1138 let data_dir = nodes[0].persister.get_data_dir();
1139 let persister = Arc::new(Persister::new(data_dir));
1140 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
// No-op payment event handler; up to 2 retry attempts per payment.
1141 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1142 let event_handler = Arc::clone(&invoice_payer);
1143 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1144 assert!(bg_processor.stop().is_ok());