1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[macro_use] extern crate lightning;
20 extern crate lightning_rapid_gossip_sync;
23 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
24 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
25 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
26 use lightning::ln::channelmanager::ChannelManager;
27 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
28 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
29 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
30 use lightning::routing::router::Router;
31 use lightning::routing::scoring::WriteableScore;
32 use lightning::util::events::{Event, EventHandler, EventsProvider};
33 use lightning::util::logger::Logger;
34 use lightning::util::persist::Persister;
35 use lightning_rapid_gossip_sync::RapidGossipSync;
39 use core::time::Duration;
41 #[cfg(feature = "std")]
43 #[cfg(feature = "std")]
44 use core::sync::atomic::{AtomicBool, Ordering};
45 #[cfg(feature = "std")]
46 use std::thread::{self, JoinHandle};
47 #[cfg(feature = "std")]
48 use std::time::Instant;
50 #[cfg(feature = "futures")]
51 use futures_util::{select_biased, future::FutureExt, task};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup.
/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
/// at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
// Shared exit flag: `stop`/drop store `true` (Release) and the background thread
// loads it (Acquire) in its loop-exit check.
stop_thread: Arc<AtomicBool>,
// Handle to the spawned background thread; `None` once it has been joined. The thread
// returns any `std::io::Error` produced while persisting.
thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Seconds between `ChannelManager::timer_tick_occurred` calls (see `define_run_body!`).
const FRESHNESS_TIMER: u64 = 60;
// NOTE(review): several constants below are defined twice without a visible `#[cfg]`
// attribute in this view — presumably the short (1-second) values are `#[cfg(test)]`
// overrides whose attribute lines were elided; confirm against the full file.
const FRESHNESS_TIMER: u64 = 1;
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Seconds between scorer persistence attempts.
const SCORER_PERSIST_TIMER: u64 = 30;
const SCORER_PERSIST_TIMER: u64 = 1;
// Seconds after startup before the first network-graph prune (avoids short-lived
// clients never pruning; see the comment in `define_run_body!`).
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// Generic parameters: `P` derefs to a P2P gossip sync, `R` to a rapid gossip sync, and
// `G` to the `NetworkGraph` both operate on. (The enum header/variant lines are not
// visible in this chunk.)
P: Deref<Target = P2PGossipSync<G, A, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
where A::Target: chain::Access, L::Target: Logger {
/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
/// Rapid gossip sync from a trusted server.
P: Deref<Target = P2PGossipSync<G, A, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
// Returns the `NetworkGraph`, if this gossip-sync variant carries one.
fn network_graph(&self) -> Option<&G> {
GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
GossipSync::None => None,
// Returns the graph only when it is safe to prune it: always for P2P, but for Rapid
// only after the initial sync has completed (pruning mid-sync could drop data the
// sync is about to reference).
fn prunable_network_graph(&self) -> Option<&G> {
GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
GossipSync::Rapid(gossip_sync) => {
if gossip_sync.is_initial_sync_complete() {
Some(gossip_sync.network_graph())
GossipSync::None => None,
// Each constructor impl below pins the *unused* variant's type parameter to a concrete
// reference type so callers building one variant need not name the other's types.
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
A::Target: chain::Access,
/// Initializes a new [`GossipSync::P2P`] variant.
pub fn p2p(gossip_sync: P) -> Self {
GossipSync::P2P(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
&'a (dyn chain::Access + Send + Sync),
/// Initializes a new [`GossipSync::Rapid`] variant.
pub fn rapid(gossip_sync: R) -> Self {
GossipSync::Rapid(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
&RapidGossipSync<&'a NetworkGraph<L>, L>,
&'a (dyn chain::Access + Send + Sync),
/// Initializes a new [`GossipSync::None`] variant.
pub fn none() -> Self {
// Applies any `NetworkUpdate` carried by a failed-payment event to the given network
// graph. Shared by both the threaded and async event-handler wrappers.
fn handle_network_graph_update<L: Deref>(
network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
// Only `PaymentPathFailed` events carry a `network_update` to apply.
if let Event::PaymentPathFailed { ref network_update, .. } = event {
if let Some(network_update) = network_update {
network_graph.handle_network_update(&network_update);
// Shared main-loop body used by both the threaded `BackgroundProcessor::start` and the
// async `process_events_async` entry points. The caller supplies expressions for event
// processing, the loop-exit check, awaiting/sleeping, and timer creation/measurement,
// so the same control flow works in sync and async contexts.
macro_rules! define_run_body {
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
// Per-task countdown timers, each reset after its action fires.
let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut have_pruned = false;
$process_channel_manager_events;
$process_chain_monitor_events;
// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
// we want to ensure we get into `persist_manager` as quickly as we can, especially
// without running the normal event processing above and handing events to users.
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
// processing a message effectively at any point during this loop. In order to
// minimize the time between such processing completing and persisting the updated
// ChannelManager, we want to minimize methods blocking on a ChannelManager
// generally, and as a fallback place such blocking only immediately before
$peer_manager.process_events();
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let mut await_start = $get_timer(1);
let updates_available = $await;
let await_slow = $timer_elapsed(&mut await_start, 1);
if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
// Exit the loop if the background processor was requested to stop.
if $loop_exit_check {
log_trace!($logger, "Terminating background processor.");
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
last_freshness_call = $get_timer(FRESHNESS_TIMER);
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// may not get any cycles.
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
// full second, at which point we assume sockets may have been killed (they
// appear to be at least on some platforms, even if it has only been a second).
// Note that we have to take care to not get here just because user event
// processing was slow at the top of the loop. For example, the sample client
// may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
$peer_manager.disconnect_all_peers();
last_ping_call = $get_timer(PING_TIMER);
} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
$peer_manager.timer_tick_occurred();
last_ping_call = $get_timer(PING_TIMER);
// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
#[cfg(feature = "std")] {
log_trace!($logger, "Pruning and persisting network graph.");
network_graph.remove_stale_channels_and_tracking();
// Without `std` there is no clock, so the user must prune manually with
// `remove_stale_channels_and_tracking_with_time`; we only persist here.
#[cfg(not(feature = "std"))] {
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
log_trace!($logger, "Persisting network graph.");
// Persistence failures here are logged but non-fatal — the graph can be
// re-persisted on the next cycle.
if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
if let Some(ref scorer) = $scorer {
log_trace!($logger, "Persisting scorer");
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
$persister.persist_manager(&*$channel_manager)?;
// Persist Scorer on exit
if let Some(ref scorer) = $scorer {
$persister.persist_scorer(&scorer)?;
// Persist NetworkGraph on exit
if let Some(network_graph) = $gossip_sync.network_graph() {
$persister.persist_graph(network_graph)?;
/// Processes background events in a future.
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
ES: 'static + Deref + Send + Sync,
NS: 'static + Deref + Send + Sync,
SP: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
R: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
OMH: 'static + Deref + Send + Sync,
EventHandlerFuture: core::future::Future<Output = ()>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
) -> Result<(), io::Error>
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,
ES::Target: 'static + EntropySource,
NS::Target: 'static + NodeSigner,
SP::Target: 'static + SignerProvider,
F::Target: 'static + FeeEstimator,
R::Target: 'static + Router,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
CMH::Target: 'static + ChannelMessageHandler,
OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
// NOTE(review): `should_break` appears to be reassigned only on the `sleeper` arm of
// the `select_biased!` below (that arm's body is not visible in this chunk). If the
// persistable-update future wins the first race, this initial `true` would terminate
// the loop after a single persist — confirm whether this should be `false`.
let mut should_break = true;
// Wrap the user's async event handler so network-graph updates from failed payments
// are applied before the user sees the event.
let async_event_handler = |event| {
let network_graph = gossip_sync.network_graph();
let event_handler = &event_handler;
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
event_handler(event).await;
define_run_body!(persister,
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
gossip_sync, peer_manager, logger, scorer, should_break, {
// Race a manager-persistence wakeup against a 100ms sleep; the sleeper also
// reports whether the caller wants the loop to exit.
_ = channel_manager.get_persistable_update_future().fuse() => true,
exit = sleeper(Duration::from_millis(100)).fuse() => {
}, |t| sleeper(Duration::from_secs(t)),
|fut: &mut SleepFuture, _| {
// Poll with a no-op waker: this only observes whether the future has already
// completed, which `define_run_body!` uses as its "timer elapsed" check.
let mut waker = task::noop_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
#[cfg(feature = "std")]
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
/// either [`join`] or [`stop`].
/// # Data Persistence
/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
/// provided implementation.
/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
/// See the `lightning-persister` crate for LDK's provided implementation.
/// Typically, users should either implement [`Persister::persist_manager`] to never return an
/// error or call [`join`] and handle any error that may arise. For the latter case,
/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
/// functionality implemented by other handlers.
/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
/// # Rapid Gossip Sync
/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
/// until the [`RapidGossipSync`] instance completes its first sync.
/// [top-level documentation]: BackgroundProcessor
/// [`join`]: Self::join
/// [`stop`]: Self::stop
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
ES: 'static + Deref + Send + Sync,
NS: 'static + Deref + Send + Sync,
SP: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
R: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
OMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,
ES::Target: 'static + EntropySource,
NS::Target: 'static + NodeSigner,
SP::Target: 'static + SignerProvider,
F::Target: 'static + FeeEstimator,
R::Target: 'static + Router,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
CMH::Target: 'static + ChannelMessageHandler,
OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
// Exit flag shared between this handle and the spawned thread.
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
// Wrap the user's handler so network-graph updates from failed payments are
// applied before the user sees the event (mirrors `process_events_async`).
let event_handler = |event| {
let network_graph = gossip_sync.network_graph();
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
event_handler.handle_event(event);
// Run the shared loop; the await step blocks up to 100ms waiting for a
// persistable ChannelManager update, and timers use wall-clock `Instant`s.
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
/// [`ChannelManager`].
/// This function panics if the background thread has panicked such as while persisting or
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
pub fn join(mut self) -> Result<(), std::io::Error> {
assert!(self.thread_handle.is_some());
/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
/// [`ChannelManager`].
/// This function panics if the background thread has panicked such as while persisting or
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
pub fn stop(mut self) -> Result<(), std::io::Error> {
assert!(self.thread_handle.is_some());
self.stop_and_join_thread()
// Signals the thread to exit, then joins it.
fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
self.stop_thread.store(true, Ordering::Release);
// Joins the thread if it hasn't been joined yet, propagating its persistence result.
// `handle.join().unwrap()` panics if the thread itself panicked.
fn join_thread(&mut self) -> Result<(), std::io::Error> {
match self.thread_handle.take() {
Some(handle) => handle.join().unwrap(),
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
// Stop and join the background thread on drop; panics if the thread panicked or the
// final persistence failed (see `stop_and_join_thread`).
self.stop_and_join_thread().unwrap();
609 #[cfg(all(feature = "std", test))]
611 use bitcoin::blockdata::block::BlockHeader;
612 use bitcoin::blockdata::constants::genesis_block;
613 use bitcoin::blockdata::locktime::PackedLockTime;
614 use bitcoin::blockdata::transaction::{Transaction, TxOut};
615 use bitcoin::network::constants::Network;
616 use lightning::chain::{BestBlock, Confirm, chainmonitor};
617 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
618 use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
619 use lightning::chain::transaction::OutPoint;
620 use lightning::get_event_msg;
621 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
622 use lightning::ln::features::ChannelFeatures;
623 use lightning::ln::msgs::{ChannelMessageHandler, Init};
624 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
625 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
626 use lightning::routing::router::DefaultRouter;
627 use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
628 use lightning::util::config::UserConfig;
629 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
630 use lightning::util::ser::Writeable;
631 use lightning::util::test_utils;
632 use lightning::util::persist::KVStorePersister;
633 use lightning_invoice::payment::{InvoicePayer, Retry};
634 use lightning_persister::FilesystemPersister;
636 use std::path::PathBuf;
637 use std::sync::{Arc, Mutex};
638 use std::sync::mpsc::SyncSender;
639 use std::time::Duration;
640 use bitcoin::hashes::Hash;
641 use bitcoin::TxMerkleNode;
642 use lightning_rapid_gossip_sync::RapidGossipSync;
643 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
// Maximum number of seconds the tests wait for an expected event before giving up.
const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
// No-op socket descriptor used to satisfy `PeerManager`'s type parameter in tests.
#[derive(Clone, Hash, PartialEq, Eq)]
struct TestDescriptor{}
impl SocketDescriptor for TestDescriptor {
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
fn disconnect_socket(&mut self) {}
// Concrete aliases for the heavily-generic types used throughout the tests.
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// Fields of the test-harness node (the `struct Node` header is not visible in this
// chunk): a ChannelManager bundled with its supporting test components.
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
p2p_gossip_sync: PGS,
rapid_gossip_sync: RGS,
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
chain_monitor: Arc<ChainMonitor>,
persister: Arc<FilesystemPersister>,
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
logger: Arc<test_utils::TestLogger>,
best_block: BestBlock,
scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
// Convenience constructors for each `GossipSync` variant over this node's components.
fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
GossipSync::P2P(self.p2p_gossip_sync.clone())
fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
GossipSync::Rapid(self.rapid_gossip_sync.clone())
fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
// Best-effort cleanup of the node's on-disk persistence directory (run on drop);
// failure is only logged so a dirty directory never fails a test run.
let data_dir = self.persister.get_data_dir();
match fs::remove_dir_all(data_dir.clone()) {
Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Fields of the test `Persister` (struct header not visible): wraps a
// `FilesystemPersister` and can inject an error or a notification per persisted key.
graph_error: Option<(std::io::ErrorKind, &'static str)>,
graph_persistence_notifier: Option<SyncSender<()>>,
manager_error: Option<(std::io::ErrorKind, &'static str)>,
scorer_error: Option<(std::io::ErrorKind, &'static str)>,
filesystem_persister: FilesystemPersister,
// Creates a persister with no injected failures, backed by `data_dir`.
fn new(data_dir: String) -> Self {
let filesystem_persister = FilesystemPersister::new(data_dir.clone());
Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
// Builder-style setters: each returns a copy with one failure/notifier injected.
fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { graph_error: Some((error, message)), ..self }
fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
Self { graph_persistence_notifier: Some(sender), ..self }
fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { manager_error: Some((error, message)), ..self }
fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { scorer_error: Some((error, message)), ..self }
impl KVStorePersister for Persister {
// Intercepts writes by key: returns the injected error for "manager"/"network_graph"/
// scorer keys if one was configured, signals the graph-persistence notifier, and
// otherwise delegates to the real filesystem persister.
fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
if key == "manager" {
if let Some((error, message)) = self.manager_error {
return Err(std::io::Error::new(error, message))
if key == "network_graph" {
// Let the test know a graph persist was attempted before (possibly) failing it.
if let Some(sender) = &self.graph_persistence_notifier {
sender.send(()).unwrap();
if let Some((error, message)) = self.graph_error {
return Err(std::io::Error::new(error, message))
if let Some((error, message)) = self.scorer_error {
return Err(std::io::Error::new(error, message))
self.filesystem_persister.persist(key, object)
// Joins a directory and file name into a single path string.
// NOTE(review): `filename` is never used in the visible lines — a `path.push(filename)`
// appears to have been elided from this view; confirm against the full file.
fn get_full_filepath(filepath: String, filename: String) -> String {
let mut path = PathBuf::from(filepath);
path.to_str().unwrap().to_string()
// Builds `num_nodes` fully-wired test nodes (each with its own persister directory under
// `persist_dir`), then connects every pair of nodes as peers.
fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
let mut nodes = Vec::new();
for i in 0..num_nodes {
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let network = Network::Testnet;
let genesis_block = genesis_block(network);
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
let params = ProbabilisticScoringParameters::default();
let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
// Deterministic per-node seed so runs are reproducible.
let seed = [i as u8; 32];
let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
let now = Duration::from_secs(genesis_block.header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
let best_block = BestBlock::from_genesis(network);
let params = ChainParameters { network, best_block };
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Register every pair of nodes with each other so message handling works both ways.
for i in 0..num_nodes {
for j in (i+1)..num_nodes {
nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
// Runs the full channel-open flow between `$node_a` and `$node_b` for
// `$channel_value` sats: open/accept handshake, FundingGenerationReady event
// handling, and funding_created/funding_signed exchange. Evaluates to the
// funding transaction produced by `handle_funding_generation_ready!`.
804 macro_rules! open_channel {
805 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
806 begin_open_channel!($node_a, $node_b, $channel_value);
// Exactly one FundingGenerationReady event is expected at this point.
807 let events = $node_a.node.get_and_clear_pending_events();
808 assert_eq!(events.len(), 1);
809 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
810 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of the channel-open handshake: `$node_a` initiates a channel of
// `$channel_value` sats (push_msat 100, user_channel_id 42), `$node_b` handles
// the open_channel message, and `$node_a` handles the resulting accept_channel.
815 macro_rules! begin_open_channel {
816 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
817 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
818 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
819 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Matches a FundingGenerationReady event, checks its channel value and the
// user_channel_id (42, as set by begin_open_channel!), and builds a dummy
// funding transaction paying `channel_value_satoshis` to the requested script.
// Evaluates to `(temporary_channel_id, funding_tx)`.
823 macro_rules! handle_funding_generation_ready {
824 ($event: expr, $channel_value: expr) => {{
826 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
827 assert_eq!(channel_value_satoshis, $channel_value);
828 assert_eq!(user_channel_id, 42);
// Funding tx with no inputs — only the funding output matters for the tests.
830 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
831 value: channel_value_satoshis, script_pubkey: output_script.clone(),
833 (temporary_channel_id, tx)
835 _ => panic!("Unexpected event"),
// Second half of the channel-open handshake: `$node_a` supplies the generated
// funding transaction, `$node_b` handles the resulting funding_created, and
// `$node_a` handles the funding_signed reply.
840 macro_rules! end_open_channel {
841 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
842 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
843 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
844 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Confirms `tx` on `node` and mines it to the given `depth`: builds a block
// containing the transaction on top of the node's current best block, notifies
// both the ChannelManager and ChainMonitor of the confirmation, then advances
// the best block (interior loop lines for the extra depth are outside this view).
848 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
850 let prev_blockhash = node.best_block.block_hash();
851 let height = node.best_block.height() + 1;
// Header fields are dummies; `time` reuses the height for determinism.
852 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
853 let txdata = vec![(0, tx)];
854 node.best_block = BestBlock::new(header.block_hash(), height);
// Both the manager and the monitor must see the confirmation.
857 node.node.transactions_confirmed(&header, &txdata, height);
858 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
861 node.node.best_block_updated(&header, height);
862 node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` deep enough (ANTI_REORG_DELAY blocks) that the node treats it
// as irreversibly confirmed.
868 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
869 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
873 fn test_background_processor() {
874 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
875 // updates. Also test that when new updates are available, the manager signals that it needs
876 // re-persistence and is successfully re-persisted.
877 let nodes = create_nodes(2, "test_background_processor".to_string());
879 // Go through the channel creation process so that each node has something to persist. Since
880 // open_channel consumes events, it must complete before starting BackgroundProcessor to
881 // avoid a race with processing events.
882 let tx = open_channel!(nodes[0], nodes[1], 100000);
884 // Initiate the background processors to watch each node.
885 let data_dir = nodes[0].persister.get_data_dir();
886 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: this test only exercises persistence.
887 let event_handler = |_: _| {};
888 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Re-serializes `$node` and compares the bytes against the file at
// `$filepath` until they match (retry scaffolding outside this view).
890 macro_rules! check_persisted_data {
891 ($node: expr, $filepath: expr) => {
892 let mut expected_bytes = Vec::new();
894 expected_bytes.clear();
895 match $node.write(&mut expected_bytes) {
897 match std::fs::read($filepath) {
899 if bytes == expected_bytes {
908 Err(e) => panic!("Unexpected error: {}", e)
914 // Check that the initial channel manager data is persisted as expected.
915 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
916 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait until the manager's persistence flag has been consumed by the processor.
919 if !nodes[0].node.get_persistence_condvar_value() { break }
922 // Force-close the channel.
923 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
925 // Check that the force-close updates are persisted.
926 check_persisted_data!(nodes[0].node, filepath.clone());
928 if !nodes[0].node.get_persistence_condvar_value() { break }
931 // Check network graph is persisted
932 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
933 check_persisted_data!(nodes[0].network_graph, filepath.clone());
935 // Check scorer is persisted
936 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
937 check_persisted_data!(nodes[0].scorer, filepath.clone());
939 assert!(bg_processor.stop().is_ok());
943 fn test_timer_tick_called() {
944 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
945 // `FRESHNESS_TIMER`.
946 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
947 let data_dir = nodes[0].persister.get_data_dir();
948 let persister = Arc::new(Persister::new(data_dir));
949 let event_handler = |_: _| {};
950 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the test logger until both tick log lines have appeared (the
// surrounding retry loop is outside this view).
952 let log_entries = nodes[0].logger.lines.lock().unwrap();
953 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
954 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
955 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
956 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
961 assert!(bg_processor.stop().is_ok());
965 fn test_channel_manager_persist_error() {
966 // Test that if we encounter an error during manager persistence, the thread panics.
967 let nodes = create_nodes(2, "test_persist_error".to_string());
// A channel is opened so there is manager state to (fail to) persist.
968 open_channel!(nodes[0], nodes[1], 100000);
970 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail ChannelManager persistence with a known error.
971 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
972 let event_handler = |_: _| {};
973 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The injected error must propagate out of the background thread via join().
974 match bg_processor.join() {
975 Ok(_) => panic!("Expected error persisting manager"),
977 assert_eq!(e.kind(), std::io::ErrorKind::Other);
978 assert_eq!(e.get_ref().unwrap().to_string(), "test");
984 fn test_network_graph_persist_error() {
985 // Test that if we encounter an error during network graph persistence, an error gets returned.
986 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
987 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail network-graph persistence with a known error.
988 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
989 let event_handler = |_: _| {};
990 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike manager errors (which panic the thread), graph persistence errors
// surface as an Err from stop().
992 match bg_processor.stop() {
993 Ok(_) => panic!("Expected error persisting network graph"),
995 assert_eq!(e.kind(), std::io::ErrorKind::Other);
996 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1002 fn test_scorer_persist_error() {
1003 // Test that if we encounter an error during scorer persistence, an error gets returned.
1004 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1005 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail scorer persistence with a known error.
1006 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1007 let event_handler = |_: _| {};
1008 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Scorer persistence errors surface as an Err from stop().
1010 match bg_processor.stop() {
1011 Ok(_) => panic!("Expected error persisting scorer"),
1013 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1014 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies the BackgroundProcessor drives the event handler: first that a
// FundingGenerationReady event is handled during channel open, then (with a
// fresh processor) that a SpendableOutputs event is handled after force-close.
1020 fn test_background_event_handling() {
1021 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1022 let channel_value = 100000;
1023 let data_dir = nodes[0].persister.get_data_dir();
1024 let persister = Arc::new(Persister::new(data_dir.clone()));
1026 // Set up a background event handler for FundingGenerationReady events.
// The handler forwards the (temporary_channel_id, tx) pair over a channel so
// the test thread can complete the open; ChannelReady is tolerated, anything
// else is a test failure.
1027 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1028 let event_handler = move |event: Event| match event {
1029 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1030 Event::ChannelReady { .. } => {},
1031 _ => panic!("Unexpected event: {:?}", event),
1034 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1036 // Open a channel and check that the FundingGenerationReady event was handled.
1037 begin_open_channel!(nodes[0], nodes[1], channel_value);
1038 let (temporary_channel_id, funding_tx) = receiver
1039 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1040 .expect("FundingGenerationReady not handled within deadline")
1041 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1043 // Confirm the funding transaction.
1044 confirm_transaction(&mut nodes[0], &funding_tx);
1045 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1046 confirm_transaction(&mut nodes[1], &funding_tx);
1047 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready in both directions; the resulting channel_updates
// are drained but otherwise unused.
1048 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1049 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1050 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1051 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1053 assert!(bg_processor.stop().is_ok());
1055 // Set up a background event handler for SpendableOutputs events.
// New channel/handler pair; ChannelReady and ChannelClosed are expected
// side-effects of the force-close and are ignored.
1056 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1057 let event_handler = move |event: Event| match event {
1058 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1059 Event::ChannelReady { .. } => {},
1060 Event::ChannelClosed { .. } => {},
1061 _ => panic!("Unexpected event: {:?}", event),
1063 let persister = Arc::new(Persister::new(data_dir));
1064 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1066 // Force close the channel and check that the SpendableOutputs event was handled.
1067 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Confirming the broadcast commitment tx past BREAKDOWN_TIMEOUT makes the
// to-self output claimable, producing the SpendableOutputs event.
1068 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1069 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1071 let event = receiver
1072 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1073 .expect("Events not handled within deadline");
1075 Event::SpendableOutputs { .. } => {},
1076 _ => panic!("Unexpected event: {:?}", event),
1079 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer, by
// polling the test logger for the "Persisting scorer" line.
1083 fn test_scorer_persistence() {
1084 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1085 let data_dir = nodes[0].persister.get_data_dir();
1086 let persister = Arc::new(Persister::new(data_dir));
1087 let event_handler = |_: _| {};
1088 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Retry loop scaffolding around this log check is outside this view.
1091 let log_entries = nodes[0].logger.lines.lock().unwrap();
1092 let expected_log = "Persisting scorer".to_string();
1093 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1098 assert!(bg_processor.stop().is_ok());
// Verifies that when using rapid gossip sync, the background processor defers
// network-graph pruning until the RGS update completes: a stale partial
// announcement survives several timer ticks, and only after the RGS data is
// applied does pruning run (removing all channels) and graph persistence fire.
1102 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1103 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1104 let data_dir = nodes[0].persister.get_data_dir();
// The persister notifies this channel each time the graph is persisted.
1105 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1106 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1107 let network_graph = nodes[0].network_graph.clone();
// Seed the graph with a partial announcement (scid 42) that pruning would
// normally remove.
1108 let features = ChannelFeatures::empty();
1109 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1110 .expect("Failed to update channel from partial announcement");
1111 let original_graph_description = network_graph.to_string();
1112 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1113 assert_eq!(network_graph.read_only().channels().len(), 1);
1115 let event_handler = |_: _| {};
1116 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Count ChannelManager timer ticks in the log to ensure the processor's main
// loop has run at least twice before the RGS update is applied.
1119 let log_entries = nodes[0].logger.lines.lock().unwrap();
1120 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1121 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1124 // Wait until the loop has gone around at least twice.
// Minimal binary RGS snapshot (LDK\x01 magic) announcing two channels.
1129 let initialization_input = vec![
1130 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1131 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1132 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1133 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1134 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1135 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1136 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1137 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1138 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1139 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1140 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1141 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1142 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1144 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1146 // this should have added two channels
1147 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for the post-sync prune + persistence notification.
1150 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1151 .expect("Network graph not pruned within deadline");
1153 background_processor.stop().unwrap();
1155 // all channels should now be pruned
1156 assert_eq!(network_graph.read_only().channels().len(), 0);
// Smoke test: an InvoicePayer (wrapping the node's ChannelManager and a
// DefaultRouter) can serve as the BackgroundProcessor's event handler, and the
// processor starts and stops cleanly with it installed.
1160 fn test_invoice_payer() {
1161 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1162 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1163 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1165 // Initiate the background processors to watch each node.
1166 let data_dir = nodes[0].persister.get_data_dir();
1167 let persister = Arc::new(Persister::new(data_dir));
1168 let router = Arc::new(DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)));
// Payment events are discarded (`|_: _| {}`); up to 2 retry attempts.
1169 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1170 let event_handler = Arc::clone(&invoice_payer);
1171 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1172 assert!(bg_processor.stop().is_ok());