1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::ln::channelmanager::ChannelManager;
30 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
31 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
32 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
33 use lightning::routing::utxo::UtxoLookup;
34 use lightning::routing::router::Router;
35 use lightning::routing::scoring::{Score, WriteableScore};
36 use lightning::util::events::{Event, EventHandler, EventsProvider};
37 use lightning::util::logger::Logger;
38 use lightning::util::persist::Persister;
39 use lightning_rapid_gossip_sync::RapidGossipSync;
43 use core::time::Duration;
45 #[cfg(feature = "std")]
47 #[cfg(feature = "std")]
48 use core::sync::atomic::{AtomicBool, Ordering};
49 #[cfg(feature = "std")]
50 use std::thread::{self, JoinHandle};
51 #[cfg(feature = "std")]
52 use std::time::Instant;
54 #[cfg(feature = "futures")]
55 use futures_util::{select_biased, future::FutureExt, task};
56 #[cfg(not(feature = "std"))]
59 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
60 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
61 /// responsibilities are:
62 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
63 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
64 /// writing it to disk/backups by invoking the callback given to it at startup.
65 /// [`ChannelManager`] persistence should be done in the background.
66 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
67 /// at the appropriate intervals.
68 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
69 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
71 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
72 /// upon as doing so may result in high latency.
76 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
77 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
78 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
79 /// unilateral chain closure fees are at risk.
81 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
82 /// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag the background thread polls each loop iteration; set to
	// `true` (with `Ordering::Release`) by `stop`/`drop` to request shutdown.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned thread; `Option` so `join_thread` can `take()` it
	// exactly once. The thread's return value carries any persistence error.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Timer intervals, in seconds, for the periodic tasks driven by `define_run_body!`.
// NOTE(review): the duplicate `const` names below must be `#[cfg]`-gated
// production/test variant pairs (the 1-second values keep unit tests fast);
// not all of the gating attribute lines are visible in this chunk.
const FRESHNESS_TIMER: u64 = 60;
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// How often the scorer is written out to disk.
const SCORER_PERSIST_TIMER: u64 = 30;
const SCORER_PERSIST_TIMER: u64 = 1;

// The first graph prune runs shortly after startup (see the comment in
// `define_run_body!` about short-lived clients), then `NETWORK_PRUNE_TIMER`
// takes over for subsequent prunes.
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// Generic over deref-able handles so callers may pass `Arc`s, references, etc.
// `P`/`R` are the two sync implementations, `G` the shared network graph they
// both update; `U` (UTXO lookup, used by the P2P sync) and `L` (logger) are
// bounded in the `where` clause.
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
where U::Target: UtxoLookup, L::Target: Logger {
/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
/// Rapid gossip sync from a trusted server.
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
> GossipSync<P, R, G, U, L>
where U::Target: UtxoLookup, L::Target: Logger {
	// Returns the network graph this sync operates on, if any
	// (`None` for the `GossipSync::None` variant).
	fn network_graph(&self) -> Option<&G> {
		GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::None => None,

	// Like `network_graph`, but only returns a graph that is currently safe to
	// prune: a rapid-sync graph is withheld until the initial sync completes,
	// since pruning mid-sync could remove data the sync is about to reference.
	fn prunable_network_graph(&self) -> Option<&G> {
		GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::Rapid(gossip_sync) => {
			if gossip_sync.is_initial_sync_complete() {
				Some(gossip_sync.network_graph())
		GossipSync::None => None,
// Convenience constructors: each `impl` below pins the type parameters of the
// *unused* `GossipSync` variants to concrete placeholder types, so callers
// constructing one variant need not spell out types for the others.
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
	U::Target: UtxoLookup,
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
		&'a (dyn UtxoLookup + Send + Sync),
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)

/// (C-not exported) as the bindings concretize everything and have constructors for us
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a (dyn UtxoLookup + Send + Sync),
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
/// Forwards any `NetworkUpdate` attached to a failed-payment event to the
/// given network graph; all other events are ignored.
fn handle_network_graph_update<L: Deref>(
	network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
	if let Event::PaymentPathFailed { ref network_update, .. } = event {
		if let Some(network_update) = network_update {
			network_graph.handle_network_update(&network_update);
/// Feeds payment/probe outcome events into the scorer so future routing
/// decisions can learn from them; events without scoring relevance are ignored.
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event
	// Take the scorer lock once; each arm below performs a single update.
	let mut score = scorer.lock();
		// A hop failed at a known channel: record the failure against it.
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_failed(&path, *scid);
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		Event::PaymentPathSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_successful(&path);
		Event::ProbeSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_failed(&path, *scid);
// Shared implementation of the main processing loop, expanded once for the
// blocking thread-based `BackgroundProcessor::start` and once for the async
// `process_events_async`. The caller supplies the event-processing
// expressions, the loop-exit check, the await primitive, and a timer
// abstraction (`$get_timer`/`$timer_elapsed`) so the same control flow drives
// both flavors.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// One independent countdown per periodic task.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;

			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);

			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					#[cfg(feature = "std")] {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking();
					#[cfg(not(feature = "std"))] {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");

					// Persistence failures here are logged but non-fatal: the
					// graph can be re-persisted on the next prune interval.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)

					last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
386 /// Processes background events in a future.
388 /// `sleeper` should return a future which completes in the given amount of time and returns a
389 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
390 /// future which outputs true, the loop will exit and this function's future will complete.
392 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
394 /// Requires the `futures` feature. Note that while this method is available without the `std`
395 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
396 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
397 /// manually instead.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
) -> Result<(), io::Error>
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	// Whether `sleeper` has requested shutdown. This must start as `false`:
	// the loop-exit check in `define_run_body!` runs on every iteration, and
	// the only assignment to this flag is in the `sleeper` arm of the select
	// below. If it started as `true` and the persistable-update future won the
	// very first select, the loop would exit immediately — before `sleeper`
	// ever output `true` — violating the exit contract documented above.
	let mut should_break = false;
	// Decorate the user's event handler: first apply any network update to the
	// graph and feed payment/probe results to the scorer, then await the
	// user-provided handler itself.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			if let Some(ref scorer) = scorer {
				update_scorer(scorer, &event);
			event_handler(event).await;
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Wake on either a ChannelManager persistence request or the
			// caller's sleep future; the latter also drives `should_break`.
			_ = channel_manager.get_persistable_update_future().fuse() => true,
			exit = sleeper(Duration::from_millis(100)).fuse() => {
		}, |t| sleeper(Duration::from_secs(t)),
		// "Timer elapsed" for the async flavor: poll the sleep future once
		// with a no-op waker and report whether it has already completed.
		|fut: &mut SleepFuture, _| {
			let mut waker = task::noop_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	/// # Data Persistence
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	/// # Rapid Gossip Sync
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
		UL: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for <'b> WriteableScore<'b>,
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
		// Shared shutdown flag: the spawned thread polls it with `Acquire` each
		// loop iteration; `stop_and_join_thread` sets it with `Release`.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Decorate the user's handler: apply gossip updates to the graph
			// and feed payment/probe results to the scorer before handing the
			// event to the user-provided handler.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				if let Some(ref scorer) = scorer {
					update_scorer(scorer, &event);
				event_handler.handle_event(event);
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()

	// Request shutdown (the `Release` store pairs with the run body's
	// `Acquire` load), then wait for the thread to finish.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);

	// `take()` ensures the thread is joined at most once; propagates the
	// thread's persistence result, panicking if the thread itself panicked.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	// On drop, request shutdown and join the thread. `unwrap` here panics if
	// the thread returned a persistence error that was never collected via
	// `join`/`stop`.
		self.stop_and_join_thread().unwrap();
653 #[cfg(all(feature = "std", test))]
655 use bitcoin::blockdata::block::BlockHeader;
656 use bitcoin::blockdata::constants::genesis_block;
657 use bitcoin::blockdata::locktime::PackedLockTime;
658 use bitcoin::blockdata::transaction::{Transaction, TxOut};
659 use bitcoin::network::constants::Network;
660 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
661 use lightning::chain::{BestBlock, Confirm, chainmonitor};
662 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
663 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
664 use lightning::chain::transaction::OutPoint;
665 use lightning::get_event_msg;
666 use lightning::ln::PaymentHash;
667 use lightning::ln::channelmanager;
668 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
669 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
670 use lightning::ln::msgs::{ChannelMessageHandler, Init};
671 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
672 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
673 use lightning::routing::router::{DefaultRouter, RouteHop};
674 use lightning::routing::scoring::{ChannelUsage, Score};
675 use lightning::util::config::UserConfig;
676 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
677 use lightning::util::ser::Writeable;
678 use lightning::util::test_utils;
679 use lightning::util::persist::KVStorePersister;
680 use lightning_persister::FilesystemPersister;
681 use std::collections::VecDeque;
683 use std::path::PathBuf;
684 use std::sync::{Arc, Mutex};
685 use std::sync::mpsc::SyncSender;
686 use std::time::Duration;
687 use bitcoin::hashes::Hash;
688 use bitcoin::TxMerkleNode;
689 use lightning_rapid_gossip_sync::RapidGossipSync;
690 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
	// Deadline (seconds) for tests awaiting an expected event — presumably
	// scaled off FRESHNESS_TIMER so the test-cfg timer values keep it short;
	// confirm against uses elsewhere in this module.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
	// No-op socket descriptor so a `PeerManager` can be constructed in tests
	// without any real network I/O.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
		fn disconnect_socket(&mut self) {}
	// Concrete aliases instantiating LDK's generic types with this module's
	// test doubles, so the test bodies stay readable.
	type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Handles for the two gossip-sync flavors exercised by the tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
		// Per-node test harness fields: a channel manager bundled with its
		// gossip syncs, peer manager, chain monitor, persister and supporting
		// test utilities.
		node: Arc<ChannelManager>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<TestScorer>>,

		// Wraps this node's P2P gossip sync in the `GossipSync` enum.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())

		// Wraps this node's rapid gossip sync in the `GossipSync` enum.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())

		// Produces the `GossipSync::None` variant with this module's concrete types.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {

			// Best-effort cleanup of this node's on-disk test data; failures
			// are only logged so teardown never panics.
			let data_dir = self.persister.get_data_dir();
			match fs::remove_dir_all(data_dir.clone()) {
				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
		// Test persister wrapping `FilesystemPersister`; the `with_*` builders
		// below inject failures for specific persistence keys and/or a channel
		// to signal each network-graph persistence.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,

		// Plain persister with no injected failures or notifications.
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }

		// Make network-graph persistence fail with the given error.
		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }

		// Send on `sender` whenever the network graph is persisted.
		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }

		// Make channel-manager persistence fail with the given error.
		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }

		// Make scorer persistence fail with the given error.
		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
	impl KVStorePersister for Persister {
		// Dispatches on the well-known persistence keys, injecting any
		// configured error/notification before delegating to the real
		// filesystem persister.
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))

			// NOTE(review): the enclosing `if key == "scorer"` guard is not
			// visible in this chunk — confirm this error is scorer-key-gated.
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))

			self.filesystem_persister.persist(key, object)
		// Scorer test double: tests queue `TestResult` expectations via
		// `expect`, and the `Score` impl asserts each scoring callback matches
		// the next queued expectation. With no queue, callbacks are accepted
		// silently.
		event_expectations: Option<VecDeque<TestResult>>,

		PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
		PaymentSuccess { path: Vec<RouteHop> },
		ProbeFailure { path: Vec<RouteHop> },
		ProbeSuccess { path: Vec<RouteHop> },

			Self { event_expectations: None }

		// Queue an expectation, creating the queue lazily on first use.
		fn expect(&mut self, expectation: TestResult) {
			self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
// The scorer must be `Writeable` so the BackgroundProcessor can persist it; the
// test scorer has no meaningful serialized form, so writing is a no-op.
impl lightning::util::ser::Writeable for TestScorer {
	fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
// Scorer that asserts each scoring callback matches the next queued `TestResult`
// expectation (FIFO). Any mismatch — or a callback of the wrong variant — panics,
// failing the test.
impl Score for TestScorer {
	// Never called by these tests; routing is not exercised through the scorer.
	fn channel_penalty_msat(
		&self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
	) -> u64 { unimplemented!(); }

	// Must match a queued `PaymentFailure` with the same path and SCID.
	fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
		if let Some(expectations) = &mut self.event_expectations {
			match expectations.pop_front().unwrap() {
				TestResult::PaymentFailure { path, short_channel_id } => {
					assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
					assert_eq!(actual_short_channel_id, short_channel_id);
				TestResult::PaymentSuccess { path } => {
					panic!("Unexpected successful payment path: {:?}", path)
				TestResult::ProbeFailure { path } => {
					panic!("Unexpected probe failure: {:?}", path)
				TestResult::ProbeSuccess { path } => {
					panic!("Unexpected probe success: {:?}", path)

	// Must match a queued `PaymentSuccess` with the same path.
	fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
		if let Some(expectations) = &mut self.event_expectations {
			match expectations.pop_front().unwrap() {
				TestResult::PaymentFailure { path, .. } => {
					panic!("Unexpected payment path failure: {:?}", path)
				TestResult::PaymentSuccess { path } => {
					assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
				TestResult::ProbeFailure { path } => {
					panic!("Unexpected probe failure: {:?}", path)
				TestResult::ProbeSuccess { path } => {
					panic!("Unexpected probe success: {:?}", path)

	// Must match a queued `ProbeFailure` with the same path.
	fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
		if let Some(expectations) = &mut self.event_expectations {
			match expectations.pop_front().unwrap() {
				TestResult::PaymentFailure { path, .. } => {
					panic!("Unexpected payment path failure: {:?}", path)
				TestResult::PaymentSuccess { path } => {
					panic!("Unexpected payment path success: {:?}", path)
				TestResult::ProbeFailure { path } => {
					assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
				TestResult::ProbeSuccess { path } => {
					panic!("Unexpected probe success: {:?}", path)

	// Must match a queued `ProbeSuccess` with the same path.
	fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
		if let Some(expectations) = &mut self.event_expectations {
			match expectations.pop_front().unwrap() {
				TestResult::PaymentFailure { path, .. } => {
					panic!("Unexpected payment path failure: {:?}", path)
				TestResult::PaymentSuccess { path } => {
					panic!("Unexpected payment path success: {:?}", path)
				TestResult::ProbeFailure { path } => {
					panic!("Unexpected probe failure: {:?}", path)
				TestResult::ProbeSuccess { path } => {
					assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
// On drop, fail the test if any queued expectation was never consumed — unless
// we are already unwinding from another panic (avoid a double panic / abort).
impl Drop for TestScorer {
		if std::thread::panicking() {
		if let Some(event_expectations) = &self.event_expectations {
			if !event_expectations.is_empty() {
				panic!("Unsatisfied event expectations: {:?}", event_expectations);
/// Joins a directory path and a file name into a single path string using the
/// platform's path separator.
///
/// Used by the tests to locate the files written by `FilesystemPersister`
/// (e.g. the persisted "manager", "network_graph" and "scorer" blobs).
///
/// Panics if the resulting path is not valid UTF-8 (fine for the
/// ASCII-only paths these tests construct).
fn get_full_filepath(filepath: String, filename: String) -> String {
	let mut path = PathBuf::from(filepath);
	// The visible original never used `filename`, returning only the directory;
	// the file name must be appended for the persisted-data checks to find files.
	path.push(filename);
	path.to_str().unwrap().to_string()
}
// Constructs `num_nodes` fully-wired test nodes (ChannelManager, ChainMonitor,
// PeerManager, gossip syncs, scorer, filesystem persister under
// "{persist_dir}_persister_{i}") on Testnet, then marks every pair of nodes as
// connected peers so channel-open message exchange works.
fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
	let mut nodes = Vec::new();
	for i in 0..num_nodes {
		let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
		let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
		// Per-node logger id so log assertions can distinguish nodes.
		let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
		let network = Network::Testnet;
		let genesis_block = genesis_block(network);
		let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
		let scorer = Arc::new(Mutex::new(TestScorer::new()));
		// Deterministic per-node seed; reused for both the router and KeysManager.
		let seed = [i as u8; 32];
		let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
		let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
		let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
		let now = Duration::from_secs(genesis_block.header.time as u64);
		let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
		let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
		let best_block = BestBlock::from_genesis(network);
		let params = ChainParameters { network, best_block };
		let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
		// Both gossip-sync flavors are built; individual tests pick one (or none).
		let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
		let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
		let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
		let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
		let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
	// Connect every distinct pair of nodes in both directions.
	for i in 0..num_nodes {
		for j in (i+1)..num_nodes {
			nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
			nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
// Runs the full channel-open handshake between `$node_a` and `$node_b` for
// `$channel_value` sats: open/accept messages, funding generation, and funding
// created/signed. Consumes `$node_a`'s pending events, so it must complete
// before a BackgroundProcessor starts handling events. Evaluates to the
// funding transaction.
macro_rules! open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		begin_open_channel!($node_a, $node_b, $channel_value);
		let events = $node_a.node.get_and_clear_pending_events();
		assert_eq!(events.len(), 1);
		let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
		end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of a channel open: `$node_a` initiates a channel (user_channel_id
// 42) and the open_channel/accept_channel messages are exchanged, leaving
// `$node_a` with a pending FundingGenerationReady event.
macro_rules! begin_open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
		$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
		$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Destructures a FundingGenerationReady event (asserting the expected value and
// user_channel_id 42), builds a one-output funding transaction paying the
// requested script, and evaluates to `(temporary_channel_id, tx)`.
macro_rules! handle_funding_generation_ready {
	($event: expr, $channel_value: expr) => {{
			Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
				assert_eq!(channel_value_satoshis, $channel_value);
				assert_eq!(user_channel_id, 42);
				// Minimal funding tx: single output of the full channel value.
				let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
					value: channel_value_satoshis, script_pubkey: output_script.clone(),
				(temporary_channel_id, tx)
			_ => panic!("Unexpected event"),
// Second half of a channel open: hands the funding transaction to `$node_a`'s
// ChannelManager and exchanges funding_created/funding_signed with `$node_b`.
macro_rules! end_open_channel {
	($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
		$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
		$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
		$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Mines `depth` synthetic blocks on top of `node.best_block`, confirming `tx`
// in the first block and then advancing the chain tip, notifying both the
// ChannelManager and ChainMonitor at each step.
fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
	for i in 1..=depth {
		let prev_blockhash = node.best_block.block_hash();
		let height = node.best_block.height() + 1;
		// Synthetic header; `time` reuses the height and bits/nonce are dummies.
		let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
		let txdata = vec![(0, tx)];
		node.best_block = BestBlock::new(header.block_hash(), height);
			// First block: deliver the transaction as confirmed.
			node.node.transactions_confirmed(&header, &txdata, height);
			node.chain_monitor.transactions_confirmed(&header, &txdata, height);
			// Final block: also announce the new best tip.
			x if x == depth => {
				node.node.best_block_updated(&header, height);
				node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` to ANTI_REORG_DELAY depth, the point at which LDK treats a
// confirmation as irreversible.
fn confirm_transaction(node: &mut Node, tx: &Transaction) {
	confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
fn test_background_processor() {
	// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
	// updates. Also test that when new updates are available, the manager signals that it needs
	// re-persistence and is successfully re-persisted.
	let nodes = create_nodes(2, "test_background_processor".to_string());
	// Go through the channel creation process so that each node has something to persist. Since
	// open_channel consumes events, it must complete before starting BackgroundProcessor to
	// avoid a race with processing events.
	let tx = open_channel!(nodes[0], nodes[1], 100000);
	// Initiate the background processors to watch each node.
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Serializes `$node` and compares against the bytes the BackgroundProcessor
	// wrote to `$filepath`, retrying until they match (persistence is async).
	macro_rules! check_persisted_data {
		($node: expr, $filepath: expr) => {
			let mut expected_bytes = Vec::new();
				expected_bytes.clear();
				match $node.write(&mut expected_bytes) {
						match std::fs::read($filepath) {
								if bytes == expected_bytes {
							Err(e) => panic!("Unexpected error: {}", e)
	// Check that the initial channel manager data is persisted as expected.
	let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
	check_persisted_data!(nodes[0].node, filepath.clone());
	// Wait until the persistence-needed flag has been cleared by the processor.
		if !nodes[0].node.get_persistence_condvar_value() { break }
	// Force-close the channel.
	nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
	// Check that the force-close updates are persisted.
	check_persisted_data!(nodes[0].node, filepath.clone());
		if !nodes[0].node.get_persistence_condvar_value() { break }
	// Check network graph is persisted
	let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
	check_persisted_data!(nodes[0].network_graph, filepath.clone());
	// Check scorer is persisted
	let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
	check_persisted_data!(nodes[0].scorer, filepath.clone());
	assert!(bg_processor.stop().is_ok());
fn test_timer_tick_called() {
	// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
	// `FRESHNESS_TIMER`.
	let nodes = create_nodes(1, "test_timer_tick_called".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Poll the test logger until both timer-tick log lines have appeared.
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
		let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
		if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
	assert!(bg_processor.stop().is_ok());
fn test_channel_manager_persist_error() {
	// Test that if we encounter an error during manager persistence, the thread panics.
	let nodes = create_nodes(2, "test_persist_error".to_string());
	open_channel!(nodes[0], nodes[1], 100000);
	let data_dir = nodes[0].persister.get_data_dir();
	// Inject a persistence failure specifically on the "manager" key.
	let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// `join` should surface the injected error rather than completing cleanly.
	match bg_processor.join() {
		Ok(_) => panic!("Expected error persisting manager"),
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
fn test_network_graph_persist_error() {
	// Test that if we encounter an error during network graph persistence, an error gets returned.
	let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	// Inject a persistence failure specifically on the "network_graph" key.
	let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Graph persistence errors are reported on `stop`, not via a panic.
	match bg_processor.stop() {
		Ok(_) => panic!("Expected error persisting network graph"),
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
fn test_scorer_persist_error() {
	// Test that if we encounter an error during scorer persistence, an error gets returned.
	let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	// Inject a persistence failure specifically on the "scorer" key.
	let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Scorer persistence errors are reported on `stop`, not via a panic.
	match bg_processor.stop() {
		Ok(_) => panic!("Expected error persisting scorer"),
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies the BackgroundProcessor delivers events to the user-supplied event
// handler: first a FundingGenerationReady during channel open, then a
// SpendableOutputs after a force-close confirms to claimable depth.
fn test_background_event_handling() {
	let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
	let channel_value = 100000;
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir.clone()));
	// Set up a background event handler for FundingGenerationReady events.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| match event {
		// Build the funding tx inside the handler and ship it back to the test thread.
		Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
		Event::ChannelReady { .. } => {},
		_ => panic!("Unexpected event: {:?}", event),
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Open a channel and check that the FundingGenerationReady event was handled.
	begin_open_channel!(nodes[0], nodes[1], channel_value);
	let (temporary_channel_id, funding_tx) = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("FundingGenerationReady not handled within deadline");
	end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
	// Confirm the funding transaction.
	confirm_transaction(&mut nodes[0], &funding_tx);
	let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
	confirm_transaction(&mut nodes[1], &funding_tx);
	let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
	// Exchange channel_ready in both directions; channel_updates follow.
	nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
	let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
	let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
	assert!(bg_processor.stop().is_ok());
	// Set up a background event handler for SpendableOutputs events.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| match event {
		Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
		Event::ChannelReady { .. } => {},
		Event::ChannelClosed { .. } => {},
		_ => panic!("Unexpected event: {:?}", event),
	let persister = Arc::new(Persister::new(data_dir));
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Force close the channel and check that the SpendableOutputs event was handled.
	nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
	let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	// Confirm the commitment tx deep enough for our outputs to become claimable.
	confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("Events not handled within deadline");
		Event::SpendableOutputs { .. } => {},
		_ => panic!("Unexpected event: {:?}", event),
	assert!(bg_processor.stop().is_ok());
// Verifies the BackgroundProcessor periodically persists the scorer, observed
// via the "Persisting scorer" log line.
fn test_scorer_persistence() {
	let nodes = create_nodes(2, "test_scorer_persistence".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Poll the logger until the scorer-persistence line shows up.
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let expected_log = "Persisting scorer".to_string();
		if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
	assert!(bg_processor.stop().is_ok());
// Verifies the BackgroundProcessor defers network-graph pruning until a rapid
// gossip sync completes: a stale partial-announcement channel survives several
// processor loops, and only after RGS finishes is the graph pruned (and
// persisted, observed via the graph_persistence_notifier channel).
fn test_not_pruning_network_graph_until_graph_sync_completion() {
	let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
	let network_graph = nodes[0].network_graph.clone();
	let features = ChannelFeatures::empty();
	// Seed the graph with a channel that pruning would normally remove.
	network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
		.expect("Failed to update channel from partial announcement");
	let original_graph_description = network_graph.to_string();
	assert!(original_graph_description.contains("42: features: 0000, node_one:"));
	assert_eq!(network_graph.read_only().channels().len(), 1);
	let event_handler = |_: _| {};
	let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	// Let the processor loop run a couple of times without pruning the graph.
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
		if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
			// Wait until the loop has gone around at least twice.
	// Raw RGS snapshot bytes (header + two channel announcements/updates).
	let initialization_input = vec![
		76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
		79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
		0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
		187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
		157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
		88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
		204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
		181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
		110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
		76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
		226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
		0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
		0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
	nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
	// this should have added two channels
	assert_eq!(network_graph.read_only().channels().len(), 3);
	// Wait for the post-sync prune/persist to be signalled via the notifier.
		.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
		.expect("Network graph not pruned within deadline");
	background_processor.stop().unwrap();
	// all channels should now be pruned
	assert_eq!(network_graph.read_only().channels().len(), 0);
fn test_payment_path_scoring() {
	// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
	// that we update the scorer upon a payment path succeeding (note that the channel must be
	// public or else we won't score it).
	// Set up a background event handler for FundingGenerationReady events.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	// Forward every scoring-relevant event back to the test thread.
	let event_handler = move |event: Event| match event {
		Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
		Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
		Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
		Event::ProbeFailed { .. } => sender.send(event).unwrap(),
		_ => panic!("Unexpected event: {:?}", event),
	let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir.clone()));
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	let scored_scid = 4242;
	let secp_ctx = Secp256k1::new();
	let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
	let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
	// Single-hop path over the channel we expect the scorer to be told about.
	let path = vec![RouteHop {
		node_features: NodeFeatures::empty(),
		short_channel_id: scored_scid,
		channel_features: ChannelFeatures::empty(),
		cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
	// A retryable path failure should score the failing channel.
	nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
	nodes[0].node.push_pending_event(Event::PaymentPathFailed {
		payment_hash: PaymentHash([42; 32]),
		payment_failed_permanently: false,
		network_update: None,
		all_paths_failed: true,
		short_channel_id: Some(scored_scid),
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("PaymentPathFailed not handled within deadline");
		Event::PaymentPathFailed { .. } => {},
		_ => panic!("Unexpected event"),
	// Ensure we'll score payments that were explicitly failed back by the destination as
	// a success (the path reached the destination, so it behaves like a successful probe).
	nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
	nodes[0].node.push_pending_event(Event::PaymentPathFailed {
		payment_hash: PaymentHash([42; 32]),
		payment_failed_permanently: true,
		network_update: None,
		all_paths_failed: true,
		short_channel_id: None,
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("PaymentPathFailed not handled within deadline");
		Event::PaymentPathFailed { .. } => {},
		_ => panic!("Unexpected event"),
	// A successful payment path should be scored as a success.
	nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
	nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
		payment_id: PaymentId([42; 32]),
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("PaymentPathSuccessful not handled within deadline");
		Event::PaymentPathSuccessful { .. } => {},
		_ => panic!("Unexpected event"),
	// A successful probe should be scored as a probe success.
	nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
	nodes[0].node.push_pending_event(Event::ProbeSuccessful {
		payment_id: PaymentId([42; 32]),
		payment_hash: PaymentHash([42; 32]),
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("ProbeSuccessful not handled within deadline");
		Event::ProbeSuccessful { .. } => {},
		_ => panic!("Unexpected event"),
	// A failed probe should be scored as a probe failure.
	nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
	nodes[0].node.push_pending_event(Event::ProbeFailed {
		payment_id: PaymentId([42; 32]),
		payment_hash: PaymentHash([42; 32]),
		short_channel_id: Some(scored_scid),
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("ProbeFailure not handled within deadline");
		Event::ProbeFailed { .. } => {},
		_ => panic!("Unexpected event"),
	assert!(bg_processor.stop().is_ok());