1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 /// writing it to disk/backups by invoking the callback given to it at startup.
46 /// [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 /// at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
64 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag polled by the background thread each loop iteration; set to `true`
	// (with Release ordering, see `stop_and_join_thread`) to request shutdown.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned background thread; `None` once joined. The thread's result
	// carries any `std::io::Error` produced while persisting.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Timer intervals, in seconds. Each timer appears twice: a production value and a much
// shorter value used under test. NOTE(review): the `#[cfg(...)]` attributes selecting
// between the duplicated consts are not all visible in this view — confirm against the
// full file that each pair is cfg-gated (test vs. not(test)).
const FRESHNESS_TIMER: u64 = 60;
const FRESHNESS_TIMER: u64 = 1;
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// How often the scorer, if provided, is written out to disk.
const SCORER_PERSIST_TIMER: u64 = 30;
const SCORER_PERSIST_TIMER: u64 = 1;
// The first graph prune runs shortly after startup (rather than waiting a full
// NETWORK_PRUNE_TIMER) so short-lived clients still prune their graph.
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// Generic parameters: `P` derefs to a BOLT-7 P2P gossip sync, `R` to a rapid (server
// provided) gossip sync, and `G` to the shared `NetworkGraph` both operate on.
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	/// Rapid gossip sync from a trusted server.
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	// Returns the network graph this sync operates on, if any variant carries one.
	fn network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,

	// Returns the network graph only when it is safe to prune: always for P2P sync,
	// but for rapid sync only once the initial sync has completed — pruning earlier
	// could discard data the in-flight sync still references.
	fn prunable_network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
			GossipSync::None => None,
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Convenience constructor impl: pins the unused rapid-sync type parameter to a
// reference type so callers need not name it when using P2P sync only.
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
	A::Target: chain::Access,
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Convenience constructor impl: pins the unused P2P-sync and chain-access type
// parameters to reference/trait-object types so callers need not name them when
// using rapid sync only.
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		&'a (dyn chain::Access + Send + Sync),
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Convenience constructor impl for the no-gossip case: every type parameter is pinned
// to a concrete reference/trait-object type since none of them are actually used.
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a (dyn chain::Access + Send + Sync),
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
// Applies any `NetworkUpdate` carried by a failed-payment event to the network graph,
// e.g. marking a channel as failed so future route-finding avoids it. Events other
// than `PaymentPathFailed` (or failures without an update) are ignored.
fn handle_network_graph_update<L: Deref>(
	network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
	if let Event::PaymentPathFailed { ref network_update, .. } = event {
		if let Some(network_update) = network_update {
			network_graph.handle_network_update(&network_update);
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
// Wraps the user's handler so that network-graph-relevant events are first applied to
// the graph (via `handle_network_graph_update`) before being forwarded.
struct DecoratingEventHandler<
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
where A::Target: chain::Access, L::Target: Logger {
	// Borrowed gossip sync — used only to look up the network graph, if any.
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn handle_event(&self, event: Event) {
		// Update the network graph from the event (if we have a graph) before handing
		// the event to the user's handler, so routing state is fresh for any follow-up.
		if let Some(network_graph) = self.gossip_sync.network_graph() {
			handle_network_graph_update(network_graph, &event)
		self.event_handler.handle_event(event);
// Shared main-loop body used by both the threaded (`BackgroundProcessor::start`) and
// async (`process_events_async`) entry points. `$loop_exit_check` is evaluated each
// iteration to decide whether to exit; `$await` blocks/sleeps until either a
// ChannelManager persistence notification fires or ~100ms elapses, returning whether
// the manager needs re-persisting.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	$channel_manager: ident, $process_channel_manager_events: expr,
	$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	$loop_exit_check: expr, $await: expr)
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Per-timer bookkeeping: each records when its action last ran.
		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		let mut have_pruned = false;

		$process_channel_manager_events;
		$process_chain_monitor_events;

		// Note that the PeerManager::process_events may block on ChannelManager's locks,
		// hence it comes last here. When the ChannelManager finishes whatever it's doing,
		// we want to ensure we get into `persist_manager` as quickly as we can, especially
		// without running the normal event processing above and handing events to users.
		// Specifically, on an *extremely* slow machine, we may see ChannelManager start
		// processing a message effectively at any point during this loop. In order to
		// minimize the time between such processing completing and persisting the updated
		// ChannelManager, we want to minimize methods blocking on a ChannelManager
		// generally, and as a fallback place such blocking only immediately before
		$peer_manager.process_events();

		// We wait up to 100ms, but track how long it takes to detect being put to sleep,
		// see `await_start`'s use below.
		let await_start = Instant::now();
		let updates_available = $await;
		let await_time = await_start.elapsed();

		if updates_available {
			log_trace!($logger, "Persisting ChannelManager...");
			$persister.persist_manager(&*$channel_manager)?;
			log_trace!($logger, "Done persisting ChannelManager.");
		// Exit the loop if the background processor was requested to stop.
		if $loop_exit_check {
			log_trace!($logger, "Terminating background processor.");
		if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
			log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
			$channel_manager.timer_tick_occurred();
			last_freshness_call = Instant::now();
		if await_time > Duration::from_secs(1) {
			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
			log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
			$peer_manager.disconnect_all_peers();
			last_ping_call = Instant::now();
		} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
			log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
			$peer_manager.timer_tick_occurred();
			last_ping_call = Instant::now();

		// Note that we want to run a graph prune once not long after startup before
		// falling back to our usual hourly prunes. This avoids short-lived clients never
		// pruning their network graph. We run once 60 seconds after startup before
		// continuing our normal cadence.
		if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
			// The network graph must not be pruned while rapid sync completion is pending
			log_trace!($logger, "Assessing prunability of network graph");
			if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
				network_graph.remove_stale_channels_and_tracking();

				// Graph persistence failures are logged but non-fatal: we retry next cycle.
				if let Err(e) = $persister.persist_graph(network_graph) {
					log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)

				last_prune_call = Instant::now();
				log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");

		if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
			if let Some(ref scorer) = $scorer {
				log_trace!($logger, "Persisting scorer");
				// Scorer persistence failures are likewise logged but non-fatal.
				if let Err(e) = $persister.persist_scorer(&scorer) {
					log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
			last_scorer_persist_call = Instant::now();

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
364 /// Processes background events in a future.
366 /// `sleeper` should return a future which completes in the given amount of time and returns a
367 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
368 /// future which outputs true, the loop will exit and this function's future will complete.
370 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
371 #[cfg(feature = "futures")]
372 pub async fn process_events_async<
374 Signer: 'static + Sign,
375 CA: 'static + Deref + Send + Sync,
376 CF: 'static + Deref + Send + Sync,
377 CW: 'static + Deref + Send + Sync,
378 T: 'static + Deref + Send + Sync,
379 K: 'static + Deref + Send + Sync,
380 F: 'static + Deref + Send + Sync,
381 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
382 L: 'static + Deref + Send + Sync,
383 P: 'static + Deref + Send + Sync,
384 Descriptor: 'static + SocketDescriptor + Send + Sync,
385 CMH: 'static + Deref + Send + Sync,
386 RMH: 'static + Deref + Send + Sync,
387 OMH: 'static + Deref + Send + Sync,
388 EventHandlerFuture: core::future::Future<Output = ()>,
389 EventHandler: Fn(Event) -> EventHandlerFuture,
390 PS: 'static + Deref + Send,
391 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
392 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
393 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
394 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
395 UMH: 'static + Deref + Send + Sync,
396 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
397 S: 'static + Deref<Target = SC> + Send + Sync,
398 SC: WriteableScore<'a>,
399 SleepFuture: core::future::Future<Output = bool>,
400 Sleeper: Fn(Duration) -> SleepFuture
402 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
403 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
405 ) -> Result<(), std::io::Error>
407 CA::Target: 'static + chain::Access,
408 CF::Target: 'static + chain::Filter,
409 CW::Target: 'static + chain::Watch<Signer>,
410 T::Target: 'static + BroadcasterInterface,
411 K::Target: 'static + KeysInterface<Signer = Signer>,
412 F::Target: 'static + FeeEstimator,
413 L::Target: 'static + Logger,
414 P::Target: 'static + Persist<Signer>,
415 CMH::Target: 'static + ChannelMessageHandler,
416 OMH::Target: 'static + OnionMessageHandler,
417 RMH::Target: 'static + RoutingMessageHandler,
418 UMH::Target: 'static + CustomMessageHandler,
419 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
421 let mut should_break = true;
422 let async_event_handler = |event| {
423 let network_graph = gossip_sync.network_graph();
424 let event_handler = &event_handler;
426 if let Some(network_graph) = network_graph {
427 handle_network_graph_update(network_graph, &event)
429 event_handler(event).await;
432 define_run_body!(persister,
433 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
434 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
435 gossip_sync, peer_manager, logger, scorer, should_break, {
437 _ = channel_manager.get_persistable_update_future().fuse() => true,
438 exit = sleeper(Duration::from_millis(100)).fuse() => {
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	/// # Data Persistence
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	/// # Rapid Gossip Sync
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
		// The stop flag is shared between the returned handle and the spawned thread;
		// the thread polls it each loop iteration via the macro's `$loop_exit_check`.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Decorate the user's handler so graph updates from failed payments are
			// applied before the user sees the event.
			let event_handler = DecoratingEventHandler {
				gossip_sync: &gossip_sync,
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		// `join` waits for the thread without requesting it to stop first.
		assert!(self.thread_handle.is_some());
	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	// Signals the thread to stop (Release pairs with the thread's Acquire load), then joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
	// Joins the thread if it has not been joined yet; propagates a thread panic via unwrap.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
impl Drop for BackgroundProcessor {
		// On drop, stop and join the background thread; panics if persistence failed,
		// since drop cannot return the error (use `stop`/`join` to handle it instead).
		self.stop_and_join_thread().unwrap();
600 use bitcoin::blockdata::block::BlockHeader;
601 use bitcoin::blockdata::constants::genesis_block;
602 use bitcoin::blockdata::locktime::PackedLockTime;
603 use bitcoin::blockdata::transaction::{Transaction, TxOut};
604 use bitcoin::network::constants::Network;
605 use lightning::chain::{BestBlock, Confirm, chainmonitor};
606 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
607 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
608 use lightning::chain::transaction::OutPoint;
609 use lightning::get_event_msg;
610 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
611 use lightning::ln::features::ChannelFeatures;
612 use lightning::ln::msgs::{ChannelMessageHandler, Init};
613 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
614 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
615 use lightning::util::config::UserConfig;
616 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
617 use lightning::util::ser::Writeable;
618 use lightning::util::test_utils;
619 use lightning::util::persist::KVStorePersister;
620 use lightning_invoice::payment::{InvoicePayer, Retry};
621 use lightning_invoice::utils::DefaultRouter;
622 use lightning_persister::FilesystemPersister;
624 use std::path::PathBuf;
625 use std::sync::{Arc, Mutex};
626 use std::sync::mpsc::SyncSender;
627 use std::time::Duration;
628 use bitcoin::hashes::Hash;
629 use bitcoin::TxMerkleNode;
630 use lightning::routing::scoring::{FixedPenaltyScorer};
631 use lightning_rapid_gossip_sync::RapidGossipSync;
632 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
	// Maximum time (seconds) tests wait for an expected event before failing.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	// Minimal no-op socket descriptor so a PeerManager can be constructed in tests.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
		fn disconnect_socket(&mut self) {}
	// Concrete type aliases wiring the test doubles into the generic LDK types.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
		// Fully wired test node: channel manager plus both gossip sync flavors and all
		// supporting test doubles, shared via Arc so tests can clone handles freely.
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
		// Builds a `GossipSync::P2P` view over this node's P2P gossip sync.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())

		// Builds a `GossipSync::Rapid` view over this node's rapid gossip sync.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())

		// Builds a `GossipSync::None` view (no gossip source at all).
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			// NOTE(review): the lines below appear to belong to a `Drop` impl for `Node`
			// (test-directory cleanup); its header is not visible here — confirm.
			let data_dir = self.persister.get_data_dir();
			match fs::remove_dir_all(data_dir.clone()) {
				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
		// Test persister wrapping `FilesystemPersister`, with injectable failures per
		// key ("network_graph", "manager", scorer) and an optional notifier that fires
		// whenever the graph is persisted.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,

		// Constructs a persister with no injected failures, backed by `data_dir`.
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }

		// Builder-style setters to inject a failure (or notifier) for a specific key.
		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
	impl KVStorePersister for Persister {
		// Dispatches on the persistence key: returns the injected error for that key if
		// one was configured, fires the graph notifier, and otherwise delegates to the
		// real filesystem persister.
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))

			if key == "network_graph" {
				// Notify the test (if subscribed) that a graph persist was attempted,
				// before any injected error is returned.
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))

			if let Some((error, message)) = self.scorer_error {
				return Err(std::io::Error::new(error, message))

			self.filesystem_persister.persist(key, object)
748 fn get_full_filepath(filepath: String, filename: String) -> String {
749 let mut path = PathBuf::from(filepath);
751 path.to_str().unwrap().to_string()
	// Builds `num_nodes` fully wired test nodes (each with its own persister directory
	// under `persist_dir`) and connects every pair of them in both directions.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			// Per-node test doubles; the seed and logger id derive from the node index
			// so each node has distinct keys and identifiable log output.
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

		// Connect every (i, j) pair in both directions so peers see each other.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
791 macro_rules! open_channel {
792 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
793 begin_open_channel!($node_a, $node_b, $channel_value);
794 let events = $node_a.node.get_and_clear_pending_events();
795 assert_eq!(events.len(), 1);
796 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
797 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// Test helper: performs the first half of the channel-open handshake —
// `$node_a` creates the channel (push_msat 100, user_channel_id 42, default config),
// then the open_channel and accept_channel messages are relayed by hand between the
// two ChannelManagers. Leaves `$node_a` with a pending FundingGenerationReady event.
802 macro_rules! begin_open_channel {
803 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
// user_channel_id 42 here is asserted later by handle_funding_generation_ready!.
804 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
805 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
806 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Test helper: destructures a FundingGenerationReady `$event`, checks it matches the
// channel opened by begin_open_channel! (value == `$channel_value`, user_channel_id == 42),
// and builds a one-output funding transaction paying `channel_value_satoshis` to the
// requested `output_script`. Evaluates to `(temporary_channel_id, tx)`.
// (The `match $event { ... }` header line is elided from this excerpt.)
810 macro_rules! handle_funding_generation_ready {
811 ($event: expr, $channel_value: expr) => {{
813 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
814 assert_eq!(channel_value_satoshis, $channel_value);
// 42 is the user_channel_id passed to create_channel in begin_open_channel!.
815 assert_eq!(user_channel_id, 42);
// Dummy on-chain funding tx: no inputs, single output to the channel's script.
817 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
818 value: channel_value_satoshis, script_pubkey: output_script.clone(),
820 (temporary_channel_id, tx)
// Any other event variant is a test failure.
822 _ => panic!("Unexpected event"),
// Test helper: completes the channel-open handshake — `$node_a` hands its ChannelManager
// the funding transaction, then the resulting funding_created / funding_signed messages
// are relayed by hand between the two nodes.
827 macro_rules! end_open_channel {
828 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
829 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
830 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
831 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Confirms `tx` on `node`'s chain view: mines a synthetic block containing the
// transaction on top of the node's current best block and notifies both the
// ChannelManager and the ChainMonitor. The visible code connects one block; the
// handling of the remaining `depth - 1` confirmations is elided from this excerpt —
// presumably a loop connecting further empty blocks, TODO confirm against the full file.
835 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
837 let prev_blockhash = node.best_block.block_hash();
838 let height = node.best_block.height() + 1;
// Synthetic header: `time` reuses the height; merkle_root/bits/nonce are dummies
// since nothing here validates proof-of-work or the merkle tree.
839 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
// (position-in-block, tx) pairs as expected by the Confirm-style interfaces below.
840 let txdata = vec![(0, tx)];
841 node.best_block = BestBlock::new(header.block_hash(), height);
// Both listeners must observe the confirmation and the new tip.
844 node.node.transactions_confirmed(&header, &txdata, height);
845 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
848 node.node.best_block_updated(&header, height);
849 node.chain_monitor.best_block_updated(&header, height);
// Convenience wrapper: confirms `tx` deeply enough (ANTI_REORG_DELAY blocks) that
// rust-lightning treats the confirmation as irreversible.
855 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
856 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
// End-to-end test of BackgroundProcessor persistence: verifies that the ChannelManager,
// the NetworkGraph, and the scorer are each written to disk by the background thread,
// both initially and after a state-changing event (a force-close).
860 fn test_background_processor() {
861 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
862 // updates. Also test that when new updates are available, the manager signals that it needs
863 // re-persistence and is successfully re-persisted.
864 let nodes = create_nodes(2, "test_background_processor".to_string());
866 // Go through the channel creation process so that each node has something to persist. Since
867 // open_channel consumes events, it must complete before starting BackgroundProcessor to
868 // avoid a race with processing events.
869 let tx = open_channel!(nodes[0], nodes[1], 100000);
871 // Initiate the background processors to watch each node.
872 let data_dir = nodes[0].persister.get_data_dir();
873 let persister = Arc::new(Persister::new(data_dir));
// Events are irrelevant here; use a no-op handler.
874 let event_handler = |_: _| {};
875 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Polls until the file at `$filepath` matches `$node`'s current serialization.
// The retry/loop scaffolding around these reads is elided from this excerpt.
877 macro_rules! check_persisted_data {
878 ($node: expr, $filepath: expr) => {
879 let mut expected_bytes = Vec::new();
881 expected_bytes.clear();
// Serialize the live object, then compare against what the background
// processor wrote to disk.
882 match $node.write(&mut expected_bytes) {
884 match std::fs::read($filepath) {
886 if bytes == expected_bytes {
895 Err(e) => panic!("Unexpected error: {}", e)
901 // Check that the initial channel manager data is persisted as expected.
902 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
903 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait (in an elided loop) until the manager's persistence flag has been cleared,
// i.e. the background thread has picked up the pending re-persist.
906 if !nodes[0].node.get_persistence_condvar_value() { break }
909 // Force-close the channel.
910 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
912 // Check that the force-close updates are persisted.
913 check_persisted_data!(nodes[0].node, filepath.clone());
915 if !nodes[0].node.get_persistence_condvar_value() { break }
918 // Check network graph is persisted
919 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
920 check_persisted_data!(nodes[0].network_graph, filepath.clone());
922 // Check scorer is persisted
923 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
924 check_persisted_data!(nodes[0].scorer, filepath.clone());
// stop() joins the background thread and surfaces any persistence error.
926 assert!(bg_processor.stop().is_ok());
// Verifies the background processor's periodic timer: waits (in an elided polling loop)
// until the test logger has recorded that both ChannelManager's and PeerManager's
// timer_tick_occurred were invoked.
930 fn test_timer_tick_called() {
931 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
932 // `FRESHNESS_TIMER`.
933 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
934 let data_dir = nodes[0].persister.get_data_dir();
935 let persister = Arc::new(Persister::new(data_dir));
936 let event_handler = |_: _| {};
937 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Look for both log lines, keyed by the emitting module's name; the enclosing
// retry loop (and its `break`) is elided from this excerpt.
939 let log_entries = nodes[0].logger.lines.lock().unwrap();
940 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
941 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
942 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
943 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
948 assert!(bg_processor.stop().is_ok());
// Verifies that a ChannelManager persistence failure terminates the background thread
// and that join() surfaces the injected io::Error (kind + message) to the caller.
952 fn test_channel_manager_persist_error() {
953 // Test that if we encounter an error during manager persistence, the thread panics.
954 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel so there is manager state to (fail to) persist.
955 open_channel!(nodes[0], nodes[1], 100000);
957 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail every ChannelManager write with Other("test").
958 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
959 let event_handler = |_: _| {};
960 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The Err arm of this match is elided from this excerpt.
961 match bg_processor.join() {
962 Ok(_) => panic!("Expected error persisting manager"),
964 assert_eq!(e.kind(), std::io::ErrorKind::Other);
965 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that a NetworkGraph persistence failure is reported as an io::Error by
// stop() rather than being swallowed.
971 fn test_network_graph_persist_error() {
972 // Test that if we encounter an error during network graph persistence, an error gets returned.
973 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
974 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail every network-graph write with Other("test").
975 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
976 let event_handler = |_: _| {};
// p2p_gossip_sync() is used here so the processor actually persists the graph.
977 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The Err arm of this match is elided from this excerpt.
979 match bg_processor.stop() {
980 Ok(_) => panic!("Expected error persisting network graph"),
982 assert_eq!(e.kind(), std::io::ErrorKind::Other);
983 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that a scorer persistence failure is reported as an io::Error by stop().
989 fn test_scorer_persist_error() {
990 // Test that if we encounter an error during scorer persistence, an error gets returned.
991 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
992 let data_dir = nodes[0].persister.get_data_dir();
// Persister rigged to fail every scorer write with Other("test").
993 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
994 let event_handler = |_: _| {};
995 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The Err arm of this match is elided from this excerpt.
997 match bg_processor.stop() {
998 Ok(_) => panic!("Expected error persisting scorer"),
1000 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1001 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies the BackgroundProcessor delivers events to the user-provided handler:
// first a FundingGenerationReady during channel open, then a SpendableOutputs after a
// force-close. Events are forwarded to the test via an mpsc channel with a deadline.
1007 fn test_background_event_handling() {
1008 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1009 let channel_value = 100000;
1010 let data_dir = nodes[0].persister.get_data_dir();
1011 let persister = Arc::new(Persister::new(data_dir.clone()));
1013 // Set up a background event handler for FundingGenerationReady events.
1014 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// The handler builds the funding tx and ships (temp_channel_id, tx) to the test thread;
// ChannelReady is tolerated, anything else fails the test.
1015 let event_handler = move |event: Event| match event {
1016 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1017 Event::ChannelReady { .. } => {},
1018 _ => panic!("Unexpected event: {:?}", event),
1021 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1023 // Open a channel and check that the FundingGenerationReady event was handled.
1024 begin_open_channel!(nodes[0], nodes[1], channel_value);
1025 let (temporary_channel_id, funding_tx) = receiver
1026 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1027 .expect("FundingGenerationReady not handled within deadline");
1028 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1030 // Confirm the funding transaction.
1031 confirm_transaction(&mut nodes[0], &funding_tx);
// Exchange channel_ready messages manually so the channel becomes usable on both sides.
1032 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1033 confirm_transaction(&mut nodes[1], &funding_tx);
1034 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1035 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1036 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1037 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1038 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Tear down the first processor before installing the SpendableOutputs handler.
1040 assert!(bg_processor.stop().is_ok());
1042 // Set up a background event handler for SpendableOutputs events.
1043 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1044 let event_handler = move |event: Event| match event {
1045 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1046 Event::ChannelReady { .. } => {},
1047 Event::ChannelClosed { .. } => {},
1048 _ => panic!("Unexpected event: {:?}", event),
1050 let persister = Arc::new(Persister::new(data_dir));
1051 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1053 // Force close the channel and check that the SpendableOutputs event was handled.
1054 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1055 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Bury the commitment tx past BREAKDOWN_TIMEOUT so the output becomes spendable.
1056 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1058 let event = receiver
1059 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1060 .expect("Events not handled within deadline");
// (The `match event {` header line is elided from this excerpt.)
1062 Event::SpendableOutputs { .. } => {},
1063 _ => panic!("Unexpected event: {:?}", event),
1066 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer: waits (in an
// elided polling loop) for the "Persisting scorer" log line from this crate.
1070 fn test_scorer_persistence() {
1071 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1072 let data_dir = nodes[0].persister.get_data_dir();
1073 let persister = Arc::new(Persister::new(data_dir));
1074 let event_handler = |_: _| {};
1075 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Keyed by (module, message); the loop wrapper and `break` are elided here.
1078 let log_entries = nodes[0].logger.lines.lock().unwrap();
1079 let expected_log = "Persisting scorer".to_string();
1080 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1085 assert!(bg_processor.stop().is_ok());
// Verifies pruning is deferred while a rapid gossip sync is pending: a partially-known
// channel must survive until the RGS snapshot is applied, after which the (now stale)
// graph is pruned down to zero channels. Completion of graph persistence is observed
// via a channel hooked into the test Persister.
1089 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1090 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1091 let data_dir = nodes[0].persister.get_data_dir();
// `sender` fires each time the network graph is persisted; `receiver` is awaited below.
1092 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1093 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1094 let network_graph = nodes[0].network_graph.clone();
1095 let features = ChannelFeatures::empty();
// Seed one partially-announced channel (scid 42) that must not be pruned early.
1096 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1097 .expect("Failed to update channel from partial announcement");
1098 let original_graph_description = network_graph.to_string();
1099 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1100 assert_eq!(network_graph.read_only().channels().len(), 1);
1102 let event_handler = |_: _| {};
// rapid_gossip_sync() marks a sync as pending, which should block pruning.
1103 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Wait (in an elided loop) for the processor to log that it declined to prune.
1106 let log_entries = nodes[0].logger.lines.lock().unwrap();
1107 let expected_log_a = "Assessing prunability of network graph".to_string();
1108 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1109 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1110 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Canned rapid-gossip-sync binary snapshot ("LDK\x01" magic) announcing two channels.
1115 let initialization_input = vec![
1116 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1117 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1118 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1119 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1120 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1121 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1122 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1123 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1124 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1125 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1126 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1127 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1128 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Applying the snapshot marks the rapid sync complete, unblocking pruning.
1130 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1132 // this should have added two channels
1133 assert_eq!(network_graph.read_only().channels().len(), 3);
// Await the post-sync prune-and-persist cycle (receiver expression head elided).
1136 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1137 .expect("Network graph not pruned within deadline");
1139 background_processor.stop().unwrap();
1141 // all channels should now be pruned
1142 assert_eq!(network_graph.read_only().channels().len(), 0);
1146 fn test_invoice_payer() {
1147 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1148 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1149 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1151 // Initiate the background processors to watch each node.
1152 let data_dir = nodes[0].persister.get_data_dir();
1153 let persister = Arc::new(Persister::new(data_dir));
1154 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1155 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1156 let event_handler = Arc::clone(&invoice_payer);
1157 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1158 assert!(bg_processor.stop().is_ok());