1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::ln::channelmanager::ChannelManager;
30 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
31 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
32 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
33 use lightning::routing::router::Router;
34 use lightning::routing::scoring::{Score, WriteableScore};
35 use lightning::util::events::{Event, EventHandler, EventsProvider};
36 use lightning::util::logger::Logger;
37 use lightning::util::persist::Persister;
38 use lightning_rapid_gossip_sync::RapidGossipSync;
42 use core::time::Duration;
44 #[cfg(feature = "std")]
46 #[cfg(feature = "std")]
47 use core::sync::atomic::{AtomicBool, Ordering};
48 #[cfg(feature = "std")]
49 use std::thread::{self, JoinHandle};
50 #[cfg(feature = "std")]
51 use std::time::Instant;
53 #[cfg(feature = "futures")]
54 use futures_util::{select_biased, future::FutureExt, task};
55 #[cfg(not(feature = "std"))]
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Signals the background thread to exit its processing loop.
	stop_thread: Arc<AtomicBool>,
	// Handle for the spawned thread; `None` once the thread has been joined.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
/// On startup and every `FRESHNESS_TIMER` seconds we call `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Persist the scorer regularly so recent scoring data survives restarts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Run the first network-graph prune shortly after startup (see the note in `define_run_body!`
/// about short-lived clients), then fall back to `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
117 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
119 P: Deref<Target = P2PGossipSync<G, A, L>>,
120 R: Deref<Target = RapidGossipSync<G, L>>,
121 G: Deref<Target = NetworkGraph<L>>,
125 where A::Target: chain::Access, L::Target: Logger {
126 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
128 /// Rapid gossip sync from a trusted server.
135 P: Deref<Target = P2PGossipSync<G, A, L>>,
136 R: Deref<Target = RapidGossipSync<G, L>>,
137 G: Deref<Target = NetworkGraph<L>>,
140 > GossipSync<P, R, G, A, L>
141 where A::Target: chain::Access, L::Target: Logger {
142 fn network_graph(&self) -> Option<&G> {
144 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
145 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
146 GossipSync::None => None,
150 fn prunable_network_graph(&self) -> Option<&G> {
152 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
153 GossipSync::Rapid(gossip_sync) => {
154 if gossip_sync.is_initial_sync_complete() {
155 Some(gossip_sync.network_graph())
160 GossipSync::None => None,
165 /// (C-not exported) as the bindings concretize everything and have constructors for us
166 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
167 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
169 A::Target: chain::Access,
172 /// Initializes a new [`GossipSync::P2P`] variant.
173 pub fn p2p(gossip_sync: P) -> Self {
174 GossipSync::P2P(gossip_sync)
178 /// (C-not exported) as the bindings concretize everything and have constructors for us
179 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
181 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
184 &'a (dyn chain::Access + Send + Sync),
190 /// Initializes a new [`GossipSync::Rapid`] variant.
191 pub fn rapid(gossip_sync: R) -> Self {
192 GossipSync::Rapid(gossip_sync)
196 /// (C-not exported) as the bindings concretize everything and have constructors for us
199 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
200 &RapidGossipSync<&'a NetworkGraph<L>, L>,
202 &'a (dyn chain::Access + Send + Sync),
208 /// Initializes a new [`GossipSync::None`] variant.
209 pub fn none() -> Self {
214 fn handle_network_graph_update<L: Deref>(
215 network_graph: &NetworkGraph<L>, event: &Event
216 ) where L::Target: Logger {
217 if let Event::PaymentPathFailed { ref network_update, .. } = event {
218 if let Some(network_update) = network_update {
219 network_graph.handle_network_update(&network_update);
224 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
225 scorer: &'a S, event: &Event
227 let mut score = scorer.lock();
229 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
230 let path = path.iter().collect::<Vec<_>>();
231 score.payment_path_failed(&path, *scid);
233 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
234 // Reached if the destination explicitly failed it back. We treat this as a successful probe
235 // because the payment made it all the way to the destination with sufficient liquidity.
236 let path = path.iter().collect::<Vec<_>>();
237 score.probe_successful(&path);
239 Event::PaymentPathSuccessful { path, .. } => {
240 let path = path.iter().collect::<Vec<_>>();
241 score.payment_path_successful(&path);
243 Event::ProbeSuccessful { path, .. } => {
244 let path = path.iter().collect::<Vec<_>>();
245 score.probe_successful(&path);
247 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
248 let path = path.iter().collect::<Vec<_>>();
249 score.probe_failed(&path, *scid);
/// The shared main loop of the background processor, used by both the `std`-threaded
/// [`BackgroundProcessor`] and the `futures`-based [`process_events_async`]. The caller supplies
/// how to await ($await), how to read a timer ($get_timer) and how to check elapsed time
/// ($timer_elapsed), so the same logic works for blocking and async contexts.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					#[cfg(feature = "std")] {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking();
					}
					#[cfg(not(feature = "std"))] {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					}

					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					have_pruned = true;
				}
				last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// BUG FIX: this flag must start as `false`. If it starts as `true` and the very first
	// `$await` resolves via the persistable-update branch (which doesn't touch `should_break`),
	// the loop-exit check fires and the background processing future terminates after a single
	// iteration instead of running until `sleeper` requests exit.
	let mut should_break = false;
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		async move {
			// Apply gossip/score side effects before handing the event to the user's handler.
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				update_scorer(scorer, &event);
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Wake on either a ChannelManager persistable update or the caller's sleeper; the
			// sleeper's output tells us whether to exit the loop.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// Poll the timer future once with a no-op waker purely to check if it has elapsed.
			let mut waker = task::noop_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
		})
}
#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for<'b> WriteableScore<'b>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the user's handler so network-graph and scorer updates are applied before
			// the user sees each event.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					update_scorer(scorer, &event);
				}
				event_handler.handle_event(event);
			};
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Signals the thread to exit its loop, then joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Joins the thread if it hasn't already been joined; `Ok(())` if already joined.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	/// Stops and joins the background thread on drop, panicking if the final persistence pass
	/// failed (callers who want the error should use [`BackgroundProcessor::stop`] instead).
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}
652 #[cfg(all(feature = "std", test))]
654 use bitcoin::blockdata::block::BlockHeader;
655 use bitcoin::blockdata::constants::genesis_block;
656 use bitcoin::blockdata::locktime::PackedLockTime;
657 use bitcoin::blockdata::transaction::{Transaction, TxOut};
658 use bitcoin::network::constants::Network;
659 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
660 use lightning::chain::{BestBlock, Confirm, chainmonitor};
661 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
662 use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
663 use lightning::chain::transaction::OutPoint;
664 use lightning::get_event_msg;
665 use lightning::ln::PaymentHash;
666 use lightning::ln::channelmanager;
667 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
668 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
669 use lightning::ln::msgs::{ChannelMessageHandler, Init};
670 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
671 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
672 use lightning::routing::router::{DefaultRouter, RouteHop};
673 use lightning::routing::scoring::{ChannelUsage, Score};
674 use lightning::util::config::UserConfig;
675 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
676 use lightning::util::ser::Writeable;
677 use lightning::util::test_utils;
678 use lightning::util::persist::KVStorePersister;
679 use lightning_invoice::payment::{InvoicePayer, Retry};
680 use lightning_persister::FilesystemPersister;
681 use std::collections::VecDeque;
683 use std::path::PathBuf;
684 use std::sync::{Arc, Mutex};
685 use std::sync::mpsc::SyncSender;
686 use std::time::Duration;
687 use bitcoin::hashes::Hash;
688 use bitcoin::TxMerkleNode;
689 use lightning_rapid_gossip_sync::RapidGossipSync;
690 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
692 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
694 #[derive(Clone, Hash, PartialEq, Eq)]
695 struct TestDescriptor{}
696 impl SocketDescriptor for TestDescriptor {
697 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
701 fn disconnect_socket(&mut self) {}
704 type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
706 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
708 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
709 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
712 node: Arc<ChannelManager>,
713 p2p_gossip_sync: PGS,
714 rapid_gossip_sync: RGS,
715 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
716 chain_monitor: Arc<ChainMonitor>,
717 persister: Arc<FilesystemPersister>,
718 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
719 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
720 logger: Arc<test_utils::TestLogger>,
721 best_block: BestBlock,
722 scorer: Arc<Mutex<TestScorer>>,
726 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
727 GossipSync::P2P(self.p2p_gossip_sync.clone())
730 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
731 GossipSync::Rapid(self.rapid_gossip_sync.clone())
734 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
741 let data_dir = self.persister.get_data_dir();
742 match fs::remove_dir_all(data_dir.clone()) {
743 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
750 graph_error: Option<(std::io::ErrorKind, &'static str)>,
751 graph_persistence_notifier: Option<SyncSender<()>>,
752 manager_error: Option<(std::io::ErrorKind, &'static str)>,
753 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
754 filesystem_persister: FilesystemPersister,
758 fn new(data_dir: String) -> Self {
759 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
760 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
763 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
764 Self { graph_error: Some((error, message)), ..self }
767 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
768 Self { graph_persistence_notifier: Some(sender), ..self }
771 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
772 Self { manager_error: Some((error, message)), ..self }
775 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
776 Self { scorer_error: Some((error, message)), ..self }
780 impl KVStorePersister for Persister {
781 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
782 if key == "manager" {
783 if let Some((error, message)) = self.manager_error {
784 return Err(std::io::Error::new(error, message))
788 if key == "network_graph" {
789 if let Some(sender) = &self.graph_persistence_notifier {
790 sender.send(()).unwrap();
793 if let Some((error, message)) = self.graph_error {
794 return Err(std::io::Error::new(error, message))
799 if let Some((error, message)) = self.scorer_error {
800 return Err(std::io::Error::new(error, message))
804 self.filesystem_persister.persist(key, object)
809 event_expectations: Option<VecDeque<TestResult>>,
814 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
815 PaymentSuccess { path: Vec<RouteHop> },
816 ProbeFailure { path: Vec<RouteHop> },
817 ProbeSuccess { path: Vec<RouteHop> },
822 Self { event_expectations: None }
825 fn expect(&mut self, expectation: TestResult) {
826 self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
// No-op serialization: the test scorer persists as zero bytes, which is all
// the persistence tests need.
830 impl lightning::util::ser::Writeable for TestScorer {
831 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
// `Score` impl that checks each callback against the next queued expectation:
// the matching variant asserts the path (and scid where relevant), every other
// variant panics. If no expectations were queued, callbacks are ignored.
834 impl Score for TestScorer {
// Routing penalty is never queried in these tests.
835 fn channel_penalty_msat(
836 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
837 ) -> u64 { unimplemented!(); }
// Expects the front of the queue to be `PaymentFailure` with matching path/scid.
839 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
840 if let Some(expectations) = &mut self.event_expectations {
841 match expectations.pop_front().unwrap() {
842 TestResult::PaymentFailure { path, short_channel_id } => {
843 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
844 assert_eq!(actual_short_channel_id, short_channel_id);
846 TestResult::PaymentSuccess { path } => {
847 panic!("Unexpected successful payment path: {:?}", path)
849 TestResult::ProbeFailure { path } => {
850 panic!("Unexpected probe failure: {:?}", path)
852 TestResult::ProbeSuccess { path } => {
853 panic!("Unexpected probe success: {:?}", path)
// Expects the front of the queue to be `PaymentSuccess` with a matching path.
859 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
860 if let Some(expectations) = &mut self.event_expectations {
861 match expectations.pop_front().unwrap() {
862 TestResult::PaymentFailure { path, .. } => {
863 panic!("Unexpected payment path failure: {:?}", path)
865 TestResult::PaymentSuccess { path } => {
866 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
868 TestResult::ProbeFailure { path } => {
869 panic!("Unexpected probe failure: {:?}", path)
871 TestResult::ProbeSuccess { path } => {
872 panic!("Unexpected probe success: {:?}", path)
// Expects the front of the queue to be `ProbeFailure` with a matching path.
878 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
879 if let Some(expectations) = &mut self.event_expectations {
880 match expectations.pop_front().unwrap() {
881 TestResult::PaymentFailure { path, .. } => {
882 panic!("Unexpected payment path failure: {:?}", path)
884 TestResult::PaymentSuccess { path } => {
885 panic!("Unexpected payment path success: {:?}", path)
887 TestResult::ProbeFailure { path } => {
888 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
890 TestResult::ProbeSuccess { path } => {
891 panic!("Unexpected probe success: {:?}", path)
// Expects the front of the queue to be `ProbeSuccess` with a matching path.
896 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
897 if let Some(expectations) = &mut self.event_expectations {
898 match expectations.pop_front().unwrap() {
899 TestResult::PaymentFailure { path, .. } => {
900 panic!("Unexpected payment path failure: {:?}", path)
902 TestResult::PaymentSuccess { path } => {
903 panic!("Unexpected payment path success: {:?}", path)
905 TestResult::ProbeFailure { path } => {
906 panic!("Unexpected probe failure: {:?}", path)
908 TestResult::ProbeSuccess { path } => {
909 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
// On drop, fail the test if any queued expectations were never consumed —
// unless the thread is already panicking (avoid a panic-in-panic abort).
// (The `fn drop(&mut self)` header line is not visible in this excerpt.)
916 impl Drop for TestScorer {
918 if std::thread::panicking() {
922 if let Some(event_expectations) = &self.event_expectations {
923 if !event_expectations.is_empty() {
924 panic!("Unsatisfied event expectations: {:?}", event_expectations);
// Joins `filepath` and `filename` into a single path string using `PathBuf`
// (the `path.push(filename)` line is presumably between these lines — it is
// not visible in this excerpt; confirm).
930 fn get_full_filepath(filepath: String, filename: String) -> String {
931 let mut path = PathBuf::from(filepath);
933 path.to_str().unwrap().to_string()
// Builds `num_nodes` fully-wired test `Node`s (chain monitor, channel manager,
// both gossip-sync flavors, peer manager, filesystem persister, test scorer)
// on Testnet, each persisting under `{persist_dir}_persister_{i}`, then
// connects every pair of nodes as peers.
936 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
937 let mut nodes = Vec::new();
938 for i in 0..num_nodes {
939 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
940 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
941 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
942 let network = Network::Testnet;
943 let genesis_block = genesis_block(network);
944 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
945 let scorer = Arc::new(Mutex::new(TestScorer::new()));
// Deterministic per-node seed, reused for both the router and KeysManager.
946 let seed = [i as u8; 32];
947 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
948 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
949 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
950 let now = Duration::from_secs(genesis_block.header.time as u64);
951 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
952 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
953 let best_block = BestBlock::from_genesis(network);
954 let params = ChainParameters { network, best_block };
955 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
// Both gossip-sync variants are created so individual tests can pick
// p2p, rapid, or no gossip sync when starting the BackgroundProcessor.
956 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
957 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
958 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
959 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
960 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Connect every distinct pair (i, j) in both directions.
964 for i in 0..num_nodes {
965 for j in (i+1)..num_nodes {
966 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
967 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
// Drives a full channel open between two nodes: negotiate, handle the
// FundingGenerationReady event, and complete funding. Consumes the pending
// events, so it must run before a BackgroundProcessor starts handling them.
974 macro_rules! open_channel {
975 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
976 begin_open_channel!($node_a, $node_b, $channel_value);
977 let events = $node_a.node.get_and_clear_pending_events();
978 assert_eq!(events.len(), 1);
979 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
980 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of a channel open: create_channel on node_a, then exchange the
// open_channel / accept_channel messages between the two nodes.
985 macro_rules! begin_open_channel {
986 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
987 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
988 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
989 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Matches a FundingGenerationReady event (asserting the expected channel value
// and the user_channel_id of 42 passed in begin_open_channel!), and builds the
// funding transaction paying the requested output script. Returns
// (temporary_channel_id, funding_tx).
993 macro_rules! handle_funding_generation_ready {
994 ($event: expr, $channel_value: expr) => {{
996 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
997 assert_eq!(channel_value_satoshis, $channel_value);
998 assert_eq!(user_channel_id, 42);
1000 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1001 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1003 (temporary_channel_id, tx)
1005 _ => panic!("Unexpected event"),
// Second half of a channel open: hand the funding transaction to node_a and
// exchange the funding_created / funding_signed messages.
1010 macro_rules! end_open_channel {
1011 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
1012 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
1013 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1014 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Mines `depth` synthetic blocks on top of the node's best block, confirming
// `tx` in the first one and notifying both the ChannelManager and
// ChainMonitor. The final block also triggers best_block_updated.
1018 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1019 for i in 1..=depth {
1020 let prev_blockhash = node.best_block.block_hash();
1021 let height = node.best_block.height() + 1;
1022 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1023 let txdata = vec![(0, tx)];
1024 node.best_block = BestBlock::new(header.block_hash(), height);
// NOTE(review): these calls appear to sit inside a `match i` whose arms
// (e.g. `1 => ...`) are not visible in this excerpt — confirm against the
// original file. The intent reads as: confirm the tx in the first block,
// then issue best_block_updated at the final depth.
1027 node.node.transactions_confirmed(&header, &txdata, height);
1028 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1030 x if x == depth => {
1031 node.node.best_block_updated(&header, height);
1032 node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` to ANTI_REORG_DELAY depth — enough for monitor events (e.g.
// SpendableOutputs) to be considered irrevocably confirmed.
1038 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1039 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1043 fn test_background_processor() {
1044 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1045 // updates. Also test that when new updates are available, the manager signals that it needs
1046 // re-persistence and is successfully re-persisted.
1047 let nodes = create_nodes(2, "test_background_processor".to_string());
1049 // Go through the channel creation process so that each node has something to persist. Since
1050 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1051 // avoid a race with processing events.
1052 let tx = open_channel!(nodes[0], nodes[1], 100000);
1054 // Initiate the background processors to watch each node.
1055 let data_dir = nodes[0].persister.get_data_dir();
1056 let persister = Arc::new(Persister::new(data_dir));
1057 let event_handler = |_: _| {};
1058 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Polls until the bytes on disk at $filepath equal the serialization of
// $node (the surrounding retry-loop lines are not visible in this excerpt).
1060 macro_rules! check_persisted_data {
1061 ($node: expr, $filepath: expr) => {
1062 let mut expected_bytes = Vec::new();
1064 expected_bytes.clear();
1065 match $node.write(&mut expected_bytes) {
1067 match std::fs::read($filepath) {
1069 if bytes == expected_bytes {
1078 Err(e) => panic!("Unexpected error: {}", e)
1084 // Check that the initial channel manager data is persisted as expected.
1085 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1086 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait for the background processor to drain the persistence condvar.
1089 if !nodes[0].node.get_persistence_condvar_value() { break }
1092 // Force-close the channel.
1093 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1095 // Check that the force-close updates are persisted.
1096 check_persisted_data!(nodes[0].node, filepath.clone());
1098 if !nodes[0].node.get_persistence_condvar_value() { break }
1101 // Check network graph is persisted
1102 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1103 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1105 // Check scorer is persisted
1106 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1107 check_persisted_data!(nodes[0].scorer, filepath.clone());
1109 assert!(bg_processor.stop().is_ok());
1113 fn test_timer_tick_called() {
1114 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1115 // `FRESHNESS_TIMER`.
1116 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1117 let data_dir = nodes[0].persister.get_data_dir();
1118 let persister = Arc::new(Persister::new(data_dir));
1119 let event_handler = |_: _| {};
1120 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the test logger until both timer-tick log lines have appeared
// (the polling-loop scaffolding is not visible in this excerpt).
1122 let log_entries = nodes[0].logger.lines.lock().unwrap();
1123 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1124 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1125 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1126 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1131 assert!(bg_processor.stop().is_ok());
1135 fn test_channel_manager_persist_error() {
1136 // Test that if we encounter an error during manager persistence, the thread panics.
1137 let nodes = create_nodes(2, "test_persist_error".to_string());
1138 open_channel!(nodes[0], nodes[1], 100000);
1140 let data_dir = nodes[0].persister.get_data_dir();
// Inject a manager-persistence failure via the fault-injecting Persister.
1141 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1142 let event_handler = |_: _| {};
1143 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` surfaces the background thread's persistence error.
1144 match bg_processor.join() {
1145 Ok(_) => panic!("Expected error persisting manager"),
1147 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1148 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1154 fn test_network_graph_persist_error() {
1155 // Test that if we encounter an error during network graph persistence, an error gets returned.
1156 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1157 let data_dir = nodes[0].persister.get_data_dir();
// Inject a graph-persistence failure; p2p gossip sync is required so the
// processor actually attempts to persist the graph.
1158 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1159 let event_handler = |_: _| {};
1160 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1162 match bg_processor.stop() {
1163 Ok(_) => panic!("Expected error persisting network graph"),
1165 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1166 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1172 fn test_scorer_persist_error() {
1173 // Test that if we encounter an error during scorer persistence, an error gets returned.
1174 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1175 let data_dir = nodes[0].persister.get_data_dir();
// Inject a scorer-persistence failure via the fault-injecting Persister.
1176 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1177 let event_handler = |_: _| {};
1178 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1180 match bg_processor.stop() {
1181 Ok(_) => panic!("Expected error persisting scorer"),
1183 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1184 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that the BackgroundProcessor delivers ChannelManager/ChainMonitor
// events (FundingGenerationReady, then SpendableOutputs after a force-close)
// to the user-supplied event handler.
1190 fn test_background_event_handling() {
1191 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1192 let channel_value = 100000;
1193 let data_dir = nodes[0].persister.get_data_dir();
1194 let persister = Arc::new(Persister::new(data_dir.clone()));
1196 // Set up a background event handler for FundingGenerationReady events.
1197 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1198 let event_handler = move |event: Event| match event {
1199 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1200 Event::ChannelReady { .. } => {},
1201 _ => panic!("Unexpected event: {:?}", event),
1204 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1206 // Open a channel and check that the FundingGenerationReady event was handled.
1207 begin_open_channel!(nodes[0], nodes[1], channel_value);
1208 let (temporary_channel_id, funding_tx) = receiver
1209 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1210 .expect("FundingGenerationReady not handled within deadline");
1211 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1213 // Confirm the funding transaction.
1214 confirm_transaction(&mut nodes[0], &funding_tx);
1215 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1216 confirm_transaction(&mut nodes[1], &funding_tx);
1217 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1218 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1219 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1220 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1221 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1223 assert!(bg_processor.stop().is_ok());
1225 // Set up a background event handler for SpendableOutputs events.
1226 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1227 let event_handler = move |event: Event| match event {
1228 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1229 Event::ChannelReady { .. } => {},
1230 Event::ChannelClosed { .. } => {},
1231 _ => panic!("Unexpected event: {:?}", event),
// Restart the processor with a fresh Persister over the same data dir.
1233 let persister = Arc::new(Persister::new(data_dir));
1234 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1236 // Force close the channel and check that the SpendableOutputs event was handled.
1237 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1238 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Confirm past the to_self delay so the output becomes spendable.
1239 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1241 let event = receiver
1242 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1243 .expect("Events not handled within deadline");
1245 Event::SpendableOutputs { .. } => {},
1246 _ => panic!("Unexpected event: {:?}", event),
1249 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer by
// watching for its "Persisting scorer" log line.
1253 fn test_scorer_persistence() {
1254 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1255 let data_dir = nodes[0].persister.get_data_dir();
1256 let persister = Arc::new(Persister::new(data_dir));
1257 let event_handler = |_: _| {};
1258 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the logger until the scorer-persistence log line appears (polling
// loop scaffolding not visible in this excerpt).
1261 let log_entries = nodes[0].logger.lines.lock().unwrap();
1262 let expected_log = "Persisting scorer".to_string();
1263 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1268 assert!(bg_processor.stop().is_ok());
// Verifies that with rapid gossip sync active, the graph is not pruned until
// the RGS data has been applied; after sync completes, pruning removes all
// channels (including one added from a partial announcement).
1272 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1273 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1274 let data_dir = nodes[0].persister.get_data_dir();
// The notifier fires on every network-graph persistence attempt, letting the
// test observe when a prune-and-persist happens.
1275 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1276 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1277 let network_graph = nodes[0].network_graph.clone();
1278 let features = ChannelFeatures::empty();
1279 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1280 .expect("Failed to update channel from partial announcement");
1281 let original_graph_description = network_graph.to_string();
1282 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1283 assert_eq!(network_graph.read_only().channels().len(), 1);
1285 let event_handler = |_: _| {};
1286 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Count timer-tick log lines as a proxy for background-loop iterations
// (loop scaffolding not visible in this excerpt).
1289 let log_entries = nodes[0].logger.lines.lock().unwrap();
1290 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1291 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1294 // Wait until the loop has gone around at least twice.
// Serialized rapid-gossip-sync snapshot containing two channel announcements.
1299 let initialization_input = vec![
1300 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1301 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1302 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1303 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1304 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1305 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1306 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1307 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1308 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1309 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1310 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1311 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1312 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1314 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1316 // this should have added two channels
1317 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait (bounded) for the post-sync prune to persist the graph.
1320 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1321 .expect("Network graph not pruned within deadline");
1323 background_processor.stop().unwrap();
1325 // all channels should now be pruned
1326 assert_eq!(network_graph.read_only().channels().len(), 0);
// Smoke test: an InvoicePayer can be used as the BackgroundProcessor's event
// handler, and the processor starts and stops cleanly with it.
1330 fn test_invoice_payer() {
1331 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1332 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1333 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1335 // Initiate the background processors to watch each node.
1336 let data_dir = nodes[0].persister.get_data_dir();
1337 let persister = Arc::new(Persister::new(data_dir));
1338 let router = Arc::new(DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)));
1339 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1340 let event_handler = Arc::clone(&invoice_payer);
1341 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1342 assert!(bg_processor.stop().is_ok());
1346 fn test_payment_path_scoring() {
1347 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1348 // that we update the scorer upon a payment path succeeding (note that the channel must be
1349 // public or else we won't score it).
1350 // Set up a background event handler for FundingGenerationReady events.
1351 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// Forward each scoring-relevant event back to the test thread.
1352 let event_handler = move |event: Event| match event {
1353 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1354 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1355 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1356 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1357 _ => panic!("Unexpected event: {:?}", event),
1360 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1361 let data_dir = nodes[0].persister.get_data_dir();
1362 let persister = Arc::new(Persister::new(data_dir.clone()));
1363 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1365 let scored_scid = 4242;
1366 let secp_ctx = Secp256k1::new();
1367 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1368 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
// Single-hop path over the channel we expect the scorer to be told about.
1370 let path = vec![RouteHop {
1372 node_features: NodeFeatures::empty(),
1373 short_channel_id: scored_scid,
1374 channel_features: ChannelFeatures::empty(),
1376 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
// Case 1: a retryable path failure must hit payment_path_failed on the scorer.
1379 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1380 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1382 payment_hash: PaymentHash([42; 32]),
1383 payment_failed_permanently: false,
1384 network_update: None,
1385 all_paths_failed: true,
1387 short_channel_id: Some(scored_scid),
1390 let event = receiver
1391 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1392 .expect("PaymentPathFailed not handled within deadline");
1394 Event::PaymentPathFailed { .. } => {},
1395 _ => panic!("Unexpected event"),
1398 // Ensure we'll score payments that were explicitly failed back by the destination as
// Case 2: a permanent failure at the destination counts as a successful probe.
1400 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1401 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1403 payment_hash: PaymentHash([42; 32]),
1404 payment_failed_permanently: true,
1405 network_update: None,
1406 all_paths_failed: true,
1408 short_channel_id: None,
1411 let event = receiver
1412 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1413 .expect("PaymentPathFailed not handled within deadline");
1415 Event::PaymentPathFailed { .. } => {},
1416 _ => panic!("Unexpected event"),
// Case 3: a successful payment path must hit payment_path_successful.
1419 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1420 nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1421 payment_id: PaymentId([42; 32]),
1425 let event = receiver
1426 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1427 .expect("PaymentPathSuccessful not handled within deadline");
1429 Event::PaymentPathSuccessful { .. } => {},
1430 _ => panic!("Unexpected event"),
// Case 4: a successful probe must hit probe_successful.
1433 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1434 nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1435 payment_id: PaymentId([42; 32]),
1436 payment_hash: PaymentHash([42; 32]),
1439 let event = receiver
1440 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1441 .expect("ProbeSuccessful not handled within deadline");
1443 Event::ProbeSuccessful { .. } => {},
1444 _ => panic!("Unexpected event"),
// Case 5: a failed probe must hit probe_failed.
1447 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1448 nodes[0].node.push_pending_event(Event::ProbeFailed {
1449 payment_id: PaymentId([42; 32]),
1450 payment_hash: PaymentHash([42; 32]),
1452 short_channel_id: Some(scored_scid),
1454 let event = receiver
1455 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1456 .expect("ProbeFailure not handled within deadline");
1458 Event::ProbeFailed { .. } => {},
1459 _ => panic!("Unexpected event"),
1462 assert!(bg_processor.stop().is_ok());