1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::ln::channelmanager::ChannelManager;
30 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
31 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
32 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
33 use lightning::routing::utxo::UtxoLookup;
34 use lightning::routing::router::Router;
35 use lightning::routing::scoring::{Score, WriteableScore};
36 use lightning::util::events::{Event, PathFailure};
37 #[cfg(feature = "std")]
38 use lightning::util::events::{EventHandler, EventsProvider};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 use lightning_rapid_gossip_sync::RapidGossipSync;
44 use core::time::Duration;
46 #[cfg(feature = "std")]
48 #[cfg(feature = "std")]
49 use core::sync::atomic::{AtomicBool, Ordering};
50 #[cfg(feature = "std")]
51 use std::thread::{self, JoinHandle};
52 #[cfg(feature = "std")]
53 use std::time::Instant;
55 #[cfg(feature = "futures")]
56 use futures_util::{select_biased, future::FutureExt, task};
57 #[cfg(not(feature = "std"))]
60 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
61 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
62 /// responsibilities are:
63 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
64 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
65 /// writing it to disk/backups by invoking the callback given to it at startup.
66 /// [`ChannelManager`] persistence should be done in the background.
67 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
68 /// at the appropriate intervals.
69 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
70 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
72 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
73 /// upon as doing so may result in high latency.
77 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
78 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
79 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
80 /// unilateral chain closure fees are at risk.
82 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
83 /// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	/// Shutdown flag shared with the background thread. It is set when the processor is stopped
	/// and is read by the thread as its loop-exit check.
	stop_thread: Arc<AtomicBool>,
	/// Handle of the spawned background thread. It is taken (leaving `None`) when the thread is
	/// joined, so a second join attempt is a no-op.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
// All timers below are expressed in whole seconds: the threaded processor compares them against
// `Instant::elapsed().as_secs()` and the async processor feeds them to `Duration::from_secs`.
// Test builds shrink every interval to one second so the test-suite does not have to wait.

/// Seconds between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
/// Seconds between calls to `ChannelManager::timer_tick_occurred` (test builds).
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Seconds between calls to `PeerManager::timer_tick_occurred` in optimised builds.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
/// Seconds between calls to `PeerManager::timer_tick_occurred` (test builds).
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Seconds between periodic scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
/// Seconds between periodic scorer persistence attempts (test builds).
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Delay, in seconds, before the first network-graph prune after startup; later prunes use
/// `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
/// Delay, in seconds, before the first network-graph prune after startup (test builds).
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
119 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
121 P: Deref<Target = P2PGossipSync<G, U, L>>,
122 R: Deref<Target = RapidGossipSync<G, L>>,
123 G: Deref<Target = NetworkGraph<L>>,
127 where U::Target: UtxoLookup, L::Target: Logger {
128 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
130 /// Rapid gossip sync from a trusted server.
137 P: Deref<Target = P2PGossipSync<G, U, L>>,
138 R: Deref<Target = RapidGossipSync<G, L>>,
139 G: Deref<Target = NetworkGraph<L>>,
142 > GossipSync<P, R, G, U, L>
143 where U::Target: UtxoLookup, L::Target: Logger {
144 fn network_graph(&self) -> Option<&G> {
146 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
147 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
148 GossipSync::None => None,
152 fn prunable_network_graph(&self) -> Option<&G> {
154 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
155 GossipSync::Rapid(gossip_sync) => {
156 if gossip_sync.is_initial_sync_complete() {
157 Some(gossip_sync.network_graph())
162 GossipSync::None => None,
167 /// (C-not exported) as the bindings concretize everything and have constructors for us
168 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
169 GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
171 U::Target: UtxoLookup,
174 /// Initializes a new [`GossipSync::P2P`] variant.
175 pub fn p2p(gossip_sync: P) -> Self {
176 GossipSync::P2P(gossip_sync)
180 /// (C-not exported) as the bindings concretize everything and have constructors for us
181 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
183 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
186 &'a (dyn UtxoLookup + Send + Sync),
192 /// Initializes a new [`GossipSync::Rapid`] variant.
193 pub fn rapid(gossip_sync: R) -> Self {
194 GossipSync::Rapid(gossip_sync)
198 /// (C-not exported) as the bindings concretize everything and have constructors for us
201 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
202 &RapidGossipSync<&'a NetworkGraph<L>, L>,
204 &'a (dyn UtxoLookup + Send + Sync),
210 /// Initializes a new [`GossipSync::None`] variant.
211 pub fn none() -> Self {
216 fn handle_network_graph_update<L: Deref>(
217 network_graph: &NetworkGraph<L>, event: &Event
218 ) where L::Target: Logger {
219 if let Event::PaymentPathFailed {
220 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
222 network_graph.handle_network_update(upd);
226 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
227 scorer: &'a S, event: &Event
229 let mut score = scorer.lock();
231 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
232 let path = path.iter().collect::<Vec<_>>();
233 score.payment_path_failed(&path, *scid);
235 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
236 // Reached if the destination explicitly failed it back. We treat this as a successful probe
237 // because the payment made it all the way to the destination with sufficient liquidity.
238 let path = path.iter().collect::<Vec<_>>();
239 score.probe_successful(&path);
241 Event::PaymentPathSuccessful { path, .. } => {
242 let path = path.iter().collect::<Vec<_>>();
243 score.payment_path_successful(&path);
245 Event::ProbeSuccessful { path, .. } => {
246 let path = path.iter().collect::<Vec<_>>();
247 score.probe_successful(&path);
249 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
250 let path = path.iter().collect::<Vec<_>>();
251 score.probe_failed(&path, *scid);
/// Expands to the main loop shared by the threaded and the async background processors.
///
/// Arguments:
/// * `$persister`, `$chain_monitor`, `$channel_manager`, `$gossip_sync`, `$peer_manager`,
///   `$logger`, `$scorer`: identifiers of the corresponding objects in the caller's scope.
/// * `$process_chain_monitor_events`, `$process_channel_manager_events`: expressions that drain
///   pending events from the respective source.
/// * `$loop_exit_check`: expression evaluated once per iteration; `true` ends the loop.
/// * `$await`: expression that waits for a persistable `ChannelManager` update and evaluates to
///   `true` if one is available.
/// * `$get_timer`: callable taking a number of seconds and returning a fresh timer object.
/// * `$timer_elapsed`: callable taking `&mut timer` and a number of seconds, returning whether
///   the timer has expired.
///
/// The expansion evaluates to `Ok(())` after the final persistence pass and uses `?`, so it
/// must sit in a context returning a compatible `Result`.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
			if $timer_elapsed(&mut last_prune_call, prune_timer) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					#[cfg(feature = "std")] {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking();
					}
					#[cfg(not(feature = "std"))] {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					}

					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					have_pruned = true;
				}
				// Always re-arm the timer once it has fired, even if the graph was not yet
				// prunable. Otherwise an expired timer is re-checked on every iteration, which
				// for future-based timers means polling a future that has already completed.
				// Until the first prune succeeds we retry on the short startup interval.
				let next_prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				last_prune_call = $get_timer(next_prune_timer);
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
///
/// # Errors
///
/// Returns the first error produced by persisting the `ChannelManager` inside the loop, or by
/// any of the final persistence calls (manager, scorer, network graph) made on exit.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), lightning::io::Error>
where
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// Must start out `false`: the flag is only overwritten when the sleeper arm of the select
	// below completes. If it started as `true`, a persistable update arriving before the first
	// sleep finished would make the loop exit right after its first persist.
	let mut should_break = false;
	// Wraps the user's handler so that the network graph and the scorer see every event before
	// the user does.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				update_scorer(scorer, &event);
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Biased so that a pending persistable update always wins over the sleep.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		// A timer is a sleeper future here; it has "elapsed" once a single no-op poll reports
		// it as ready.
		|fut: &mut SleepFuture, _| {
			let mut waker = task::noop_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
		})
}
#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		UL: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for <'b> WriteableScore<'b>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	{
		// One handle to the flag moves into the thread; the clone stays with the returned
		// processor so it can request shutdown.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Let the network graph and the scorer observe each event before the user's
			// handler consumes it.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					update_scorer(scorer, &event);
				}
				event_handler.handle_event(event);
			};
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	/// Raises the shutdown flag and then waits for the background thread to finish.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	/// Waits for the background thread, if it has not been joined yet, and returns its result.
	///
	/// The handle is taken out of `self`, so later calls return `Ok(())` without blocking.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			// `unwrap` re-raises a panic of the background thread on the calling thread.
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	/// Requests shutdown and waits for the background thread to finish.
	///
	/// NOTE(review): this panics if the thread ended with a persistence error that was never
	/// collected, since the result is unwrapped here.
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}
654 #[cfg(all(feature = "std", test))]
656 use bitcoin::blockdata::block::BlockHeader;
657 use bitcoin::blockdata::constants::genesis_block;
658 use bitcoin::blockdata::locktime::PackedLockTime;
659 use bitcoin::blockdata::transaction::{Transaction, TxOut};
660 use bitcoin::network::constants::Network;
661 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
662 use lightning::chain::{BestBlock, Confirm, chainmonitor};
663 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
664 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
665 use lightning::chain::transaction::OutPoint;
666 use lightning::get_event_msg;
667 use lightning::ln::PaymentHash;
668 use lightning::ln::channelmanager;
669 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
670 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
671 use lightning::ln::msgs::{ChannelMessageHandler, Init};
672 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
673 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
674 use lightning::routing::router::{DefaultRouter, RouteHop};
675 use lightning::routing::scoring::{ChannelUsage, Score};
676 use lightning::util::config::UserConfig;
677 use lightning::util::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
678 use lightning::util::ser::Writeable;
679 use lightning::util::test_utils;
680 use lightning::util::persist::KVStorePersister;
681 use lightning_persister::FilesystemPersister;
682 use std::collections::VecDeque;
684 use std::path::PathBuf;
685 use std::sync::{Arc, Mutex};
686 use std::sync::mpsc::SyncSender;
687 use std::time::Duration;
688 use bitcoin::hashes::Hash;
689 use bitcoin::TxMerkleNode;
690 use lightning_rapid_gossip_sync::RapidGossipSync;
691 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
693 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
695 #[derive(Clone, Hash, PartialEq, Eq)]
696 struct TestDescriptor{}
697 impl SocketDescriptor for TestDescriptor {
698 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
702 fn disconnect_socket(&mut self) {}
705 type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
707 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
709 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
710 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
713 node: Arc<ChannelManager>,
714 p2p_gossip_sync: PGS,
715 rapid_gossip_sync: RGS,
716 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
717 chain_monitor: Arc<ChainMonitor>,
718 persister: Arc<FilesystemPersister>,
719 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
720 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
721 logger: Arc<test_utils::TestLogger>,
722 best_block: BestBlock,
723 scorer: Arc<Mutex<TestScorer>>,
727 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
728 GossipSync::P2P(self.p2p_gossip_sync.clone())
731 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
732 GossipSync::Rapid(self.rapid_gossip_sync.clone())
735 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
742 let data_dir = self.persister.get_data_dir();
743 match fs::remove_dir_all(data_dir.clone()) {
744 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
751 graph_error: Option<(std::io::ErrorKind, &'static str)>,
752 graph_persistence_notifier: Option<SyncSender<()>>,
753 manager_error: Option<(std::io::ErrorKind, &'static str)>,
754 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
755 filesystem_persister: FilesystemPersister,
759 fn new(data_dir: String) -> Self {
760 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
761 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
764 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
765 Self { graph_error: Some((error, message)), ..self }
768 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
769 Self { graph_persistence_notifier: Some(sender), ..self }
772 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
773 Self { manager_error: Some((error, message)), ..self }
776 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
777 Self { scorer_error: Some((error, message)), ..self }
781 impl KVStorePersister for Persister {
782 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
783 if key == "manager" {
784 if let Some((error, message)) = self.manager_error {
785 return Err(std::io::Error::new(error, message))
789 if key == "network_graph" {
790 if let Some(sender) = &self.graph_persistence_notifier {
791 sender.send(()).unwrap();
794 if let Some((error, message)) = self.graph_error {
795 return Err(std::io::Error::new(error, message))
800 if let Some((error, message)) = self.scorer_error {
801 return Err(std::io::Error::new(error, message))
805 self.filesystem_persister.persist(key, object)
810 event_expectations: Option<VecDeque<TestResult>>,
815 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
816 PaymentSuccess { path: Vec<RouteHop> },
817 ProbeFailure { path: Vec<RouteHop> },
818 ProbeSuccess { path: Vec<RouteHop> },
823 Self { event_expectations: None }
826 fn expect(&mut self, expectation: TestResult) {
827 self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
831 impl lightning::util::ser::Writeable for TestScorer {
832 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
835 impl Score for TestScorer {
836 fn channel_penalty_msat(
837 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
838 ) -> u64 { unimplemented!(); }
840 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
841 if let Some(expectations) = &mut self.event_expectations {
842 match expectations.pop_front().unwrap() {
843 TestResult::PaymentFailure { path, short_channel_id } => {
844 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
845 assert_eq!(actual_short_channel_id, short_channel_id);
847 TestResult::PaymentSuccess { path } => {
848 panic!("Unexpected successful payment path: {:?}", path)
850 TestResult::ProbeFailure { path } => {
851 panic!("Unexpected probe failure: {:?}", path)
853 TestResult::ProbeSuccess { path } => {
854 panic!("Unexpected probe success: {:?}", path)
860 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
861 if let Some(expectations) = &mut self.event_expectations {
862 match expectations.pop_front().unwrap() {
863 TestResult::PaymentFailure { path, .. } => {
864 panic!("Unexpected payment path failure: {:?}", path)
866 TestResult::PaymentSuccess { path } => {
867 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
869 TestResult::ProbeFailure { path } => {
870 panic!("Unexpected probe failure: {:?}", path)
872 TestResult::ProbeSuccess { path } => {
873 panic!("Unexpected probe success: {:?}", path)
879 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
880 if let Some(expectations) = &mut self.event_expectations {
881 match expectations.pop_front().unwrap() {
882 TestResult::PaymentFailure { path, .. } => {
883 panic!("Unexpected payment path failure: {:?}", path)
885 TestResult::PaymentSuccess { path } => {
886 panic!("Unexpected payment path success: {:?}", path)
888 TestResult::ProbeFailure { path } => {
889 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
891 TestResult::ProbeSuccess { path } => {
892 panic!("Unexpected probe success: {:?}", path)
897 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
898 if let Some(expectations) = &mut self.event_expectations {
899 match expectations.pop_front().unwrap() {
900 TestResult::PaymentFailure { path, .. } => {
901 panic!("Unexpected payment path failure: {:?}", path)
903 TestResult::PaymentSuccess { path } => {
904 panic!("Unexpected payment path success: {:?}", path)
906 TestResult::ProbeFailure { path } => {
907 panic!("Unexpected probe failure: {:?}", path)
909 TestResult::ProbeSuccess { path } => {
910 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
// On drop, panics if an expectation queue exists and still holds unconsumed entries,
// so a test cannot pass while scorer callbacks it expected never fired.
917 impl Drop for TestScorer {
// NOTE(review): the body of this `panicking()` branch is not visible in this view —
// presumably it returns early so a second panic is not raised during unwinding; confirm.
919 if std::thread::panicking() {
923 if let Some(event_expectations) = &self.event_expectations {
924 if !event_expectations.is_empty() {
925 panic!("Unsatisfied event expectations: {:?}", event_expectations);
// Builds a `PathBuf` from `filepath` and returns it as an owned `String`.
// Panics (via `unwrap`) if the resulting path is not valid UTF-8.
// NOTE(review): the statement that uses `filename` is not visible in this view —
// presumably it is appended to `path` before conversion; confirm against the full file.
931 fn get_full_filepath(filepath: String, filename: String) -> String {
932 let mut path = PathBuf::from(filepath);
934 path.to_str().unwrap().to_string()
// Builds `num_nodes` test nodes on `Network::Testnet`, each with its own broadcaster, fee
// estimator, logger, network graph, scorer, router, chain monitor, channel manager, gossip
// syncs and peer manager, then marks every pair of nodes as connected to each other.
// NOTE(review): the statements that push each `node` into `nodes` and return the vector
// are not visible in this view; confirm against the full file.
937 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
938 let mut nodes = Vec::new();
939 for i in 0..num_nodes {
940 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
941 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
942 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
943 let network = Network::Testnet;
944 let genesis_block = genesis_block(network);
945 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
946 let scorer = Arc::new(Mutex::new(TestScorer::new()));
// The node index, repeated, doubles as the deterministic seed for the router, keys manager
// and peer manager.
947 let seed = [i as u8; 32];
948 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
949 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
// Each node persists under its own directory: "<persist_dir>_persister_<i>".
950 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
// The genesis block's header time is used as the keys manager's starting time.
951 let now = Duration::from_secs(genesis_block.header.time as u64);
952 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
953 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
954 let best_block = BestBlock::from_network(network);
955 let params = ChainParameters { network, best_block };
956 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
957 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
958 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
959 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
960 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
961 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Tell every pair of channel managers that they are connected; the lower-indexed node is
// notified with `true` as the last argument and the higher-indexed one with `false`.
965 for i in 0..num_nodes {
966 for j in (i+1)..num_nodes {
967 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
968 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
// Runs the channel-open handshake from `$node_a` to `$node_b` for `$channel_value`:
// begins the open, takes the single pending event from `$node_a`, builds the funding
// transaction from it, and finishes the funding exchange.
// NOTE(review): the macro's result expression is not visible in this view — callers bind
// its value (e.g. `let tx = open_channel!(...)`), so presumably it yields `tx`; confirm.
975 macro_rules! open_channel {
976 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
977 begin_open_channel!($node_a, $node_b, $channel_value);
978 let events = $node_a.node.get_and_clear_pending_events();
// Exactly one event is expected at this point.
979 assert_eq!(events.len(), 1);
980 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
981 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of the channel-open handshake: `$node_a` creates a channel of `$channel_value`
// towards `$node_b` (with user channel id 42), then the resulting open_channel and
// accept_channel messages are delivered to their recipients.
macro_rules! begin_open_channel {
    ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
        $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();

        // Deliver node A's open_channel message to node B.
        let open_channel_msg = get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id());
        $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &open_channel_msg);

        // Deliver node B's accept_channel reply back to node A.
        let accept_channel_msg = get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id());
        $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &accept_channel_msg);
    }}
}
// Checks that `$event` is a `FundingGenerationReady` for `$channel_value` with user channel
// id 42, builds a funding transaction with no inputs and a single output paying the
// requested script, and yields `(temporary_channel_id, tx)`. Any other event panics.
// NOTE(review): the `match` header on `$event` is not visible in this view; confirm.
994 macro_rules! handle_funding_generation_ready {
995 ($event: expr, $channel_value: expr) => {{
997 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
998 assert_eq!(channel_value_satoshis, $channel_value);
// 42 is the user channel id passed to `create_channel` by `begin_open_channel!`.
999 assert_eq!(user_channel_id, 42);
1001 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1002 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1004 (temporary_channel_id, tx)
1006 _ => panic!("Unexpected event"),
// Second half of the channel-open handshake: hands the funding transaction to `$node_a`,
// then delivers the resulting funding_created and funding_signed messages to their
// recipients.
macro_rules! end_open_channel {
    ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
        $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();

        // Deliver node A's funding_created message to node B.
        let funding_created_msg = get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id());
        $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &funding_created_msg);

        // Deliver node B's funding_signed reply back to node A.
        let funding_signed_msg = get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id());
        $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &funding_signed_msg);
    }}
}
// Advances `node`'s best block `depth` times, building one synthetic header per step on
// top of the previous best block, and notifies the channel manager and chain monitor.
// NOTE(review): the `match` header and its first arm pattern are not visible in this view —
// presumably `transactions_confirmed` runs on the first iteration only; confirm.
1019 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1020 for i in 1..=depth {
1021 let prev_blockhash = node.best_block.block_hash();
1022 let height = node.best_block.height() + 1;
// The header's `time` field is simply set to the new height.
1023 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1024 let txdata = vec![(0, tx)];
1025 node.best_block = BestBlock::new(header.block_hash(), height);
1028 node.node.transactions_confirmed(&header, &txdata, height);
1029 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
// On the final iteration, report the new best block to both components.
1031 x if x == depth => {
1032 node.node.best_block_updated(&header, height);
1033 node.chain_monitor.best_block_updated(&header, height);
1039 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1040 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1044 fn test_background_processor() {
1045 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1046 // updates. Also test that when new updates are available, the manager signals that it needs
1047 // re-persistence and is successfully re-persisted.
1048 let nodes = create_nodes(2, "test_background_processor".to_string());
1050 // Go through the channel creation process so that each node has something to persist. Since
1051 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1052 // avoid a race with processing events.
1053 let tx = open_channel!(nodes[0], nodes[1], 100000);
1055 // Initiate the background processors to watch each node.
1056 let data_dir = nodes[0].persister.get_data_dir();
1057 let persister = Arc::new(Persister::new(data_dir));
// Events are ignored in this test; only persistence is checked.
1058 let event_handler = |_: _| {};
1059 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes `$node` into a buffer and compares it with the bytes read from `$filepath`.
// NOTE(review): the surrounding retry/loop structure and the success/mismatch branches are
// not visible in this view; confirm against the full file.
1061 macro_rules! check_persisted_data {
1062 ($node: expr, $filepath: expr) => {
1063 let mut expected_bytes = Vec::new();
1065 expected_bytes.clear();
1066 match $node.write(&mut expected_bytes) {
1068 match std::fs::read($filepath) {
1070 if bytes == expected_bytes {
1079 Err(e) => panic!("Unexpected error: {}", e)
1085 // Check that the initial channel manager data is persisted as expected.
1086 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1087 check_persisted_data!(nodes[0].node, filepath.clone());
// Exit once the manager no longer reports a pending persistence notification.
1090 if !nodes[0].node.get_persistence_condvar_value() { break }
1093 // Force-close the channel.
1094 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1096 // Check that the force-close updates are persisted.
1097 check_persisted_data!(nodes[0].node, filepath.clone());
1099 if !nodes[0].node.get_persistence_condvar_value() { break }
1102 // Check network graph is persisted
1103 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1104 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1106 // Check scorer is persisted
1107 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1108 check_persisted_data!(nodes[0].scorer, filepath.clone());
1110 assert!(bg_processor.stop().is_ok());
1114 fn test_timer_tick_called() {
1115 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1116 // `FRESHNESS_TIMER`.
1117 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1118 let data_dir = nodes[0].persister.get_data_dir();
1119 let persister = Arc::new(Persister::new(data_dir));
1120 let event_handler = |_: _| {};
1121 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The timer ticks are detected through the test logger: both log lines below must have been
// recorded under the "lightning_background_processor" module.
// NOTE(review): the enclosing wait loop is not visible in this view; confirm.
1123 let log_entries = nodes[0].logger.lines.lock().unwrap();
1124 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1125 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1126 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1127 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1132 assert!(bg_processor.stop().is_ok());
1136 fn test_channel_manager_persist_error() {
1137 // Test that if we encounter an error during manager persistence, `join` returns that error.
1138 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel so the manager has state that needs persisting.
1139 open_channel!(nodes[0], nodes[1], 100000);
1141 let data_dir = nodes[0].persister.get_data_dir();
// Inject a failure for every "manager" persist call.
1142 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1143 let event_handler = |_: _| {};
1144 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` (rather than `stop`) is used here; it must yield the injected error.
1145 match bg_processor.join() {
1146 Ok(_) => panic!("Expected error persisting manager"),
1148 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1149 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1155 fn test_network_graph_persist_error() {
1156 // Test that if we encounter an error during network graph persistence, an error gets returned.
1157 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1158 let data_dir = nodes[0].persister.get_data_dir();
// Inject a failure for every "network_graph" persist call.
1159 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1160 let event_handler = |_: _| {};
1161 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error.
1163 match bg_processor.stop() {
1164 Ok(_) => panic!("Expected error persisting network graph"),
1166 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1167 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1173 fn test_scorer_persist_error() {
1174 // Test that if we encounter an error during scorer persistence, an error gets returned.
1175 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1176 let data_dir = nodes[0].persister.get_data_dir();
// Configure the test persister with an injected scorer error.
1177 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1178 let event_handler = |_: _| {};
1179 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error.
1181 match bg_processor.stop() {
1182 Ok(_) => panic!("Expected error persisting scorer"),
1184 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1185 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Checks that events are delivered to the handler given to the background processor:
// first a `FundingGenerationReady` during channel open, then (with a second processor)
// a `SpendableOutputs` after a force-close.
1191 fn test_background_event_handling() {
1192 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1193 let channel_value = 100000;
1194 let data_dir = nodes[0].persister.get_data_dir();
1195 let persister = Arc::new(Persister::new(data_dir.clone()));
1197 // Set up a background event handler for FundingGenerationReady events.
1198 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1199 let event_handler = move |event: Event| match event {
1200 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1201 Event::ChannelReady { .. } => {},
1202 _ => panic!("Unexpected event: {:?}", event),
1205 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1207 // Open a channel and check that the FundingGenerationReady event was handled.
1208 begin_open_channel!(nodes[0], nodes[1], channel_value);
// The handler sends the temporary channel id and funding transaction back over the channel.
1209 let (temporary_channel_id, funding_tx) = receiver
1210 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1211 .expect("FundingGenerationReady not handled within deadline");
1212 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1214 // Confirm the funding transaction.
1215 confirm_transaction(&mut nodes[0], &funding_tx);
1216 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1217 confirm_transaction(&mut nodes[1], &funding_tx);
1218 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange the channel_ready messages; the resulting channel updates are fetched but unused.
1219 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1220 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1221 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1222 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1224 assert!(bg_processor.stop().is_ok());
1226 // Set up a background event handler for SpendableOutputs events.
1227 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1228 let event_handler = move |event: Event| match event {
1229 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1230 Event::ChannelReady { .. } => {},
1231 Event::ChannelClosed { .. } => {},
1232 _ => panic!("Unexpected event: {:?}", event),
// A fresh persister and processor are started with the new handler.
1234 let persister = Arc::new(Persister::new(data_dir));
1235 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1237 // Force close the channel and check that the SpendableOutputs event was handled.
1238 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the most recently broadcast transaction and confirm it `BREAKDOWN_TIMEOUT` blocks deep.
1239 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1240 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1242 let event = receiver
1243 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1244 .expect("Events not handled within deadline");
1246 Event::SpendableOutputs { .. } => {},
1247 _ => panic!("Unexpected event: {:?}", event),
1250 assert!(bg_processor.stop().is_ok());
// Checks that the background processor logs "Persisting scorer" when given a scorer.
1254 fn test_scorer_persistence() {
1255 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1256 let data_dir = nodes[0].persister.get_data_dir();
1257 let persister = Arc::new(Persister::new(data_dir));
1258 let event_handler = |_: _| {};
1259 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Scorer persistence is detected through the test logger.
// NOTE(review): the enclosing wait loop is not visible in this view; confirm.
1262 let log_entries = nodes[0].logger.lines.lock().unwrap();
1263 let expected_log = "Persisting scorer".to_string();
1264 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1269 assert!(bg_processor.stop().is_ok());
// With rapid gossip sync in use, checks that a channel added before the sync is still in
// the graph after the processor has looped, and that all channels are gone once the rapid
// sync update has been applied and a graph persist has been signalled.
1273 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1274 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1275 let data_dir = nodes[0].persister.get_data_dir();
// The persister signals `sender` each time it is asked to persist the network graph.
1276 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1277 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1278 let network_graph = nodes[0].network_graph.clone();
1279 let features = ChannelFeatures::empty();
// Seed the graph with a single channel (short channel id 42) between the two nodes.
1280 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1281 .expect("Failed to update channel from partial announcement");
1282 let original_graph_description = network_graph.to_string();
1283 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1284 assert_eq!(network_graph.read_only().channels().len(), 1);
1286 let event_handler = |_: _| {};
1287 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The count of this log line is used as a proxy for processor loop iterations.
// NOTE(review): the enclosing wait loop and the comparison are only partly visible here.
1290 let log_entries = nodes[0].logger.lines.lock().unwrap();
1291 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1292 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1295 // Wait until the loop has gone around at least twice.
// Raw rapid gossip sync payload fed to `update_network_graph_no_std` below.
1300 let initialization_input = vec![
1301 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1302 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1303 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1304 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1305 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1306 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1307 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1308 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1309 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1310 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1311 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1312 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1313 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1315 nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1317 // this should have added two channels
1318 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for a graph-persist notification, allowing five times the first prune timer.
1321 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1322 .expect("Network graph not pruned within deadline");
1324 background_processor.stop().unwrap();
1326 // all channels should now be pruned
1327 assert_eq!(network_graph.read_only().channels().len(), 0);
1331 fn test_payment_path_scoring() {
1332 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1333 // that we update the scorer upon a payment path succeeding (note that the channel must be
1334 // public or else we won't score it).
1335 // Set up a background event handler that forwards payment-path and probe events to the test.
1336 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1337 let event_handler = move |event: Event| match event {
1338 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1339 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1340 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1341 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1342 _ => panic!("Unexpected event: {:?}", event),
1345 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1346 let data_dir = nodes[0].persister.get_data_dir();
1347 let persister = Arc::new(Persister::new(data_dir.clone()));
1348 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Build a one-hop path over a made-up short channel id to a node derived from a fixed key.
1350 let scored_scid = 4242;
1351 let secp_ctx = Secp256k1::new();
1352 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1353 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1355 let path = vec![RouteHop {
1357 node_features: NodeFeatures::empty(),
1358 short_channel_id: scored_scid,
1359 channel_features: ChannelFeatures::empty(),
1361 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
// Case 1: a non-permanent failure on `scored_scid` should be scored as a payment failure.
1364 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1365 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1367 payment_hash: PaymentHash([42; 32]),
1368 payment_failed_permanently: false,
1369 failure: PathFailure::OnPath { network_update: None },
1371 short_channel_id: Some(scored_scid),
1374 let event = receiver
1375 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1376 .expect("PaymentPathFailed not handled within deadline");
1378 Event::PaymentPathFailed { .. } => {},
1379 _ => panic!("Unexpected event"),
1382 // Ensure we'll score payments that were explicitly failed back by the destination as a probe success.
1384 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1385 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1387 payment_hash: PaymentHash([42; 32]),
1388 payment_failed_permanently: true,
1389 failure: PathFailure::OnPath { network_update: None },
1391 short_channel_id: None,
1394 let event = receiver
1395 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1396 .expect("PaymentPathFailed not handled within deadline");
1398 Event::PaymentPathFailed { .. } => {},
1399 _ => panic!("Unexpected event"),
// Case 3: a successful payment path should be scored as a payment success.
1402 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1403 nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1404 payment_id: PaymentId([42; 32]),
1408 let event = receiver
1409 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1410 .expect("PaymentPathSuccessful not handled within deadline");
1412 Event::PaymentPathSuccessful { .. } => {},
1413 _ => panic!("Unexpected event"),
// Case 4: a successful probe should be scored as a probe success.
1416 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1417 nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1418 payment_id: PaymentId([42; 32]),
1419 payment_hash: PaymentHash([42; 32]),
1422 let event = receiver
1423 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1424 .expect("ProbeSuccessful not handled within deadline");
1426 Event::ProbeSuccessful { .. } => {},
1427 _ => panic!("Unexpected event"),
// Case 5: a failed probe on `scored_scid` should be scored as a probe failure.
1430 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1431 nodes[0].node.push_pending_event(Event::ProbeFailed {
1432 payment_id: PaymentId([42; 32]),
1433 payment_hash: PaymentHash([42; 32]),
1435 short_channel_id: Some(scored_scid),
1437 let event = receiver
1438 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1439 .expect("ProbeFailure not handled within deadline");
1441 Event::ProbeFailed { .. } => {},
1442 _ => panic!("Unexpected event"),
1445 assert!(bg_processor.stop().is_ok());