1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::router::Router;
26 use lightning::routing::scoring::WriteableScore;
27 use lightning::util::events::{Event, EventHandler, EventsProvider};
28 use lightning::util::logger::Logger;
29 use lightning::util::persist::Persister;
30 use lightning_rapid_gossip_sync::RapidGossipSync;
32 use std::sync::atomic::{AtomicBool, Ordering};
34 use std::thread::JoinHandle;
35 use std::time::{Duration, Instant};
38 #[cfg(feature = "futures")]
39 use futures_util::{select_biased, future::FutureExt};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by calling `persist_manager` on the [`Persister`] given to it at
///   startup. [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Set to `true` to ask the background thread to leave its loop; the thread polls it once per
	// iteration.
	stop_thread: Arc<AtomicBool>,
	// Handle of the background thread. It is taken (leaving `None`) the first time the thread is
	// joined, so a second join is a no-op.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
/// Seconds between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
// Shortened so tests do not have to wait a full minute.
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Seconds between calls to `PeerManager::timer_tick_occurred` in optimised builds.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
// Shortened for tests.
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Seconds between periodic persistence of the scorer.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
// Shortened for tests.
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Seconds after startup at which the first network graph prune is attempted, before falling
/// back to the `NETWORK_PRUNE_TIMER` cadence.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
// Shortened for tests.
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`], or no gossip sync at all.
///
/// The variant determines which [`NetworkGraph`], if any, the background processor updates,
/// prunes and persists.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
117 P: Deref<Target = P2PGossipSync<G, A, L>>,
118 R: Deref<Target = RapidGossipSync<G, L>>,
119 G: Deref<Target = NetworkGraph<L>>,
122 > GossipSync<P, R, G, A, L>
123 where A::Target: chain::Access, L::Target: Logger {
124 fn network_graph(&self) -> Option<&G> {
126 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
127 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
128 GossipSync::None => None,
132 fn prunable_network_graph(&self) -> Option<&G> {
134 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
135 GossipSync::Rapid(gossip_sync) => {
136 if gossip_sync.is_initial_sync_complete() {
137 Some(gossip_sync.network_graph())
142 GossipSync::None => None,
/// (C-not exported) as the bindings concretize everything and have constructors for us
// The rapid-sync type parameter is pinned to a reference type here, presumably so callers
// building the P2P variant need not name a type for the variant they do not use.
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
/// (C-not exported) as the bindings concretize everything and have constructors for us
// The P2P-sync and chain-access type parameters are pinned to reference / trait-object types
// here, presumably so callers building the rapid variant need not name them.
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Every type parameter except the logger is pinned to a reference / trait-object type here,
// presumably so callers that use no gossip sync need not name any of them.
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}
196 fn handle_network_graph_update<L: Deref>(
197 network_graph: &NetworkGraph<L>, event: &Event
198 ) where L::Target: Logger {
199 if let Event::PaymentPathFailed { ref network_update, .. } = event {
200 if let Some(network_update) = network_update {
201 network_graph.handle_network_update(&network_update);
// Expands to the body of the background-processing loop shared by the threaded
// (`BackgroundProcessor::start`) and async (`process_events_async`) entry points.
//
// Arguments:
// * `$persister`, `$chain_monitor`, `$channel_manager`, `$gossip_sync`, `$peer_manager`,
//   `$logger`, `$scorer`: identifiers of the corresponding values in the caller's scope.
// * `$process_chain_monitor_events`, `$process_channel_manager_events`: expressions evaluated
//   once per iteration to process pending events.
// * `$loop_exit_check`: boolean expression; the loop terminates once it evaluates to `true`.
// * `$await`: expression that waits for a `ChannelManager` update and evaluates to `true` when
//   the manager should be persisted.
// * `$get_timer`: callable taking a duration in seconds and returning a fresh timer.
// * `$timer_elapsed`: callable taking `&mut timer` and a duration in seconds, returning whether
//   that duration has elapsed.
//
// The expansion applies `?` to persistence results, so it must be used where the enclosing
// function or closure returns `Result<(), std::io::Error>`; it evaluates to `Ok(())` after a
// clean exit.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		// Selects between the short first-prune delay and the regular prune cadence.
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persisting.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					log_trace!($logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking();

					// A failed graph persist is only logged; it does not stop the loop.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
					have_pruned = true;
				}
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// A failed periodic scorer persist is only logged; it does not stop the loop.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// # Errors
///
/// Returns the error if persisting the [`ChannelManager`] fails at any point, or if the final
/// persistence of the scorer or the network graph on exit fails.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// Must start out `false`: this flag is only written when the sleeper branch below wins the
	// select. If the persistable-update future wins on the first iteration, the exit check would
	// otherwise still see the initial value and terminate the loop even though `sleeper` never
	// asked for an exit.
	let mut should_break = false;
	// Updates the network graph (if any) from payment failures before handing the event to the
	// user's handler.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			select_biased! {
				// A pending ChannelManager update takes priority and requests a persist.
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				// Otherwise wake after 100ms and record whether the caller wants us to exit.
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		}, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
}
417 impl BackgroundProcessor {
418 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
421 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
422 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
423 /// either [`join`] or [`stop`].
425 /// # Data Persistence
427 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
428 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
429 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
430 /// provided implementation.
432 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
433 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
434 /// See the `lightning-persister` crate for LDK's provided implementation.
436 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
437 /// error or call [`join`] and handle any error that may arise. For the latter case,
438 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
442 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
443 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
444 /// functionality implemented by other handlers.
445 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
447 /// # Rapid Gossip Sync
449 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
450 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
451 /// until the [`RapidGossipSync`] instance completes its first sync.
453 /// [top-level documentation]: BackgroundProcessor
454 /// [`join`]: Self::join
455 /// [`stop`]: Self::stop
456 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
457 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
458 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
459 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
460 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
461 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
464 CA: 'static + Deref + Send + Sync,
465 CF: 'static + Deref + Send + Sync,
466 CW: 'static + Deref + Send + Sync,
467 T: 'static + Deref + Send + Sync,
468 ES: 'static + Deref + Send + Sync,
469 NS: 'static + Deref + Send + Sync,
470 SP: 'static + Deref + Send + Sync,
471 F: 'static + Deref + Send + Sync,
472 R: 'static + Deref + Send + Sync,
473 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
474 L: 'static + Deref + Send + Sync,
475 P: 'static + Deref + Send + Sync,
476 Descriptor: 'static + SocketDescriptor + Send + Sync,
477 CMH: 'static + Deref + Send + Sync,
478 OMH: 'static + Deref + Send + Sync,
479 RMH: 'static + Deref + Send + Sync,
480 EH: 'static + EventHandler + Send,
481 PS: 'static + Deref + Send,
482 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
483 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
484 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
485 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
486 UMH: 'static + Deref + Send + Sync,
487 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
488 S: 'static + Deref<Target = SC> + Send + Sync,
489 SC: WriteableScore<'a>,
491 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
492 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
495 CA::Target: 'static + chain::Access,
496 CF::Target: 'static + chain::Filter,
497 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
498 T::Target: 'static + BroadcasterInterface,
499 ES::Target: 'static + EntropySource,
500 NS::Target: 'static + NodeSigner,
501 SP::Target: 'static + SignerProvider,
502 F::Target: 'static + FeeEstimator,
503 R::Target: 'static + Router,
504 L::Target: 'static + Logger,
505 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
506 CMH::Target: 'static + ChannelMessageHandler,
507 OMH::Target: 'static + OnionMessageHandler,
508 RMH::Target: 'static + RoutingMessageHandler,
509 UMH::Target: 'static + CustomMessageHandler,
510 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
512 let stop_thread = Arc::new(AtomicBool::new(false));
513 let stop_thread_clone = stop_thread.clone();
514 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
515 let event_handler = |event| {
516 let network_graph = gossip_sync.network_graph();
517 if let Some(network_graph) = network_graph {
518 handle_network_graph_update(network_graph, &event)
520 event_handler.handle_event(event);
522 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
523 channel_manager, channel_manager.process_pending_events(&event_handler),
524 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
525 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
526 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
528 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
531 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
532 /// [`ChannelManager`].
536 /// This function panics if the background thread has panicked such as while persisting or
539 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
540 pub fn join(mut self) -> Result<(), std::io::Error> {
541 assert!(self.thread_handle.is_some());
545 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
546 /// [`ChannelManager`].
550 /// This function panics if the background thread has panicked such as while persisting or
553 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
554 pub fn stop(mut self) -> Result<(), std::io::Error> {
555 assert!(self.thread_handle.is_some());
556 self.stop_and_join_thread()
559 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
560 self.stop_thread.store(true, Ordering::Release);
564 fn join_thread(&mut self) -> Result<(), std::io::Error> {
565 match self.thread_handle.take() {
566 Some(handle) => handle.join().unwrap(),
// Dropping the processor stops the background thread and waits for it to finish.
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Panics if the thread ended with a persistence error (or itself panicked). If the
		// thread was already joined via `join`/`stop`, this is a no-op returning `Ok(())`.
		self.stop_and_join_thread().unwrap();
	}
}
580 use bitcoin::blockdata::block::BlockHeader;
581 use bitcoin::blockdata::constants::genesis_block;
582 use bitcoin::blockdata::locktime::PackedLockTime;
583 use bitcoin::blockdata::transaction::{Transaction, TxOut};
584 use bitcoin::network::constants::Network;
585 use lightning::chain::{BestBlock, Confirm, chainmonitor};
586 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
587 use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner};
588 use lightning::chain::transaction::OutPoint;
589 use lightning::get_event_msg;
590 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
591 use lightning::ln::features::ChannelFeatures;
592 use lightning::ln::msgs::{ChannelMessageHandler, Init};
593 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
594 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
595 use lightning::routing::router::DefaultRouter;
596 use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
597 use lightning::util::config::UserConfig;
598 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
599 use lightning::util::ser::Writeable;
600 use lightning::util::test_utils;
601 use lightning::util::persist::KVStorePersister;
602 use lightning_invoice::payment::{InvoicePayer, Retry};
603 use lightning_persister::FilesystemPersister;
605 use std::path::PathBuf;
606 use std::sync::{Arc, Mutex};
607 use std::sync::mpsc::SyncSender;
608 use std::time::Duration;
609 use bitcoin::hashes::Hash;
610 use bitcoin::TxMerkleNode;
611 use lightning_rapid_gossip_sync::RapidGossipSync;
612 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
// Five freshness-timer intervals, in seconds; presumably the upper bound tests wait for an
// expected event before failing (its uses are outside this view).
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
// Socket descriptor stub for tests: it accepts no data and has nothing to disconnect.
#[derive(Clone, Hash, PartialEq, Eq)]
struct TestDescriptor{}
impl SocketDescriptor for TestDescriptor {
	fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
		// Report that zero bytes were written.
		0
	}

	fn disconnect_socket(&mut self) {}
}
// Chain monitor wired to the test chain source, broadcaster, fee estimator and logger, persisting
// through a `FilesystemPersister`.
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

// Shared P2P gossip sync over the test network graph.
type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
// Shared rapid gossip sync over the test network graph.
type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// Everything a test needs for one simulated node: its channel manager plus the collaborators
// that are handed to the background processor.
struct Node {
	// The node's channel manager.
	node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
	p2p_gossip_sync: PGS,
	rapid_gossip_sync: RGS,
	peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
	chain_monitor: Arc<ChainMonitor>,
	// Filesystem persister whose data directory is removed when the node is dropped.
	persister: Arc<FilesystemPersister>,
	tx_broadcaster: Arc<test_utils::TestBroadcaster>,
	network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
	logger: Arc<test_utils::TestLogger>,
	best_block: BestBlock,
	scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
}
// Helpers that build each `GossipSync` variant with the fully spelled-out type parameters used
// by the tests.
impl Node {
	// Wraps a clone of this node's P2P gossip sync.
	fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
		GossipSync::P2P(self.p2p_gossip_sync.clone())
	}

	// Wraps a clone of this node's rapid gossip sync.
	fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
		GossipSync::Rapid(self.rapid_gossip_sync.clone())
	}

	// Returns the variant that uses no gossip sync.
	fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
		GossipSync::None
	}
}
661 let data_dir = self.persister.get_data_dir();
662 match fs::remove_dir_all(data_dir.clone()) {
663 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Test persister that delegates to a `FilesystemPersister` but can be configured to fail for
// specific keys and to signal when the network graph is persisted.
struct Persister {
	// Error to return when the network graph is persisted, if any.
	graph_error: Option<(std::io::ErrorKind, &'static str)>,
	// Notified every time persisting the network graph is attempted.
	graph_persistence_notifier: Option<SyncSender<()>>,
	// Error to return when the channel manager is persisted, if any.
	manager_error: Option<(std::io::ErrorKind, &'static str)>,
	// Error to return when the scorer is persisted, if any.
	scorer_error: Option<(std::io::ErrorKind, &'static str)>,
	// Backing store used when no error is injected.
	filesystem_persister: FilesystemPersister,
}
678 fn new(data_dir: String) -> Self {
679 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
680 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
683 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
684 Self { graph_error: Some((error, message)), ..self }
687 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
688 Self { graph_persistence_notifier: Some(sender), ..self }
691 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
692 Self { manager_error: Some((error, message)), ..self }
695 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
696 Self { scorer_error: Some((error, message)), ..self }
700 impl KVStorePersister for Persister {
701 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
702 if key == "manager" {
703 if let Some((error, message)) = self.manager_error {
704 return Err(std::io::Error::new(error, message))
708 if key == "network_graph" {
709 if let Some(sender) = &self.graph_persistence_notifier {
710 sender.send(()).unwrap();
713 if let Some((error, message)) = self.graph_error {
714 return Err(std::io::Error::new(error, message))
719 if let Some((error, message)) = self.scorer_error {
720 return Err(std::io::Error::new(error, message))
724 self.filesystem_persister.persist(key, object)
// Joins `filename` onto the directory `filepath` and returns the result as a `String`.
//
// Panics if the joined path is not valid UTF-8.
fn get_full_filepath(filepath: String, filename: String) -> String {
	PathBuf::from(filepath).join(filename).to_str().unwrap().to_string()
}
// Builds `num_nodes` test nodes and marks every pair as connected peers.
//
// Node `i` persists under the directory `"{persist_dir}_persister_{i}"` and derives its keys
// from a seed made of the byte `i` repeated.
fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
	let mut nodes = Vec::new();
	for i in 0..num_nodes {
		let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
		let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
		let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
		let network = Network::Testnet;
		let genesis_block = genesis_block(network);
		let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
		let params = ProbabilisticScoringParameters::default();
		let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
		let seed = [i as u8; 32];
		let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
		let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
		let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
		// The genesis block's timestamp is used as the keys manager's starting time.
		let now = Duration::from_secs(genesis_block.header.time as u64);
		let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
		let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
		let best_block = BestBlock::from_genesis(network);
		let params = ChainParameters { network, best_block };
		let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
		let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
		let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
		let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
		let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
		let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
		nodes.push(node);
	}

	// Tell each pair of channel managers, in both directions, that the other is connected.
	for i in 0..num_nodes {
		for j in (i+1)..num_nodes {
			nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
			nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
		}
	}

	nodes
}
// Runs the channel-open handshake between `$node_a` and `$node_b` for a channel of
// `$channel_value`, expecting exactly one pending event on `$node_a` along the way, and
// evaluates to the funding transaction.
macro_rules! open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		begin_open_channel!($node_a, $node_b, $channel_value);
		let events = $node_a.node.get_and_clear_pending_events();
		assert_eq!(events.len(), 1);
		let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
		end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
		tx
	}}
}
// Starts a channel open from `$node_a` towards `$node_b` and performs the first message
// round trip (open_channel / accept_channel) between the two nodes.
784 macro_rules! begin_open_channel {
785 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
// NOTE(review): the literal 42 is presumably the user channel id that
// `handle_funding_generation_ready!` asserts on; confirm against `create_channel`'s signature.
786 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
// Deliver node A's `SendOpenChannel` message to node B.
787 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
// Deliver node B's `SendAcceptChannel` reply back to node A.
788 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Handles an `Event::FundingGenerationReady`: checks the event against `$channel_value`, builds
// a funding transaction for it and yields `(temporary_channel_id, tx)`. Any other event panics.
792 macro_rules! handle_funding_generation_ready {
793 ($event: expr, $channel_value: expr) => {{
795 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
// The event must describe the channel that was requested: same value, user channel id 42.
796 assert_eq!(channel_value_satoshis, $channel_value);
797 assert_eq!(user_channel_id, 42);
// Funding transaction with no inputs and a single output paying the channel value to the
// script supplied by the event.
799 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
800 value: channel_value_satoshis, script_pubkey: output_script.clone(),
802 (temporary_channel_id, tx)
804 _ => panic!("Unexpected event"),
// Finishes a channel open once the funding transaction `$tx` exists: hands the transaction to
// `$node_a` and performs the funding_created / funding_signed round trip with `$node_b`.
809 macro_rules! end_open_channel {
810 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
// `$tx` is cloned, so the caller keeps its own copy of the funding transaction.
811 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
// Deliver node A's `SendFundingCreated` message to node B.
812 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
// Deliver node B's `SendFundingSigned` reply back to node A.
813 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Builds a block header on top of `node.best_block`, advances `node.best_block` to it, and
// reports `tx` as confirmed / the new best block to both the channel manager (`node.node`) and
// the chain monitor.
// NOTE(review): `depth` presumably controls how many blocks are connected; confirm against the
// full function body.
817 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
// The new block extends the node's current best block by one.
819 let prev_blockhash = node.best_block.block_hash();
820 let height = node.best_block.height() + 1;
// Header `time` is set to the height; `bits` and `nonce` are fixed at 42.
821 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
// `tx` is reported at index 0 in the block's transaction data.
822 let txdata = vec![(0, tx)];
823 node.best_block = BestBlock::new(header.block_hash(), height);
// Notify the channel manager and the chain monitor that `tx` confirmed in this block.
826 node.node.transactions_confirmed(&header, &txdata, height);
827 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
// Notify the channel manager and the chain monitor of the new best block.
830 node.node.best_block_updated(&header, height);
831 node.chain_monitor.best_block_updated(&header, height);
// Convenience wrapper: confirms `tx` on `node` using `ANTI_REORG_DELAY` as the depth passed to
// `confirm_transaction_depth`.
837 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
838 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
842 fn test_background_processor() {
843 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
844 // updates. Also test that when new updates are available, the manager signals that it needs
845 // re-persistence and is successfully re-persisted.
846 let nodes = create_nodes(2, "test_background_processor".to_string());
848 // Go through the channel creation process so that each node has something to persist. Since
849 // open_channel consumes events, it must complete before starting BackgroundProcessor to
850 // avoid a race with processing events.
851 let tx = open_channel!(nodes[0], nodes[1], 100000);
853 // Start a background processor for node 0 only; node 1 has none in this test.
854 let data_dir = nodes[0].persister.get_data_dir();
855 let persister = Arc::new(Persister::new(data_dir));
// Event handler that ignores every event it is given.
856 let event_handler = |_: _| {};
857 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `check_persisted_data!` serializes `$node` into `expected_bytes` and compares the result
// with the bytes read back from `$filepath`.
859 macro_rules! check_persisted_data {
860 ($node: expr, $filepath: expr) => {
861 let mut expected_bytes = Vec::new();
// The buffer is cleared before each serialization attempt so stale bytes are not compared.
863 expected_bytes.clear();
864 match $node.write(&mut expected_bytes) {
866 match std::fs::read($filepath) {
868 if bytes == expected_bytes {
877 Err(e) => panic!("Unexpected error: {}", e)
883 // Check that the initial channel manager data is persisted as expected.
884 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
885 check_persisted_data!(nodes[0].node, filepath.clone());
888 if !nodes[0].node.get_persistence_condvar_value() { break }
891 // Force-close the channel.
// The channel id is derived from output 0 of the funding transaction returned by `open_channel!`.
892 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
894 // Check that the force-close updates are persisted.
895 check_persisted_data!(nodes[0].node, filepath.clone());
897 if !nodes[0].node.get_persistence_condvar_value() { break }
900 // Check network graph is persisted
901 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
902 check_persisted_data!(nodes[0].network_graph, filepath.clone());
904 // Check scorer is persisted
905 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
906 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Stopping the processor must succeed.
908 assert!(bg_processor.stop().is_ok());
912 fn test_timer_tick_called() {
913 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
914 // `FRESHNESS_TIMER`.
915 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
916 let data_dir = nodes[0].persister.get_data_dir();
917 let persister = Arc::new(Persister::new(data_dir));
// Event handler that ignores every event it is given.
918 let event_handler = |_: _| {};
919 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The test logger's entries are keyed by (module name, message); both timer-tick messages
// must be present under the `lightning_background_processor` module.
921 let log_entries = nodes[0].logger.lines.lock().unwrap();
922 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
923 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
924 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
925 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
// Stopping the processor must succeed.
930 assert!(bg_processor.stop().is_ok());
934 fn test_channel_manager_persist_error() {
935 // Test that if we encounter an error during manager persistence, `join` returns that error.
936 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel first; the returned funding transaction is not needed here.
937 open_channel!(nodes[0], nodes[1], 100000);
939 let data_dir = nodes[0].persister.get_data_dir();
// Persister built with `with_manager_error(ErrorKind::Other, "test")`; the assertions below
// expect exactly that kind and message back from `join()`.
940 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
// Event handler that ignores every event it is given.
941 let event_handler = |_: _| {};
942 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike the other persistence-error tests, this one waits with `join()` instead of `stop()`.
943 match bg_processor.join() {
944 Ok(_) => panic!("Expected error persisting manager"),
946 assert_eq!(e.kind(), std::io::ErrorKind::Other);
947 assert_eq!(e.get_ref().unwrap().to_string(), "test");
953 fn test_network_graph_persist_error() {
954 // Test that if we encounter an error during network graph persistence, an error gets returned.
955 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
956 let data_dir = nodes[0].persister.get_data_dir();
// Persister built with `with_graph_error(ErrorKind::Other, "test")`; the assertions below
// expect exactly that kind and message back from `stop()`.
957 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
// Event handler that ignores every event it is given.
958 let event_handler = |_: _| {};
// This test passes the node's P2P gossip sync (not `no_gossip_sync()`) to the processor.
959 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
961 match bg_processor.stop() {
962 Ok(_) => panic!("Expected error persisting network graph"),
964 assert_eq!(e.kind(), std::io::ErrorKind::Other);
965 assert_eq!(e.get_ref().unwrap().to_string(), "test");
971 fn test_scorer_persist_error() {
972 // Test that if we encounter an error during scorer persistence, an error gets returned.
973 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
974 let data_dir = nodes[0].persister.get_data_dir();
// Persister built with `with_scorer_error(ErrorKind::Other, "test")`; the assertions below
// expect exactly that kind and message back from `stop()`.
975 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
// Event handler that ignores every event it is given.
976 let event_handler = |_: _| {};
// The scorer is supplied as `Some(..)` in the last argument.
977 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
979 match bg_processor.stop() {
980 Ok(_) => panic!("Expected error persisting scorer"),
982 assert_eq!(e.kind(), std::io::ErrorKind::Other);
983 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Test that events reach the user-provided handler given to `BackgroundProcessor::start`: first
// a `FundingGenerationReady` event during a channel open, then (with a second processor and
// handler) a `SpendableOutputs` event after a force close.
989 fn test_background_event_handling() {
990 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
991 let channel_value = 100000;
992 let data_dir = nodes[0].persister.get_data_dir();
// `data_dir` is cloned because a second persister is created from it further down.
993 let persister = Arc::new(Persister::new(data_dir.clone()));
995 // Set up a background event handler for FundingGenerationReady events.
// The handler sends the (temporary_channel_id, tx) pair it builds back to the test body over
// this bounded channel.
996 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
997 let event_handler = move |event: Event| match event {
998 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
999 Event::ChannelReady { .. } => {},
1000 _ => panic!("Unexpected event: {:?}", event),
1003 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1005 // Open a channel and check that the FundingGenerationReady event was handled.
1006 begin_open_channel!(nodes[0], nodes[1], channel_value);
// Wait at most `EVENT_DEADLINE` seconds for the handler to report the funding details.
1007 let (temporary_channel_id, funding_tx) = receiver
1008 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1009 .expect("FundingGenerationReady not handled within deadline");
1010 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1012 // Confirm the funding transaction.
// Each node emits `SendChannelReady` after the confirmation; the two messages are then
// exchanged, and each delivery is followed by a `SendChannelUpdate` from the receiving node.
1013 confirm_transaction(&mut nodes[0], &funding_tx);
1014 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1015 confirm_transaction(&mut nodes[1], &funding_tx);
1016 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1017 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1018 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1019 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1020 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before a second one with a different handler is started.
1022 assert!(bg_processor.stop().is_ok());
1024 // Set up a background event handler for SpendableOutputs events.
1025 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1026 let event_handler = move |event: Event| match event {
1027 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1028 Event::ChannelReady { .. } => {},
1029 Event::ChannelClosed { .. } => {},
1030 _ => panic!("Unexpected event: {:?}", event),
1032 let persister = Arc::new(Persister::new(data_dir));
1033 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1035 // Force close the channel and check that the SpendableOutputs event was handled.
1036 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the most recently broadcast transaction from the test broadcaster and confirm it with
// `BREAKDOWN_TIMEOUT` as the depth.
1037 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1038 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
// Wait at most `EVENT_DEADLINE` seconds for the handler to forward an event.
1040 let event = receiver
1041 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1042 .expect("Events not handled within deadline");
1044 Event::SpendableOutputs { .. } => {},
1045 _ => panic!("Unexpected event: {:?}", event),
1048 assert!(bg_processor.stop().is_ok());
// Test that, when a scorer is passed to `BackgroundProcessor::start`, a "Persisting scorer"
// entry shows up in the log under the `lightning_background_processor` module.
1052 fn test_scorer_persistence() {
1053 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1054 let data_dir = nodes[0].persister.get_data_dir();
1055 let persister = Arc::new(Persister::new(data_dir));
// Event handler that ignores every event it is given.
1056 let event_handler = |_: _| {};
1057 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The test logger's entries are keyed by (module name, message).
1060 let log_entries = nodes[0].logger.lines.lock().unwrap();
1061 let expected_log = "Persisting scorer".to_string();
1062 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
// Stopping the processor must succeed.
1067 assert!(bg_processor.stop().is_ok());
// Test the interaction between rapid gossip sync and network graph pruning: the graph starts
// with one channel, grows to three after a rapid gossip sync update is applied, and is expected
// to hold zero channels once the background processor has been stopped.
1071 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1072 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1073 let data_dir = nodes[0].persister.get_data_dir();
// The sender half is handed to the persister through `with_graph_persistence_notifier`.
1074 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1075 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1076 let network_graph = nodes[0].network_graph.clone();
// Seed the graph with a single channel (42) between the two nodes, using empty features.
1077 let features = ChannelFeatures::empty();
1078 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1079 .expect("Failed to update channel from partial announcement");
// Sanity-check the seeded state: channel 42 shows up in the description, and it is the only one.
1080 let original_graph_description = network_graph.to_string();
1081 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1082 assert_eq!(network_graph.read_only().channels().len(), 1);
// Event handler that ignores every event it is given.
1084 let event_handler = |_: _| {};
// This test passes the node's rapid gossip sync to the processor.
1085 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The ChannelManager timer-tick log entry doubles as a counter of background loop iterations.
1088 let log_entries = nodes[0].logger.lines.lock().unwrap();
1089 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1090 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1093 // Wait until the loop has gone around at least twice.
// Serialized rapid gossip sync data that is fed to `update_network_graph` below.
1098 let initialization_input = vec![
1099 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1100 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1101 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1102 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1103 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1104 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1105 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1106 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1107 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1108 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1109 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1110 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1111 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1113 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1115 // this should have added two channels
1116 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for up to five times `FIRST_NETWORK_PRUNE_TIMER` (in seconds) before giving up.
1119 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1120 .expect("Network graph not pruned within deadline");
1122 background_processor.stop().unwrap();
1124 // all channels should now be pruned
1125 assert_eq!(network_graph.read_only().channels().len(), 0);
// Test that an `InvoicePayer` wrapped in an `Arc` can be passed to `BackgroundProcessor::start`
// as the event handler, and that the processor then stops without error.
1129 fn test_invoice_payer() {
// A separate test keys interface, used here only to obtain random bytes for the router.
1130 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1131 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1132 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1134 // Start a background processor for node 0 only.
1135 let data_dir = nodes[0].persister.get_data_dir();
1136 let persister = Arc::new(Persister::new(data_dir));
// The router and the invoice payer are built from node 0's network graph, logger, scorer and
// channel manager; the payer's own event handler ignores every event.
1137 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1138 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
// A clone of the `Arc<InvoicePayer>` serves as the processor's event handler.
1139 let event_handler = Arc::clone(&invoice_payer);
1140 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1141 assert!(bg_processor.stop().is_ok());