1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::ln::channelmanager::ChannelManager;
30 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
31 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
32 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
33 use lightning::routing::utxo::UtxoLookup;
34 use lightning::routing::router::Router;
35 use lightning::routing::scoring::{Score, WriteableScore};
36 use lightning::util::events::{Event, EventHandler, EventsProvider, PathFailure};
37 use lightning::util::logger::Logger;
38 use lightning::util::persist::Persister;
39 use lightning_rapid_gossip_sync::RapidGossipSync;
42 use core::time::Duration;
44 #[cfg(feature = "std")]
46 #[cfg(feature = "std")]
47 use core::sync::atomic::{AtomicBool, Ordering};
48 #[cfg(feature = "std")]
49 use std::thread::{self, JoinHandle};
50 #[cfg(feature = "std")]
51 use std::time::Instant;
53 #[cfg(feature = "futures")]
54 use futures_util::{select_biased, future::FutureExt, task};
55 #[cfg(not(feature = "std"))]
58 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
59 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
60 /// responsibilities are:
61 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
62 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
63 /// writing it to disk/backups by invoking the callback given to it at startup.
64 /// [`ChannelManager`] persistence should be done in the background.
65 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
66 /// at the appropriate intervals.
67 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
68 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
70 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
71 /// upon as doing so may result in high latency.
75 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
76 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
77 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
78 /// unilateral chain closure fees are at risk.
80 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
81 /// [`Event`]: lightning::util::events::Event
82 #[cfg(feature = "std")]
83 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
84 pub struct BackgroundProcessor {
// Shared flag polled by the background thread; set to `true` (via `stop`/`Drop`) to
// request that the processing loop terminate.
85 stop_thread: Arc<AtomicBool>,
// Handle to the spawned processing thread, carrying any persistence error it exited
// with. `Option` so `join`/`stop` can `take()` it exactly once.
86 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Interval (seconds) between `ChannelManager::timer_tick_occurred` calls.
// NOTE(review): the duplicated constants below are alternates selected by `#[cfg]`
// attributes (test builds shorten the timers to 1s); some of the gating attribute
// lines are not visible in this excerpt — confirm against the full file.
90 const FRESHNESS_TIMER: u64 = 60;
92 const FRESHNESS_TIMER: u64 = 1;
94 #[cfg(all(not(test), not(debug_assertions)))]
95 const PING_TIMER: u64 = 10;
96 /// Signature operations take a lot longer without compiler optimisations.
97 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
98 /// timeout is reached.
99 #[cfg(all(not(test), debug_assertions))]
100 const PING_TIMER: u64 = 30;
102 const PING_TIMER: u64 = 1;
104 /// Prune the network graph of stale entries hourly.
105 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Interval (seconds) between scorer persistence attempts (1s in test builds).
108 const SCORER_PERSIST_TIMER: u64 = 30;
110 const SCORER_PERSIST_TIMER: u64 = 1;
// Delay before the *first* network-graph prune after startup, so short-lived
// clients still prune at least once (see the comment inside `define_run_body!`).
113 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
115 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
117 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
119 P: Deref<Target = P2PGossipSync<G, U, L>>,
120 R: Deref<Target = RapidGossipSync<G, L>>,
121 G: Deref<Target = NetworkGraph<L>>,
125 where U::Target: UtxoLookup, L::Target: Logger {
126 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
128 /// Rapid gossip sync from a trusted server.
135 P: Deref<Target = P2PGossipSync<G, U, L>>,
136 R: Deref<Target = RapidGossipSync<G, L>>,
137 G: Deref<Target = NetworkGraph<L>>,
140 > GossipSync<P, R, G, U, L>
141 where U::Target: UtxoLookup, L::Target: Logger {
// Returns the underlying `NetworkGraph`, if this variant carries one.
142 fn network_graph(&self) -> Option<&G> {
144 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
145 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
146 GossipSync::None => None,
// Returns the `NetworkGraph` only if it is currently safe to prune it: always for
// P2P sync, but for rapid sync only after the initial sync has completed — pruning
// mid-sync could discard data the rapid-sync snapshot is about to reference.
150 fn prunable_network_graph(&self) -> Option<&G> {
152 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
153 GossipSync::Rapid(gossip_sync) => {
154 if gossip_sync.is_initial_sync_complete() {
155 Some(gossip_sync.network_graph())
160 GossipSync::None => None,
// Convenience constructors. Each impl pins the *unused* gossip-sync type parameters to
// concrete reference types so callers don't have to spell them out when only one
// variant is in play.
165 /// (C-not exported) as the bindings concretize everything and have constructors for us
166 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
167 GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
169 U::Target: UtxoLookup,
172 /// Initializes a new [`GossipSync::P2P`] variant.
173 pub fn p2p(gossip_sync: P) -> Self {
174 GossipSync::P2P(gossip_sync)
178 /// (C-not exported) as the bindings concretize everything and have constructors for us
179 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
181 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
184 &'a (dyn UtxoLookup + Send + Sync),
190 /// Initializes a new [`GossipSync::Rapid`] variant.
191 pub fn rapid(gossip_sync: R) -> Self {
192 GossipSync::Rapid(gossip_sync)
196 /// (C-not exported) as the bindings concretize everything and have constructors for us
199 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
200 &RapidGossipSync<&'a NetworkGraph<L>, L>,
202 &'a (dyn UtxoLookup + Send + Sync),
208 /// Initializes a new [`GossipSync::None`] variant.
209 pub fn none() -> Self {
// Applies a `NetworkUpdate` attached to a failed-payment event to the network graph,
// e.g. removing a channel the failure proved is no longer usable. Events without an
// on-path network update are ignored.
214 fn handle_network_graph_update<L: Deref>(
215 network_graph: &NetworkGraph<L>, event: &Event
216 ) where L::Target: Logger {
217 if let Event::PaymentPathFailed {
218 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
220 network_graph.handle_network_update(upd);
// Feeds payment- and probe-outcome events into the scorer so future routing decisions
// learn from them. Takes the scorer's lock for the duration of the update.
224 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
225 scorer: &'a S, event: &Event
227 let mut score = scorer.lock();
// Path failed at an intermediate hop: penalize the failing channel.
229 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
230 let path = path.iter().collect::<Vec<_>>();
231 score.payment_path_failed(&path, *scid);
233 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
234 // Reached if the destination explicitly failed it back. We treat this as a successful probe
235 // because the payment made it all the way to the destination with sufficient liquidity.
236 let path = path.iter().collect::<Vec<_>>();
237 score.probe_successful(&path);
239 Event::PaymentPathSuccessful { path, .. } => {
240 let path = path.iter().collect::<Vec<_>>();
241 score.payment_path_successful(&path);
243 Event::ProbeSuccessful { path, .. } => {
244 let path = path.iter().collect::<Vec<_>>();
245 score.probe_successful(&path);
247 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
248 let path = path.iter().collect::<Vec<_>>();
249 score.probe_failed(&path, *scid);
// The shared body of the background-processing loop, expanded both by the sync
// (threaded) `BackgroundProcessor::start` and the async `process_events_async`. The
// caller supplies expressions for event processing, loop-exit checking, sleeping, and
// timer bookkeeping so the same logic works with `Instant`s or async sleeper futures.
255 macro_rules! define_run_body {
256 ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
257 $channel_manager: ident, $process_channel_manager_events: expr,
258 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
259 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
261 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
262 $channel_manager.timer_tick_occurred();
// Per-task timers tracking when each periodic job last ran.
264 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
265 let mut last_ping_call = $get_timer(PING_TIMER);
266 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
267 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
268 let mut have_pruned = false;
271 $process_channel_manager_events;
272 $process_chain_monitor_events;
274 // Note that the PeerManager::process_events may block on ChannelManager's locks,
275 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
276 // we want to ensure we get into `persist_manager` as quickly as we can, especially
277 // without running the normal event processing above and handing events to users.
279 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
280 // processing a message effectively at any point during this loop. In order to
281 // minimize the time between such processing completing and persisting the updated
282 // ChannelManager, we want to minimize methods blocking on a ChannelManager
283 // generally, and as a fallback place such blocking only immediately before
285 $peer_manager.process_events();
287 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
288 // see `await_start`'s use below.
289 let mut await_start = $get_timer(1);
290 let updates_available = $await;
291 let await_slow = $timer_elapsed(&mut await_start, 1);
// Re-persist the ChannelManager whenever it signalled a persistable update.
293 if updates_available {
294 log_trace!($logger, "Persisting ChannelManager...");
295 $persister.persist_manager(&*$channel_manager)?;
296 log_trace!($logger, "Done persisting ChannelManager.");
298 // Exit the loop if the background processor was requested to stop.
299 if $loop_exit_check {
300 log_trace!($logger, "Terminating background processor.");
303 if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
304 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
305 $channel_manager.timer_tick_occurred();
306 last_freshness_call = $get_timer(FRESHNESS_TIMER);
309 // On various platforms, we may be starved of CPU cycles for several reasons.
310 // E.g. on iOS, if we've been in the background, we will be entirely paused.
311 // Similarly, if we're on a desktop platform and the device has been asleep, we
312 // may not get any cycles.
313 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
314 // full second, at which point we assume sockets may have been killed (they
315 // appear to be at least on some platforms, even if it has only been a second).
316 // Note that we have to take care to not get here just because user event
317 // processing was slow at the top of the loop. For example, the sample client
318 // may call Bitcoin Core RPCs during event handling, which very often takes
319 // more than a handful of seconds to complete, and shouldn't disconnect all our
321 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
322 $peer_manager.disconnect_all_peers();
323 last_ping_call = $get_timer(PING_TIMER);
324 } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
325 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
326 $peer_manager.timer_tick_occurred();
327 last_ping_call = $get_timer(PING_TIMER);
330 // Note that we want to run a graph prune once not long after startup before
331 // falling back to our usual hourly prunes. This avoids short-lived clients never
332 // pruning their network graph. We run once 60 seconds after startup before
333 // continuing our normal cadence.
334 if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
335 // The network graph must not be pruned while rapid sync completion is pending
336 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
337 #[cfg(feature = "std")] {
338 log_trace!($logger, "Pruning and persisting network graph.");
339 network_graph.remove_stale_channels_and_tracking();
// Without `std` there is no wall clock, so pruning is the user's responsibility.
341 #[cfg(not(feature = "std"))] {
342 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
343 log_trace!($logger, "Persisting network graph.");
// Persistence failures here are logged rather than propagated: a failed graph
// write should not tear down the whole background processor.
346 if let Err(e) = $persister.persist_graph(network_graph) {
347 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
350 last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
355 if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
356 if let Some(ref scorer) = $scorer {
357 log_trace!($logger, "Persisting scorer");
358 if let Err(e) = $persister.persist_scorer(&scorer) {
359 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
362 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
366 // After we exit, ensure we persist the ChannelManager one final time - this avoids
367 // some races where users quit while channel updates were in-flight, with
368 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
369 $persister.persist_manager(&*$channel_manager)?;
371 // Persist Scorer on exit
372 if let Some(ref scorer) = $scorer {
373 $persister.persist_scorer(&scorer)?;
376 // Persist NetworkGraph on exit
377 if let Some(network_graph) = $gossip_sync.network_graph() {
378 $persister.persist_graph(network_graph)?;
385 /// Processes background events in a future.
387 /// `sleeper` should return a future which completes in the given amount of time and returns a
388 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
389 /// future which outputs true, the loop will exit and this function's future will complete.
391 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
393 /// Requires the `futures` feature. Note that while this method is available without the `std`
394 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
395 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
396 /// manually instead.
397 #[cfg(feature = "futures")]
398 pub async fn process_events_async<
400 UL: 'static + Deref + Send + Sync,
401 CF: 'static + Deref + Send + Sync,
402 CW: 'static + Deref + Send + Sync,
403 T: 'static + Deref + Send + Sync,
404 ES: 'static + Deref + Send + Sync,
405 NS: 'static + Deref + Send + Sync,
406 SP: 'static + Deref + Send + Sync,
407 F: 'static + Deref + Send + Sync,
408 R: 'static + Deref + Send + Sync,
409 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
410 L: 'static + Deref + Send + Sync,
411 P: 'static + Deref + Send + Sync,
412 Descriptor: 'static + SocketDescriptor + Send + Sync,
413 CMH: 'static + Deref + Send + Sync,
414 RMH: 'static + Deref + Send + Sync,
415 OMH: 'static + Deref + Send + Sync,
416 EventHandlerFuture: core::future::Future<Output = ()>,
417 EventHandler: Fn(Event) -> EventHandlerFuture,
418 PS: 'static + Deref + Send,
419 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
420 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
421 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
422 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
423 UMH: 'static + Deref + Send + Sync,
424 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
425 S: 'static + Deref<Target = SC> + Send + Sync,
426 SC: for<'b> WriteableScore<'b>,
427 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
428 Sleeper: Fn(Duration) -> SleepFuture
430 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
431 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
433 ) -> Result<(), lightning::io::Error>
435 UL::Target: 'static + UtxoLookup,
436 CF::Target: 'static + chain::Filter,
437 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
438 T::Target: 'static + BroadcasterInterface,
439 ES::Target: 'static + EntropySource,
440 NS::Target: 'static + NodeSigner,
441 SP::Target: 'static + SignerProvider,
442 F::Target: 'static + FeeEstimator,
443 R::Target: 'static + Router,
444 L::Target: 'static + Logger,
445 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
446 CMH::Target: 'static + ChannelMessageHandler,
447 OMH::Target: 'static + OnionMessageHandler,
448 RMH::Target: 'static + RoutingMessageHandler,
449 UMH::Target: 'static + CustomMessageHandler,
450 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
// BUGFIX: must start as `false`. `should_break` is passed to `define_run_body!` as the
// loop-exit check and is only set to `true` when the caller-supplied `sleeper` future
// requests exit; initializing it to `true` made the processing loop terminate on its
// very first iteration instead of running until shutdown is requested.
452 let mut should_break = false;
// Decorate the user's async event handler: first apply any network-graph update the
// event carries, then feed payment/probe outcomes into the scorer, then hand the
// event to the user.
453 let async_event_handler = |event| {
454 let network_graph = gossip_sync.network_graph();
455 let event_handler = &event_handler;
456 let scorer = &scorer;
458 if let Some(network_graph) = network_graph {
459 handle_network_graph_update(network_graph, &event)
461 if let Some(ref scorer) = scorer {
462 update_scorer(scorer, &event);
464 event_handler(event).await;
// Run the shared loop body, racing the ChannelManager's persistable-update future
// against a 100ms sleep; the sleeper's boolean output drives `should_break`.
467 define_run_body!(persister,
468 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
469 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
470 gossip_sync, peer_manager, logger, scorer, should_break, {
472 _ = channel_manager.get_persistable_update_future().fuse() => true,
473 exit = sleeper(Duration::from_millis(100)).fuse() => {
478 }, |t| sleeper(Duration::from_secs(t)),
// Timers are sleeper futures here; "elapsed" means the future is already ready when
// polled with a no-op waker.
479 |fut: &mut SleepFuture, _| {
480 let mut waker = task::noop_waker();
481 let mut ctx = task::Context::from_waker(&mut waker);
482 core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
486 #[cfg(feature = "std")]
487 impl BackgroundProcessor {
488 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
491 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
492 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
493 /// either [`join`] or [`stop`].
495 /// # Data Persistence
497 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
498 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
499 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
500 /// provided implementation.
502 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
503 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
504 /// See the `lightning-persister` crate for LDK's provided implementation.
506 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
507 /// error or call [`join`] and handle any error that may arise. For the latter case,
508 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
512 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
513 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
514 /// functionality implemented by other handlers.
515 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
517 /// # Rapid Gossip Sync
519 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
520 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
521 /// until the [`RapidGossipSync`] instance completes its first sync.
523 /// [top-level documentation]: BackgroundProcessor
524 /// [`join`]: Self::join
525 /// [`stop`]: Self::stop
526 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
527 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
528 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
529 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
530 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
531 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
534 UL: 'static + Deref + Send + Sync,
535 CF: 'static + Deref + Send + Sync,
536 CW: 'static + Deref + Send + Sync,
537 T: 'static + Deref + Send + Sync,
538 ES: 'static + Deref + Send + Sync,
539 NS: 'static + Deref + Send + Sync,
540 SP: 'static + Deref + Send + Sync,
541 F: 'static + Deref + Send + Sync,
542 R: 'static + Deref + Send + Sync,
543 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
544 L: 'static + Deref + Send + Sync,
545 P: 'static + Deref + Send + Sync,
546 Descriptor: 'static + SocketDescriptor + Send + Sync,
547 CMH: 'static + Deref + Send + Sync,
548 OMH: 'static + Deref + Send + Sync,
549 RMH: 'static + Deref + Send + Sync,
550 EH: 'static + EventHandler + Send,
551 PS: 'static + Deref + Send,
552 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
553 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
554 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
555 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
556 UMH: 'static + Deref + Send + Sync,
557 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
558 S: 'static + Deref<Target = SC> + Send + Sync,
559 SC: for <'b> WriteableScore<'b>,
561 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
562 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
565 UL::Target: 'static + UtxoLookup,
566 CF::Target: 'static + chain::Filter,
567 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
568 T::Target: 'static + BroadcasterInterface,
569 ES::Target: 'static + EntropySource,
570 NS::Target: 'static + NodeSigner,
571 SP::Target: 'static + SignerProvider,
572 F::Target: 'static + FeeEstimator,
573 R::Target: 'static + Router,
574 L::Target: 'static + Logger,
575 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
576 CMH::Target: 'static + ChannelMessageHandler,
577 OMH::Target: 'static + OnionMessageHandler,
578 RMH::Target: 'static + RoutingMessageHandler,
579 UMH::Target: 'static + CustomMessageHandler,
580 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
// Spawn the processing thread. The stop flag is shared between the returned handle
// (writer) and the thread's loop-exit check (reader).
582 let stop_thread = Arc::new(AtomicBool::new(false));
583 let stop_thread_clone = stop_thread.clone();
584 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
// Decorate the user's event handler with network-graph and scorer updates, mirroring
// the async variant above.
585 let event_handler = |event| {
586 let network_graph = gossip_sync.network_graph();
587 if let Some(network_graph) = network_graph {
588 handle_network_graph_update(network_graph, &event)
590 if let Some(ref scorer) = scorer {
591 update_scorer(scorer, &event);
593 event_handler.handle_event(event);
// Sync flavor of the shared loop: blocks up to 100ms waiting for a persistable
// update, and uses `Instant`s for the periodic timers.
595 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
596 channel_manager, channel_manager.process_pending_events(&event_handler),
597 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
598 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
599 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
601 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
604 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
605 /// [`ChannelManager`].
609 /// This function panics if the background thread has panicked such as while persisting or
612 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
613 pub fn join(mut self) -> Result<(), std::io::Error> {
614 assert!(self.thread_handle.is_some());
618 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
619 /// [`ChannelManager`].
623 /// This function panics if the background thread has panicked such as while persisting or
626 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
627 pub fn stop(mut self) -> Result<(), std::io::Error> {
628 assert!(self.thread_handle.is_some());
629 self.stop_and_join_thread()
// Signal the thread to stop (Release pairs with the loop's Acquire load) then join it.
632 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
633 self.stop_thread.store(true, Ordering::Release);
// Take the handle so a second call is a no-op; propagate the thread's panic if any.
637 fn join_thread(&mut self) -> Result<(), std::io::Error> {
638 match self.thread_handle.take() {
639 Some(handle) => handle.join().unwrap(),
// Dropping the processor stops the thread; panics if joining fails (see struct docs).
645 #[cfg(feature = "std")]
646 impl Drop for BackgroundProcessor {
648 self.stop_and_join_thread().unwrap();
652 #[cfg(all(feature = "std", test))]
654 use bitcoin::blockdata::block::BlockHeader;
655 use bitcoin::blockdata::constants::genesis_block;
656 use bitcoin::blockdata::locktime::PackedLockTime;
657 use bitcoin::blockdata::transaction::{Transaction, TxOut};
658 use bitcoin::network::constants::Network;
659 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
660 use lightning::chain::{BestBlock, Confirm, chainmonitor};
661 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
662 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
663 use lightning::chain::transaction::OutPoint;
664 use lightning::get_event_msg;
665 use lightning::ln::PaymentHash;
666 use lightning::ln::channelmanager;
667 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
668 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
669 use lightning::ln::msgs::{ChannelMessageHandler, Init};
670 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
671 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
672 use lightning::routing::router::{DefaultRouter, RouteHop};
673 use lightning::routing::scoring::{ChannelUsage, Score};
674 use lightning::util::config::UserConfig;
675 use lightning::util::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
676 use lightning::util::ser::Writeable;
677 use lightning::util::test_utils;
678 use lightning::util::persist::KVStorePersister;
679 use lightning_persister::FilesystemPersister;
680 use std::collections::VecDeque;
682 use std::path::PathBuf;
683 use std::sync::{Arc, Mutex};
684 use std::sync::mpsc::SyncSender;
685 use std::time::Duration;
686 use bitcoin::hashes::Hash;
687 use bitcoin::TxMerkleNode;
688 use lightning_rapid_gossip_sync::RapidGossipSync;
689 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
// Deadline (seconds) for expected events in tests, scaled from the freshness timer.
691 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
// Minimal no-op socket descriptor used to satisfy PeerManager's type parameters.
693 #[derive(Clone, Hash, PartialEq, Eq)]
694 struct TestDescriptor{}
695 impl SocketDescriptor for TestDescriptor {
696 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
700 fn disconnect_socket(&mut self) {}
// Concrete type aliases wiring the test utilities into the generic LDK types.
703 type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
705 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
707 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
708 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// A fully-wired test node: channel manager plus every collaborator the background
// processor needs.
711 node: Arc<ChannelManager>,
712 p2p_gossip_sync: PGS,
713 rapid_gossip_sync: RGS,
714 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
715 chain_monitor: Arc<ChainMonitor>,
716 persister: Arc<FilesystemPersister>,
717 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
718 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
719 logger: Arc<test_utils::TestLogger>,
720 best_block: BestBlock,
721 scorer: Arc<Mutex<TestScorer>>,
// Helpers producing each GossipSync variant from the node's shared sync objects.
725 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
726 GossipSync::P2P(self.p2p_gossip_sync.clone())
729 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
730 GossipSync::Rapid(self.rapid_gossip_sync.clone())
733 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
// Best-effort cleanup of the on-disk test persistence directory.
740 let data_dir = self.persister.get_data_dir();
741 match fs::remove_dir_all(data_dir.clone()) {
742 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Test persister wrapping FilesystemPersister: each `with_*` builder arms an injected
// error (or a notification channel) for a specific persistence key.
749 graph_error: Option<(std::io::ErrorKind, &'static str)>,
750 graph_persistence_notifier: Option<SyncSender<()>>,
751 manager_error: Option<(std::io::ErrorKind, &'static str)>,
752 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
753 filesystem_persister: FilesystemPersister,
// Fresh persister with no injected failures, backed by `data_dir` on disk.
757 fn new(data_dir: String) -> Self {
758 let filesystem_persister = FilesystemPersister::new(data_dir);
759 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
// Make network-graph persistence fail with the given error.
762 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
763 Self { graph_error: Some((error, message)), ..self }
// Notify `sender` every time the network graph is persisted.
766 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
767 Self { graph_persistence_notifier: Some(sender), ..self }
// Make ChannelManager persistence fail with the given error.
770 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
771 Self { manager_error: Some((error, message)), ..self }
// Make scorer persistence fail with the given error.
774 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
775 Self { scorer_error: Some((error, message)), ..self }
// Dispatch on the persistence key: inject configured errors/notifications, otherwise
// delegate to the real filesystem persister.
779 impl KVStorePersister for Persister {
780 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
781 if key == "manager" {
782 if let Some((error, message)) = self.manager_error {
783 return Err(std::io::Error::new(error, message))
787 if key == "network_graph" {
788 if let Some(sender) = &self.graph_persistence_notifier {
789 sender.send(()).unwrap();
792 if let Some((error, message)) = self.graph_error {
793 return Err(std::io::Error::new(error, message))
798 if let Some((error, message)) = self.scorer_error {
799 return Err(std::io::Error::new(error, message))
803 self.filesystem_persister.persist(key, object)
808 event_expectations: Option<VecDeque<TestResult>>,
813 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
814 PaymentSuccess { path: Vec<RouteHop> },
815 ProbeFailure { path: Vec<RouteHop> },
816 ProbeSuccess { path: Vec<RouteHop> },
821 Self { event_expectations: None }
824 fn expect(&mut self, expectation: TestResult) {
825 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
// Serialization stub: the test scorer persists as zero bytes (nothing written).
829 impl lightning::util::ser::Writeable for TestScorer {
830 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
// `Score` impl: each callback pops the next queued expectation and asserts it
// matches the callback actually received (path, and scid where applicable);
// any other expectation variant panics. With no queue set, callbacks do nothing.
// NOTE(review): match-arm closing braces fall on lines missing from this excerpt.
833 impl Score for TestScorer {
// Penalty lookup is never exercised by these tests.
834 fn channel_penalty_msat(
835 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
836 ) -> u64 { unimplemented!(); }
// Expects a queued `PaymentFailure` with a matching path and short channel id.
838 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
839 if let Some(expectations) = &mut self.event_expectations {
840 match expectations.pop_front().unwrap() {
841 TestResult::PaymentFailure { path, short_channel_id } => {
842 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
843 assert_eq!(actual_short_channel_id, short_channel_id);
845 TestResult::PaymentSuccess { path } => {
846 panic!("Unexpected successful payment path: {:?}", path)
848 TestResult::ProbeFailure { path } => {
849 panic!("Unexpected probe failure: {:?}", path)
851 TestResult::ProbeSuccess { path } => {
852 panic!("Unexpected probe success: {:?}", path)
// Expects a queued `PaymentSuccess` with a matching path.
858 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
859 if let Some(expectations) = &mut self.event_expectations {
860 match expectations.pop_front().unwrap() {
861 TestResult::PaymentFailure { path, .. } => {
862 panic!("Unexpected payment path failure: {:?}", path)
864 TestResult::PaymentSuccess { path } => {
865 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
867 TestResult::ProbeFailure { path } => {
868 panic!("Unexpected probe failure: {:?}", path)
870 TestResult::ProbeSuccess { path } => {
871 panic!("Unexpected probe success: {:?}", path)
// Expects a queued `ProbeFailure` with a matching path (scid ignored here).
877 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
878 if let Some(expectations) = &mut self.event_expectations {
879 match expectations.pop_front().unwrap() {
880 TestResult::PaymentFailure { path, .. } => {
881 panic!("Unexpected payment path failure: {:?}", path)
883 TestResult::PaymentSuccess { path } => {
884 panic!("Unexpected payment path success: {:?}", path)
886 TestResult::ProbeFailure { path } => {
887 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
889 TestResult::ProbeSuccess { path } => {
890 panic!("Unexpected probe success: {:?}", path)
// Expects a queued `ProbeSuccess` with a matching path.
895 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
896 if let Some(expectations) = &mut self.event_expectations {
897 match expectations.pop_front().unwrap() {
898 TestResult::PaymentFailure { path, .. } => {
899 panic!("Unexpected payment path failure: {:?}", path)
901 TestResult::PaymentSuccess { path } => {
902 panic!("Unexpected payment path success: {:?}", path)
904 TestResult::ProbeFailure { path } => {
905 panic!("Unexpected probe failure: {:?}", path)
907 TestResult::ProbeSuccess { path } => {
908 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
// On drop, fail the test if any queued expectations were never consumed —
// unless the thread is already panicking (avoids a double panic masking the
// original failure).
915 impl Drop for TestScorer {
917 if std::thread::panicking() {
921 if let Some(event_expectations) = &self.event_expectations {
922 if !event_expectations.is_empty() {
923 panic!("Unsatisfied event expectations: {:?}", event_expectations);
// Joins a directory path and a file name into a single path string.
929 fn get_full_filepath(filepath: String, filename: String) -> String {
930 let mut path = PathBuf::from(filepath);
// NOTE(review): the line appending `filename` to `path` (original line 931,
// presumably `path.push(filename);`) is missing from this excerpt — confirm.
932 path.to_str().unwrap().to_string()
// Builds `num_nodes` fully-wired test nodes (broadcaster, fee estimator, logger,
// graph, scorer, router, chain monitor, channel manager, gossip syncs, peer
// manager), each persisting under `{persist_dir}_persister_{i}`, then connects
// every pair of nodes as peers.
935 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
936 let mut nodes = Vec::new();
937 for i in 0..num_nodes {
938 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
939 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
940 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
941 let network = Network::Testnet;
942 let genesis_block = genesis_block(network);
943 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
944 let scorer = Arc::new(Mutex::new(TestScorer::new()));
// Deterministic per-node seed; reused for both the router and KeysManager.
945 let seed = [i as u8; 32];
946 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
947 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
948 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
// KeysManager wants a creation time; derive one from the genesis header.
949 let now = Duration::from_secs(genesis_block.header.time as u64);
950 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
951 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
952 let best_block = BestBlock::from_network(network);
953 let params = ChainParameters { network, best_block };
954 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
955 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
956 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
957 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
958 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
959 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Mark every node pair as connected (i initiates outbound to j).
963 for i in 0..num_nodes {
964 for j in (i+1)..num_nodes {
965 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
966 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
// Drives a full channel open between two nodes: negotiation, the resulting
// FundingGenerationReady event, and the funding handshake. Expands to yield the
// funding transaction (see `handle_funding_generation_ready!`).
973 macro_rules! open_channel {
974 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
975 begin_open_channel!($node_a, $node_b, $channel_value);
976 let events = $node_a.node.get_and_clear_pending_events();
// Exactly one event is expected: the FundingGenerationReady for this channel.
977 assert_eq!(events.len(), 1);
978 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
979 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of channel open: node_a creates the channel, then the
// open_channel/accept_channel messages are exchanged between the two nodes.
984 macro_rules! begin_open_channel {
985 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
// 100 msat pushed, user_channel_id 42 (asserted later in the funding event).
986 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
987 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
988 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Consumes a FundingGenerationReady event: asserts its value and user id, then
// builds a one-output funding transaction paying the requested script/value.
// Yields `(temporary_channel_id, funding_tx)`.
// NOTE(review): the `match $event { ... }` opener (original line 994) and some
// closing lines are missing from this excerpt.
992 macro_rules! handle_funding_generation_ready {
993 ($event: expr, $channel_value: expr) => {{
995 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
996 assert_eq!(channel_value_satoshis, $channel_value);
// 42 is the user_channel_id passed in `begin_open_channel!`.
997 assert_eq!(user_channel_id, 42);
999 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1000 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1002 (temporary_channel_id, tx)
1004 _ => panic!("Unexpected event"),
// Second half of channel open: hands the funding tx to node_a and exchanges the
// funding_created/funding_signed messages to complete the handshake.
1009 macro_rules! end_open_channel {
1010 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
1011 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
1012 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1013 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Mines `depth` synthetic blocks on top of the node's best block, confirming
// `tx` in the first of them and feeding each header to both the channel manager
// and the chain monitor.
// NOTE(review): the `match` opener selecting on the block index (original lines
// 1024-1025) is missing from this excerpt — the visible arms suggest the tx is
// confirmed at depth 1 and `best_block_updated` fires at the final depth.
1017 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1018 for i in 1..=depth {
1019 let prev_blockhash = node.best_block.block_hash();
1020 let height = node.best_block.height() + 1;
1021 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1022 let txdata = vec![(0, tx)];
1023 node.best_block = BestBlock::new(header.block_hash(), height);
1026 node.node.transactions_confirmed(&header, &txdata, height);
1027 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1029 x if x == depth => {
1030 node.node.best_block_updated(&header, height);
1031 node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` to the standard reorg-safe depth (ANTI_REORG_DELAY).
1037 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1038 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
// End-to-end persistence test: opens a channel, runs a BackgroundProcessor, and
// checks that the channel manager, network graph, and scorer are written to disk
// with the expected serialized bytes, both initially and after a force-close.
// NOTE(review): the retry-loop scaffolding inside `check_persisted_data!` and
// around the condvar polls is partially missing from this excerpt.
1042 fn test_background_processor() {
1043 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1044 // updates. Also test that when new updates are available, the manager signals that it needs
1045 // re-persistence and is successfully re-persisted.
1046 let nodes = create_nodes(2, "test_background_processor".to_string());
1048 // Go through the channel creation process so that each node has something to persist. Since
1049 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1050 // avoid a race with processing events.
1051 let tx = open_channel!(nodes[0], nodes[1], 100000);
1053 // Initiate the background processors to watch each node.
1054 let data_dir = nodes[0].persister.get_data_dir();
1055 let persister = Arc::new(Persister::new(data_dir));
1056 let event_handler = |_: _| {};
1057 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Polls the on-disk file until its bytes match `$node`'s current serialization.
1059 macro_rules! check_persisted_data {
1060 ($node: expr, $filepath: expr) => {
1061 let mut expected_bytes = Vec::new();
1063 expected_bytes.clear();
1064 match $node.write(&mut expected_bytes) {
1066 match std::fs::read($filepath) {
1068 if bytes == expected_bytes {
1077 Err(e) => panic!("Unexpected error: {}", e)
1083 // Check that the initial channel manager data is persisted as expected.
1084 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1085 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait until the manager's persistence flag has been cleared by the processor.
1088 if !nodes[0].node.get_persistence_condvar_value() { break }
1091 // Force-close the channel.
1092 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1094 // Check that the force-close updates are persisted.
1095 check_persisted_data!(nodes[0].node, filepath.clone());
1097 if !nodes[0].node.get_persistence_condvar_value() { break }
1100 // Check network graph is persisted
1101 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1102 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1104 // Check scorer is persisted
1105 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1106 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Clean shutdown of the background thread must succeed.
1108 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically drives both ChannelManager's
// and PeerManager's `timer_tick_occurred` by polling the test logger for the
// two log lines the processor emits.
// NOTE(review): the polling-loop scaffolding around lines 1121-1125 is partially
// missing from this excerpt.
1112 fn test_timer_tick_called() {
1113 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1114 // `FRESHNESS_TIMER`.
1115 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1116 let data_dir = nodes[0].persister.get_data_dir();
1117 let persister = Arc::new(Persister::new(data_dir));
1118 let event_handler = |_: _| {};
1119 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Wait until both timer-tick log lines have been recorded.
1121 let log_entries = nodes[0].logger.lines.lock().unwrap();
1122 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1123 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1124 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1125 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1130 assert!(bg_processor.stop().is_ok());
// An injected channel-manager persistence error must surface from the
// background thread: `join()` returns the exact error kind and message.
1134 fn test_channel_manager_persist_error() {
1135 // Test that if we encounter an error during manager persistence, the thread panics.
1136 let nodes = create_nodes(2, "test_persist_error".to_string());
// The open channel gives the manager something to persist, triggering the error.
1137 open_channel!(nodes[0], nodes[1], 100000);
1139 let data_dir = nodes[0].persister.get_data_dir();
1140 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1141 let event_handler = |_: _| {};
1142 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1143 match bg_processor.join() {
1144 Ok(_) => panic!("Expected error persisting manager"),
1146 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1147 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// An injected network-graph persistence error must be reported by `stop()`
// (graph persist failures don't kill the thread mid-run, unlike manager errors).
1153 fn test_network_graph_persist_error() {
1154 // Test that if we encounter an error during network graph persistence, an error gets returned.
1155 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1156 let data_dir = nodes[0].persister.get_data_dir();
1157 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1158 let event_handler = |_: _| {};
1159 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1161 match bg_processor.stop() {
1162 Ok(_) => panic!("Expected error persisting network graph"),
1164 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1165 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// An injected scorer persistence error must likewise be reported by `stop()`.
1171 fn test_scorer_persist_error() {
1172 // Test that if we encounter an error during scorer persistence, an error gets returned.
1173 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1174 let data_dir = nodes[0].persister.get_data_dir();
1175 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1176 let event_handler = |_: _| {};
1177 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1179 match bg_processor.stop() {
1180 Ok(_) => panic!("Expected error persisting scorer"),
1182 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1183 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies the processor delivers events to the user-supplied handler: first a
// FundingGenerationReady during channel open, then (after a restart with a new
// handler) a SpendableOutputs event following a force-close confirmation.
1189 fn test_background_event_handling() {
1190 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1191 let channel_value = 100000;
1192 let data_dir = nodes[0].persister.get_data_dir();
1193 let persister = Arc::new(Persister::new(data_dir.clone()));
1195 // Set up a background event handler for FundingGenerationReady events.
1196 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// Forward the (temp channel id, funding tx) pair to the test thread; ignore
// ChannelReady; anything else is unexpected.
1197 let event_handler = move |event: Event| match event {
1198 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1199 Event::ChannelReady { .. } => {},
1200 _ => panic!("Unexpected event: {:?}", event),
1203 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1205 // Open a channel and check that the FundingGenerationReady event was handled.
1206 begin_open_channel!(nodes[0], nodes[1], channel_value);
1207 let (temporary_channel_id, funding_tx) = receiver
1208 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1209 .expect("FundingGenerationReady not handled within deadline");
1210 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1212 // Confirm the funding transaction.
1213 confirm_transaction(&mut nodes[0], &funding_tx);
1214 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1215 confirm_transaction(&mut nodes[1], &funding_tx);
1216 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready both ways; each side then emits a channel_update.
1217 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1218 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1219 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1220 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1222 assert!(bg_processor.stop().is_ok());
1224 // Set up a background event handler for SpendableOutputs events.
1225 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1226 let event_handler = move |event: Event| match event {
1227 Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1228 Event::ChannelReady { .. } => {},
1229 Event::ChannelClosed { .. } => {},
1230 _ => panic!("Unexpected event: {:?}", event),
// Restart the processor with a fresh persister over the same data dir.
1232 let persister = Arc::new(Persister::new(data_dir));
1233 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1235 // Force close the channel and check that the SpendableOutputs event was handled.
1236 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1237 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Confirm the commitment tx deep enough for outputs to become spendable.
1238 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1240 let event = receiver
1241 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1242 .expect("Events not handled within deadline");
1244 Event::SpendableOutputs { .. } => {},
1245 _ => panic!("Unexpected event: {:?}", event),
1248 assert!(bg_processor.stop().is_ok());
// Verifies the processor periodically persists the scorer, by polling the test
// logger for the "Persisting scorer" line.
// NOTE(review): the polling-loop scaffolding around lines 1260-1262 is partially
// missing from this excerpt.
1252 fn test_scorer_persistence() {
1253 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1254 let data_dir = nodes[0].persister.get_data_dir();
1255 let persister = Arc::new(Persister::new(data_dir));
1256 let event_handler = |_: _| {};
1257 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1260 let log_entries = nodes[0].logger.lines.lock().unwrap();
1261 let expected_log = "Persisting scorer".to_string();
1262 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1267 assert!(bg_processor.stop().is_ok());
// Verifies the processor defers network-graph pruning until a rapid gossip sync
// completes: a partially-announced (prunable) channel survives until the RGS
// data is applied, after which pruning removes all channels and the graph is
// persisted (observed via the graph-persistence notifier channel).
1271 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1272 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1273 let data_dir = nodes[0].persister.get_data_dir();
// `sender` fires on every network-graph persist; `receiver` is awaited below.
1274 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1275 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1276 let network_graph = nodes[0].network_graph.clone();
1277 let features = ChannelFeatures::empty();
// A partial announcement (no full channel info) — normally a pruning candidate.
1278 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1279 .expect("Failed to update channel from partial announcement");
1280 let original_graph_description = network_graph.to_string();
1281 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1282 assert_eq!(network_graph.read_only().channels().len(), 1);
1284 let event_handler = |_: _| {};
1285 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Let the processor's main loop run a couple of iterations before syncing.
1288 let log_entries = nodes[0].logger.lines.lock().unwrap();
1289 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1290 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1293 // Wait until the loop has gone around at least twice.
// Canned rapid-gossip-sync payload (binary snapshot) adding two channels.
1298 let initialization_input = vec![
1299 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1300 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1301 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1302 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1303 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1304 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1305 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1306 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1307 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1308 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1309 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1310 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1311 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1313 nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1315 // this should have added two channels
1316 assert_eq!(network_graph.read_only().channels().len(), 3);
// Sync is complete; wait for the prune+persist cycle to fire.
1319 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1320 .expect("Network graph not pruned within deadline");
1322 background_processor.stop().unwrap();
1324 // all channels should now be pruned
1325 assert_eq!(network_graph.read_only().channels().len(), 0);
// Verifies that processing payment/probe events updates the scorer: for each
// pushed event the test queues the matching `TestResult` expectation on the
// `TestScorer`, then waits for the handler to forward the event back, proving
// the scorer callback fired with the right path/scid.
// NOTE(review): the function's closing brace and some struct-literal closers
// fall on lines missing from this excerpt.
1329 fn test_payment_path_scoring() {
1330 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1331 // that we update the scorer upon a payment path succeeding (note that the channel must be
1332 // public or else we won't score it).
1333 // Set up a background event handler for FundingGenerationReady events.
1334 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// Forward every scoring-relevant event to the test thread for inspection.
1335 let event_handler = move |event: Event| match event {
1336 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1337 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1338 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1339 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1340 _ => panic!("Unexpected event: {:?}", event),
1343 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1344 let data_dir = nodes[0].persister.get_data_dir();
1345 let persister = Arc::new(Persister::new(data_dir));
1346 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// A single-hop path over a fixed scid, used by every expectation below.
1348 let scored_scid = 4242;
1349 let secp_ctx = Secp256k1::new();
1350 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1351 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1353 let path = vec![RouteHop {
1355 node_features: NodeFeatures::empty(),
1356 short_channel_id: scored_scid,
1357 channel_features: ChannelFeatures::empty(),
1359 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
// Case 1: on-path failure with a scid -> scorer sees payment_path_failed.
1362 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1363 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1365 payment_hash: PaymentHash([42; 32]),
1366 payment_failed_permanently: false,
1367 failure: PathFailure::OnPath { network_update: None },
1369 short_channel_id: Some(scored_scid),
1372 let event = receiver
1373 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1374 .expect("PaymentPathFailed not handled within deadline");
1376 Event::PaymentPathFailed { .. } => {},
1377 _ => panic!("Unexpected event"),
1380 // Ensure we'll score payments that were explicitly failed back by the destination as
// Case 2: permanent failure with no scid -> treated as a successful probe.
1382 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1383 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1385 payment_hash: PaymentHash([42; 32]),
1386 payment_failed_permanently: true,
1387 failure: PathFailure::OnPath { network_update: None },
1389 short_channel_id: None,
1392 let event = receiver
1393 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1394 .expect("PaymentPathFailed not handled within deadline");
1396 Event::PaymentPathFailed { .. } => {},
1397 _ => panic!("Unexpected event"),
// Case 3: successful payment path -> scorer sees payment_path_successful.
1400 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1401 nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1402 payment_id: PaymentId([42; 32]),
1406 let event = receiver
1407 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1408 .expect("PaymentPathSuccessful not handled within deadline");
1410 Event::PaymentPathSuccessful { .. } => {},
1411 _ => panic!("Unexpected event"),
// Case 4: explicit probe success -> scorer sees probe_successful.
1414 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1415 nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1416 payment_id: PaymentId([42; 32]),
1417 payment_hash: PaymentHash([42; 32]),
1420 let event = receiver
1421 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1422 .expect("ProbeSuccessful not handled within deadline");
1424 Event::ProbeSuccessful { .. } => {},
1425 _ => panic!("Unexpected event"),
// Case 5: explicit probe failure -> scorer sees probe_failed.
1428 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1429 nodes[0].node.push_pending_event(Event::ProbeFailed {
1430 payment_id: PaymentId([42; 32]),
1431 payment_hash: PaymentHash([42; 32]),
1433 short_channel_id: Some(scored_scid),
1435 let event = receiver
1436 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1437 .expect("ProbeFailure not handled within deadline");
1439 Event::ProbeFailed { .. } => {},
1440 _ => panic!("Unexpected event"),
1443 assert!(bg_processor.stop().is_ok());