1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[macro_use] extern crate lightning;
20 extern crate lightning_rapid_gossip_sync;
23 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
24 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
25 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
26 use lightning::ln::channelmanager::ChannelManager;
27 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
28 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
29 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
30 use lightning::routing::router::Router;
31 use lightning::routing::scoring::WriteableScore;
32 use lightning::util::events::{Event, EventHandler, EventsProvider};
33 use lightning::util::logger::Logger;
34 use lightning::util::persist::Persister;
35 use lightning_rapid_gossip_sync::RapidGossipSync;
39 use core::time::Duration;
41 #[cfg(feature = "std")]
43 #[cfg(feature = "std")]
44 use core::sync::atomic::{AtomicBool, Ordering};
45 #[cfg(feature = "std")]
46 use std::thread::{self, JoinHandle};
47 #[cfg(feature = "std")]
48 use std::time::Instant;
50 #[cfg(feature = "futures")]
51 use futures_util::{select_biased, future::FutureExt, task};
53 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
54 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
55 /// responsibilities are:
56 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
57 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
58 /// writing it to disk/backups by invoking the callback given to it at startup.
59 /// [`ChannelManager`] persistence should be done in the background.
60 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
61 /// at the appropriate intervals.
62 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
63 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
65 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
66 /// upon as doing so may result in high latency.
70 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
71 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
72 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
73 /// unilateral chain closure fees are at risk.
75 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
76 /// [`Event`]: lightning::util::events::Event
77 #[cfg(feature = "std")]
78 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
79 pub struct BackgroundProcessor {
80 stop_thread: Arc<AtomicBool>,
81 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
85 const FRESHNESS_TIMER: u64 = 60;
87 const FRESHNESS_TIMER: u64 = 1;
89 #[cfg(all(not(test), not(debug_assertions)))]
90 const PING_TIMER: u64 = 10;
91 /// Signature operations take a lot longer without compiler optimisations.
92 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
93 /// timeout is reached.
94 #[cfg(all(not(test), debug_assertions))]
95 const PING_TIMER: u64 = 30;
97 const PING_TIMER: u64 = 1;
99 /// Prune the network graph of stale entries hourly.
100 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
103 const SCORER_PERSIST_TIMER: u64 = 30;
105 const SCORER_PERSIST_TIMER: u64 = 1;
108 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
110 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
112 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
114 P: Deref<Target = P2PGossipSync<G, A, L>>,
115 R: Deref<Target = RapidGossipSync<G, L>>,
116 G: Deref<Target = NetworkGraph<L>>,
120 where A::Target: chain::Access, L::Target: Logger {
121 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
123 /// Rapid gossip sync from a trusted server.
130 P: Deref<Target = P2PGossipSync<G, A, L>>,
131 R: Deref<Target = RapidGossipSync<G, L>>,
132 G: Deref<Target = NetworkGraph<L>>,
135 > GossipSync<P, R, G, A, L>
136 where A::Target: chain::Access, L::Target: Logger {
137 fn network_graph(&self) -> Option<&G> {
139 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
140 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
141 GossipSync::None => None,
145 fn prunable_network_graph(&self) -> Option<&G> {
147 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
148 GossipSync::Rapid(gossip_sync) => {
149 if gossip_sync.is_initial_sync_complete() {
150 Some(gossip_sync.network_graph())
155 GossipSync::None => None,
160 /// (C-not exported) as the bindings concretize everything and have constructors for us
161 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
162 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
164 A::Target: chain::Access,
167 /// Initializes a new [`GossipSync::P2P`] variant.
168 pub fn p2p(gossip_sync: P) -> Self {
169 GossipSync::P2P(gossip_sync)
173 /// (C-not exported) as the bindings concretize everything and have constructors for us
174 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
176 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
179 &'a (dyn chain::Access + Send + Sync),
185 /// Initializes a new [`GossipSync::Rapid`] variant.
186 pub fn rapid(gossip_sync: R) -> Self {
187 GossipSync::Rapid(gossip_sync)
191 /// (C-not exported) as the bindings concretize everything and have constructors for us
194 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
195 &RapidGossipSync<&'a NetworkGraph<L>, L>,
197 &'a (dyn chain::Access + Send + Sync),
203 /// Initializes a new [`GossipSync::None`] variant.
204 pub fn none() -> Self {
209 fn handle_network_graph_update<L: Deref>(
210 network_graph: &NetworkGraph<L>, event: &Event
211 ) where L::Target: Logger {
212 if let Event::PaymentPathFailed { ref network_update, .. } = event {
213 if let Some(network_update) = network_update {
214 network_graph.handle_network_update(&network_update);
219 macro_rules! define_run_body {
220 ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
221 $channel_manager: ident, $process_channel_manager_events: expr,
222 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
223 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
225 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
226 $channel_manager.timer_tick_occurred();
228 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
229 let mut last_ping_call = $get_timer(PING_TIMER);
230 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
231 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
232 let mut have_pruned = false;
235 $process_channel_manager_events;
236 $process_chain_monitor_events;
238 // Note that the PeerManager::process_events may block on ChannelManager's locks,
239 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
240 // we want to ensure we get into `persist_manager` as quickly as we can, especially
241 // without running the normal event processing above and handing events to users.
243 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
244 // processing a message effectively at any point during this loop. In order to
245 // minimize the time between such processing completing and persisting the updated
246 // ChannelManager, we want to minimize methods blocking on a ChannelManager
247 // generally, and as a fallback place such blocking only immediately before
249 $peer_manager.process_events();
251 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
252 // see `await_start`'s use below.
253 let mut await_start = $get_timer(1);
254 let updates_available = $await;
255 let await_slow = $timer_elapsed(&mut await_start, 1);
257 if updates_available {
258 log_trace!($logger, "Persisting ChannelManager...");
259 $persister.persist_manager(&*$channel_manager)?;
260 log_trace!($logger, "Done persisting ChannelManager.");
262 // Exit the loop if the background processor was requested to stop.
263 if $loop_exit_check {
264 log_trace!($logger, "Terminating background processor.");
267 if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
268 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
269 $channel_manager.timer_tick_occurred();
270 last_freshness_call = $get_timer(FRESHNESS_TIMER);
273 // On various platforms, we may be starved of CPU cycles for several reasons.
274 // E.g. on iOS, if we've been in the background, we will be entirely paused.
275 // Similarly, if we're on a desktop platform and the device has been asleep, we
276 // may not get any cycles.
277 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
278 // full second, at which point we assume sockets may have been killed (they
279 // appear to be at least on some platforms, even if it has only been a second).
280 // Note that we have to take care to not get here just because user event
281 // processing was slow at the top of the loop. For example, the sample client
282 // may call Bitcoin Core RPCs during event handling, which very often takes
283 // more than a handful of seconds to complete, and shouldn't disconnect all our
285 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
286 $peer_manager.disconnect_all_peers();
287 last_ping_call = $get_timer(PING_TIMER);
288 } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
289 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
290 $peer_manager.timer_tick_occurred();
291 last_ping_call = $get_timer(PING_TIMER);
294 // Note that we want to run a graph prune once not long after startup before
295 // falling back to our usual hourly prunes. This avoids short-lived clients never
296 // pruning their network graph. We run once 60 seconds after startup before
297 // continuing our normal cadence.
298 if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
299 // The network graph must not be pruned while rapid sync completion is pending
300 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
301 #[cfg(feature = "std")] {
302 log_trace!($logger, "Pruning and persisting network graph.");
303 network_graph.remove_stale_channels_and_tracking();
305 #[cfg(not(feature = "std"))] {
306 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
307 log_trace!($logger, "Persisting network graph.");
310 if let Err(e) = $persister.persist_graph(network_graph) {
311 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
314 last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
319 if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
320 if let Some(ref scorer) = $scorer {
321 log_trace!($logger, "Persisting scorer");
322 if let Err(e) = $persister.persist_scorer(&scorer) {
323 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
326 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
330 // After we exit, ensure we persist the ChannelManager one final time - this avoids
331 // some races where users quit while channel updates were in-flight, with
332 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
333 $persister.persist_manager(&*$channel_manager)?;
335 // Persist Scorer on exit
336 if let Some(ref scorer) = $scorer {
337 $persister.persist_scorer(&scorer)?;
340 // Persist NetworkGraph on exit
341 if let Some(network_graph) = $gossip_sync.network_graph() {
342 $persister.persist_graph(network_graph)?;
349 /// Processes background events in a future.
351 /// `sleeper` should return a future which completes in the given amount of time and returns a
352 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
353 /// future which outputs true, the loop will exit and this function's future will complete.
355 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
357 /// Requires the `futures` feature. Note that while this method is available without the `std`
358 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
359 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
360 /// manually instead.
361 #[cfg(feature = "futures")]
362 pub async fn process_events_async<
364 CA: 'static + Deref + Send + Sync,
365 CF: 'static + Deref + Send + Sync,
366 CW: 'static + Deref + Send + Sync,
367 T: 'static + Deref + Send + Sync,
368 ES: 'static + Deref + Send + Sync,
369 NS: 'static + Deref + Send + Sync,
370 SP: 'static + Deref + Send + Sync,
371 F: 'static + Deref + Send + Sync,
372 R: 'static + Deref + Send + Sync,
373 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
374 L: 'static + Deref + Send + Sync,
375 P: 'static + Deref + Send + Sync,
376 Descriptor: 'static + SocketDescriptor + Send + Sync,
377 CMH: 'static + Deref + Send + Sync,
378 RMH: 'static + Deref + Send + Sync,
379 OMH: 'static + Deref + Send + Sync,
380 EventHandlerFuture: core::future::Future<Output = ()>,
381 EventHandler: Fn(Event) -> EventHandlerFuture,
382 PS: 'static + Deref + Send,
383 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
384 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
385 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
386 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
387 UMH: 'static + Deref + Send + Sync,
388 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
389 S: 'static + Deref<Target = SC> + Send + Sync,
390 SC: WriteableScore<'a>,
391 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
392 Sleeper: Fn(Duration) -> SleepFuture
394 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
395 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
397 ) -> Result<(), io::Error>
399 CA::Target: 'static + chain::Access,
400 CF::Target: 'static + chain::Filter,
401 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
402 T::Target: 'static + BroadcasterInterface,
403 ES::Target: 'static + EntropySource,
404 NS::Target: 'static + NodeSigner,
405 SP::Target: 'static + SignerProvider,
406 F::Target: 'static + FeeEstimator,
407 R::Target: 'static + Router,
408 L::Target: 'static + Logger,
409 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
410 CMH::Target: 'static + ChannelMessageHandler,
411 OMH::Target: 'static + OnionMessageHandler,
412 RMH::Target: 'static + RoutingMessageHandler,
413 UMH::Target: 'static + CustomMessageHandler,
414 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
416 let mut should_break = true;
417 let async_event_handler = |event| {
418 let network_graph = gossip_sync.network_graph();
419 let event_handler = &event_handler;
421 if let Some(network_graph) = network_graph {
422 handle_network_graph_update(network_graph, &event)
424 event_handler(event).await;
427 define_run_body!(persister,
428 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
429 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
430 gossip_sync, peer_manager, logger, scorer, should_break, {
432 _ = channel_manager.get_persistable_update_future().fuse() => true,
433 exit = sleeper(Duration::from_millis(100)).fuse() => {
438 }, |t| sleeper(Duration::from_secs(t)),
439 |fut: &mut SleepFuture, _| {
440 let mut waker = task::noop_waker();
441 let mut ctx = task::Context::from_waker(&mut waker);
442 core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
446 #[cfg(feature = "std")]
447 impl BackgroundProcessor {
448 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
451 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
452 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
453 /// either [`join`] or [`stop`].
455 /// # Data Persistence
457 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
458 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
459 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
460 /// provided implementation.
462 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
463 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
464 /// See the `lightning-persister` crate for LDK's provided implementation.
466 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
467 /// error or call [`join`] and handle any error that may arise. For the latter case,
468 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
472 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
473 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
474 /// functionality implemented by other handlers.
475 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
477 /// # Rapid Gossip Sync
479 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
480 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
481 /// until the [`RapidGossipSync`] instance completes its first sync.
483 /// [top-level documentation]: BackgroundProcessor
484 /// [`join`]: Self::join
485 /// [`stop`]: Self::stop
486 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
487 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
488 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
489 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
490 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
491 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
494 CA: 'static + Deref + Send + Sync,
495 CF: 'static + Deref + Send + Sync,
496 CW: 'static + Deref + Send + Sync,
497 T: 'static + Deref + Send + Sync,
498 ES: 'static + Deref + Send + Sync,
499 NS: 'static + Deref + Send + Sync,
500 SP: 'static + Deref + Send + Sync,
501 F: 'static + Deref + Send + Sync,
502 R: 'static + Deref + Send + Sync,
503 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
504 L: 'static + Deref + Send + Sync,
505 P: 'static + Deref + Send + Sync,
506 Descriptor: 'static + SocketDescriptor + Send + Sync,
507 CMH: 'static + Deref + Send + Sync,
508 OMH: 'static + Deref + Send + Sync,
509 RMH: 'static + Deref + Send + Sync,
510 EH: 'static + EventHandler + Send,
511 PS: 'static + Deref + Send,
512 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
513 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
514 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
515 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
516 UMH: 'static + Deref + Send + Sync,
517 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
518 S: 'static + Deref<Target = SC> + Send + Sync,
519 SC: WriteableScore<'a>,
521 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
522 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
525 CA::Target: 'static + chain::Access,
526 CF::Target: 'static + chain::Filter,
527 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
528 T::Target: 'static + BroadcasterInterface,
529 ES::Target: 'static + EntropySource,
530 NS::Target: 'static + NodeSigner,
531 SP::Target: 'static + SignerProvider,
532 F::Target: 'static + FeeEstimator,
533 R::Target: 'static + Router,
534 L::Target: 'static + Logger,
535 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
536 CMH::Target: 'static + ChannelMessageHandler,
537 OMH::Target: 'static + OnionMessageHandler,
538 RMH::Target: 'static + RoutingMessageHandler,
539 UMH::Target: 'static + CustomMessageHandler,
540 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
542 let stop_thread = Arc::new(AtomicBool::new(false));
543 let stop_thread_clone = stop_thread.clone();
544 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
545 let event_handler = |event| {
546 let network_graph = gossip_sync.network_graph();
547 if let Some(network_graph) = network_graph {
548 handle_network_graph_update(network_graph, &event)
550 event_handler.handle_event(event);
552 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
553 channel_manager, channel_manager.process_pending_events(&event_handler),
554 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
555 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
556 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
558 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
561 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
562 /// [`ChannelManager`].
566 /// This function panics if the background thread has panicked such as while persisting or
569 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
570 pub fn join(mut self) -> Result<(), std::io::Error> {
571 assert!(self.thread_handle.is_some());
575 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
576 /// [`ChannelManager`].
580 /// This function panics if the background thread has panicked such as while persisting or
583 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
584 pub fn stop(mut self) -> Result<(), std::io::Error> {
585 assert!(self.thread_handle.is_some());
586 self.stop_and_join_thread()
589 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
590 self.stop_thread.store(true, Ordering::Release);
594 fn join_thread(&mut self) -> Result<(), std::io::Error> {
595 match self.thread_handle.take() {
596 Some(handle) => handle.join().unwrap(),
602 #[cfg(feature = "std")]
603 impl Drop for BackgroundProcessor {
605 self.stop_and_join_thread().unwrap();
609 #[cfg(all(feature = "std", test))]
611 use bitcoin::blockdata::block::BlockHeader;
612 use bitcoin::blockdata::constants::genesis_block;
613 use bitcoin::blockdata::locktime::PackedLockTime;
614 use bitcoin::blockdata::transaction::{Transaction, TxOut};
615 use bitcoin::network::constants::Network;
616 use lightning::chain::{BestBlock, Confirm, chainmonitor};
617 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
618 use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
619 use lightning::chain::transaction::OutPoint;
620 use lightning::get_event_msg;
621 use lightning::ln::channelmanager;
622 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters};
623 use lightning::ln::features::ChannelFeatures;
624 use lightning::ln::msgs::{ChannelMessageHandler, Init};
625 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
626 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
627 use lightning::routing::router::{DefaultRouter, RouteHop};
628 use lightning::routing::scoring::{ChannelUsage, Score};
629 use lightning::util::config::UserConfig;
630 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
631 use lightning::util::ser::Writeable;
632 use lightning::util::test_utils;
633 use lightning::util::persist::KVStorePersister;
634 use lightning_invoice::payment::{InvoicePayer, Retry};
635 use lightning_persister::FilesystemPersister;
636 use std::collections::VecDeque;
638 use std::path::PathBuf;
639 use std::sync::{Arc, Mutex};
640 use std::sync::mpsc::SyncSender;
641 use std::time::Duration;
642 use bitcoin::hashes::Hash;
643 use bitcoin::TxMerkleNode;
644 use lightning_rapid_gossip_sync::RapidGossipSync;
645 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
647 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
649 #[derive(Clone, Hash, PartialEq, Eq)]
650 struct TestDescriptor{}
651 impl SocketDescriptor for TestDescriptor {
652 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
656 fn disconnect_socket(&mut self) {}
659 type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
661 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
663 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
664 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
667 node: Arc<ChannelManager>,
668 p2p_gossip_sync: PGS,
669 rapid_gossip_sync: RGS,
670 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
671 chain_monitor: Arc<ChainMonitor>,
672 persister: Arc<FilesystemPersister>,
673 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
674 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
675 logger: Arc<test_utils::TestLogger>,
676 best_block: BestBlock,
677 scorer: Arc<Mutex<TestScorer>>,
681 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
682 GossipSync::P2P(self.p2p_gossip_sync.clone())
685 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
686 GossipSync::Rapid(self.rapid_gossip_sync.clone())
689 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
696 let data_dir = self.persister.get_data_dir();
697 match fs::remove_dir_all(data_dir.clone()) {
698 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
705 graph_error: Option<(std::io::ErrorKind, &'static str)>,
706 graph_persistence_notifier: Option<SyncSender<()>>,
707 manager_error: Option<(std::io::ErrorKind, &'static str)>,
708 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
709 filesystem_persister: FilesystemPersister,
713 fn new(data_dir: String) -> Self {
714 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
715 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
718 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
719 Self { graph_error: Some((error, message)), ..self }
722 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
723 Self { graph_persistence_notifier: Some(sender), ..self }
726 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
727 Self { manager_error: Some((error, message)), ..self }
730 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
731 Self { scorer_error: Some((error, message)), ..self }
735 impl KVStorePersister for Persister {
736 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
737 if key == "manager" {
738 if let Some((error, message)) = self.manager_error {
739 return Err(std::io::Error::new(error, message))
743 if key == "network_graph" {
744 if let Some(sender) = &self.graph_persistence_notifier {
745 sender.send(()).unwrap();
748 if let Some((error, message)) = self.graph_error {
749 return Err(std::io::Error::new(error, message))
754 if let Some((error, message)) = self.scorer_error {
755 return Err(std::io::Error::new(error, message))
759 self.filesystem_persister.persist(key, object)
764 event_expectations: Option<VecDeque<TestResult>>,
769 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
770 PaymentSuccess { path: Vec<RouteHop> },
771 ProbeFailure { path: Vec<RouteHop> },
772 ProbeSuccess { path: Vec<RouteHop> },
777 Self { event_expectations: None }
780 fn expect(&mut self, expectation: TestResult) {
781 self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
785 impl lightning::util::ser::Writeable for TestScorer {
786 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
789 impl Score for TestScorer {
790 fn channel_penalty_msat(
791 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
792 ) -> u64 { unimplemented!(); }
794 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
795 if let Some(expectations) = &mut self.event_expectations {
796 match expectations.pop_front().unwrap() {
797 TestResult::PaymentFailure { path, short_channel_id } => {
798 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
799 assert_eq!(actual_short_channel_id, short_channel_id);
801 TestResult::PaymentSuccess { path } => {
802 panic!("Unexpected successful payment path: {:?}", path)
804 TestResult::ProbeFailure { path } => {
805 panic!("Unexpected probe failure: {:?}", path)
807 TestResult::ProbeSuccess { path } => {
808 panic!("Unexpected probe success: {:?}", path)
814 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
815 if let Some(expectations) = &mut self.event_expectations {
816 match expectations.pop_front().unwrap() {
817 TestResult::PaymentFailure { path, .. } => {
818 panic!("Unexpected payment path failure: {:?}", path)
820 TestResult::PaymentSuccess { path } => {
821 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
823 TestResult::ProbeFailure { path } => {
824 panic!("Unexpected probe failure: {:?}", path)
826 TestResult::ProbeSuccess { path } => {
827 panic!("Unexpected probe success: {:?}", path)
833 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
834 if let Some(expectations) = &mut self.event_expectations {
835 match expectations.pop_front().unwrap() {
836 TestResult::PaymentFailure { path, .. } => {
837 panic!("Unexpected payment path failure: {:?}", path)
839 TestResult::PaymentSuccess { path } => {
840 panic!("Unexpected payment path success: {:?}", path)
842 TestResult::ProbeFailure { path } => {
843 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
845 TestResult::ProbeSuccess { path } => {
846 panic!("Unexpected probe success: {:?}", path)
851 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
852 if let Some(expectations) = &mut self.event_expectations {
853 match expectations.pop_front().unwrap() {
854 TestResult::PaymentFailure { path, .. } => {
855 panic!("Unexpected payment path failure: {:?}", path)
857 TestResult::PaymentSuccess { path } => {
858 panic!("Unexpected payment path success: {:?}", path)
860 TestResult::ProbeFailure { path } => {
861 panic!("Unexpected probe failure: {:?}", path)
863 TestResult::ProbeSuccess { path } => {
864 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
871 impl Drop for TestScorer {
873 if std::thread::panicking() {
877 if let Some(event_expectations) = &self.event_expectations {
878 if !event_expectations.is_empty() {
879 panic!("Unsatisfied event expectations: {:?}", event_expectations);
885 fn get_full_filepath(filepath: String, filename: String) -> String {
886 let mut path = PathBuf::from(filepath);
888 path.to_str().unwrap().to_string()
891 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
892 let mut nodes = Vec::new();
893 for i in 0..num_nodes {
894 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
895 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
896 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
897 let network = Network::Testnet;
898 let genesis_block = genesis_block(network);
899 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
900 let scorer = Arc::new(Mutex::new(TestScorer::new()));
901 let seed = [i as u8; 32];
902 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
903 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
904 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
905 let now = Duration::from_secs(genesis_block.header.time as u64);
906 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
907 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
908 let best_block = BestBlock::from_genesis(network);
909 let params = ChainParameters { network, best_block };
910 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
911 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
912 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
913 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
914 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
915 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
919 for i in 0..num_nodes {
920 for j in (i+1)..num_nodes {
921 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
922 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
929 macro_rules! open_channel {
930 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
931 begin_open_channel!($node_a, $node_b, $channel_value);
932 let events = $node_a.node.get_and_clear_pending_events();
933 assert_eq!(events.len(), 1);
934 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
935 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
940 macro_rules! begin_open_channel {
941 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
942 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
943 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
944 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
948 macro_rules! handle_funding_generation_ready {
949 ($event: expr, $channel_value: expr) => {{
951 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
952 assert_eq!(channel_value_satoshis, $channel_value);
953 assert_eq!(user_channel_id, 42);
955 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
956 value: channel_value_satoshis, script_pubkey: output_script.clone(),
958 (temporary_channel_id, tx)
960 _ => panic!("Unexpected event"),
965 macro_rules! end_open_channel {
966 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
967 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
968 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
969 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
973 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
975 let prev_blockhash = node.best_block.block_hash();
976 let height = node.best_block.height() + 1;
977 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
978 let txdata = vec![(0, tx)];
979 node.best_block = BestBlock::new(header.block_hash(), height);
982 node.node.transactions_confirmed(&header, &txdata, height);
983 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
986 node.node.best_block_updated(&header, height);
987 node.chain_monitor.best_block_updated(&header, height);
993 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
994 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
998 fn test_background_processor() {
999 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1000 // updates. Also test that when new updates are available, the manager signals that it needs
1001 // re-persistence and is successfully re-persisted.
1002 let nodes = create_nodes(2, "test_background_processor".to_string());
1004 // Go through the channel creation process so that each node has something to persist. Since
1005 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1006 // avoid a race with processing events.
1007 let tx = open_channel!(nodes[0], nodes[1], 100000);
1009 // Initiate the background processors to watch each node.
1010 let data_dir = nodes[0].persister.get_data_dir();
1011 let persister = Arc::new(Persister::new(data_dir));
1012 let event_handler = |_: _| {};
1013 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1015 macro_rules! check_persisted_data {
1016 ($node: expr, $filepath: expr) => {
1017 let mut expected_bytes = Vec::new();
1019 expected_bytes.clear();
1020 match $node.write(&mut expected_bytes) {
1022 match std::fs::read($filepath) {
1024 if bytes == expected_bytes {
1033 Err(e) => panic!("Unexpected error: {}", e)
1039 // Check that the initial channel manager data is persisted as expected.
1040 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1041 check_persisted_data!(nodes[0].node, filepath.clone());
1044 if !nodes[0].node.get_persistence_condvar_value() { break }
1047 // Force-close the channel.
1048 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1050 // Check that the force-close updates are persisted.
1051 check_persisted_data!(nodes[0].node, filepath.clone());
1053 if !nodes[0].node.get_persistence_condvar_value() { break }
1056 // Check network graph is persisted
1057 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1058 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1060 // Check scorer is persisted
1061 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1062 check_persisted_data!(nodes[0].scorer, filepath.clone());
1064 assert!(bg_processor.stop().is_ok());
1068 fn test_timer_tick_called() {
1069 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1070 // `FRESHNESS_TIMER`.
1071 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1072 let data_dir = nodes[0].persister.get_data_dir();
1073 let persister = Arc::new(Persister::new(data_dir));
1074 let event_handler = |_: _| {};
1075 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1077 let log_entries = nodes[0].logger.lines.lock().unwrap();
1078 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1079 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1080 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1081 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1086 assert!(bg_processor.stop().is_ok());
1090 fn test_channel_manager_persist_error() {
1091 // Test that if we encounter an error during manager persistence, the thread panics.
1092 let nodes = create_nodes(2, "test_persist_error".to_string());
1093 open_channel!(nodes[0], nodes[1], 100000);
1095 let data_dir = nodes[0].persister.get_data_dir();
1096 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1097 let event_handler = |_: _| {};
1098 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1099 match bg_processor.join() {
1100 Ok(_) => panic!("Expected error persisting manager"),
1102 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1103 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1109 fn test_network_graph_persist_error() {
1110 // Test that if we encounter an error during network graph persistence, an error gets returned.
1111 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1112 let data_dir = nodes[0].persister.get_data_dir();
1113 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1114 let event_handler = |_: _| {};
1115 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1117 match bg_processor.stop() {
1118 Ok(_) => panic!("Expected error persisting network graph"),
1120 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1121 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1127 fn test_scorer_persist_error() {
1128 // Test that if we encounter an error during scorer persistence, an error gets returned.
1129 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1130 let data_dir = nodes[0].persister.get_data_dir();
1131 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1132 let event_handler = |_: _| {};
1133 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1135 match bg_processor.stop() {
1136 Ok(_) => panic!("Expected error persisting scorer"),
1138 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1139 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1145 fn test_background_event_handling() {
1146 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1147 let channel_value = 100000;
1148 let data_dir = nodes[0].persister.get_data_dir();
1149 let persister = Arc::new(Persister::new(data_dir.clone()));
1151 // Set up a background event handler for FundingGenerationReady events.
1152 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1153 let event_handler = move |event: Event| match event {
1154 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1155 Event::ChannelReady { .. } => {},
1156 _ => panic!("Unexpected event: {:?}", event),
1159 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1161 // Open a channel and check that the FundingGenerationReady event was handled.
1162 begin_open_channel!(nodes[0], nodes[1], channel_value);
1163 let (temporary_channel_id, funding_tx) = receiver
1164 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1165 .expect("FundingGenerationReady not handled within deadline");
1166 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1168 // Confirm the funding transaction.
1169 confirm_transaction(&mut nodes[0], &funding_tx);
1170 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1171 confirm_transaction(&mut nodes[1], &funding_tx);
1172 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1173 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1174 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1175 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1176 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1178 assert!(bg_processor.stop().is_ok());
1180 // Set up a background event handler for SpendableOutputs events.
1181 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1182 let event_handler = move |event: Event| match event {
1183 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1184 Event::ChannelReady { .. } => {},
1185 Event::ChannelClosed { .. } => {},
1186 _ => panic!("Unexpected event: {:?}", event),
1188 let persister = Arc::new(Persister::new(data_dir));
1189 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1191 // Force close the channel and check that the SpendableOutputs event was handled.
1192 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1193 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1194 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1196 let event = receiver
1197 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1198 .expect("Events not handled within deadline");
1200 Event::SpendableOutputs { .. } => {},
1201 _ => panic!("Unexpected event: {:?}", event),
1204 assert!(bg_processor.stop().is_ok());
1208 fn test_scorer_persistence() {
1209 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1210 let data_dir = nodes[0].persister.get_data_dir();
1211 let persister = Arc::new(Persister::new(data_dir));
1212 let event_handler = |_: _| {};
1213 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1216 let log_entries = nodes[0].logger.lines.lock().unwrap();
1217 let expected_log = "Persisting scorer".to_string();
1218 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1223 assert!(bg_processor.stop().is_ok());
1227 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1228 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1229 let data_dir = nodes[0].persister.get_data_dir();
1230 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1231 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1232 let network_graph = nodes[0].network_graph.clone();
1233 let features = ChannelFeatures::empty();
1234 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1235 .expect("Failed to update channel from partial announcement");
1236 let original_graph_description = network_graph.to_string();
1237 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1238 assert_eq!(network_graph.read_only().channels().len(), 1);
1240 let event_handler = |_: _| {};
1241 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1244 let log_entries = nodes[0].logger.lines.lock().unwrap();
1245 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1246 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1249 // Wait until the loop has gone around at least twice.
1254 let initialization_input = vec![
1255 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1256 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1257 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1258 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1259 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1260 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1261 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1262 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1263 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1264 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1265 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1266 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1267 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1269 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1271 // this should have added two channels
1272 assert_eq!(network_graph.read_only().channels().len(), 3);
1275 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1276 .expect("Network graph not pruned within deadline");
1278 background_processor.stop().unwrap();
1280 // all channels should now be pruned
1281 assert_eq!(network_graph.read_only().channels().len(), 0);
1285 fn test_invoice_payer() {
1286 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1287 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1288 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1290 // Initiate the background processors to watch each node.
1291 let data_dir = nodes[0].persister.get_data_dir();
1292 let persister = Arc::new(Persister::new(data_dir));
1293 let router = Arc::new(DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)));
1294 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1295 let event_handler = Arc::clone(&invoice_payer);
1296 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1297 assert!(bg_processor.stop().is_ok());