1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{Score, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 use lightning_rapid_gossip_sync::RapidGossipSync;
44 use core::time::Duration;
46 #[cfg(feature = "std")]
48 #[cfg(feature = "std")]
49 use core::sync::atomic::{AtomicBool, Ordering};
50 #[cfg(feature = "std")]
51 use std::thread::{self, JoinHandle};
52 #[cfg(feature = "std")]
53 use std::time::Instant;
55 #[cfg(feature = "futures")]
56 use futures_util::task;
57 #[cfg(not(feature = "std"))]
60 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
61 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
62 /// responsibilities are:
63 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
64 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
65 /// writing it to disk/backups by invoking the callback given to it at startup.
66 /// [`ChannelManager`] persistence should be done in the background.
67 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
68 /// at the appropriate intervals.
69 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
70 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
72 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
73 /// upon as doing so may result in high latency.
77 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
78 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
79 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
80 /// unilateral chain closure fees are at risk.
82 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
83 /// [`Event`]: lightning::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag; set to `true` (Release) to ask the background thread to exit its loop.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned thread; `take()`n (left `None`) once the thread has been joined.
	// The thread returns any `std::io::Error` produced while persisting.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Seconds between calls to `ChannelManager::timer_tick_occurred`.
// NOTE(review): each paired definition below (60 vs 1, 30 vs 1, 60 vs 1) is presumably
// gated by `#[cfg(not(test))]` / `#[cfg(test)]` attributes — confirm, as ungated
// duplicates would not compile.
const FRESHNESS_TIMER: u64 = 60;
// Test value: shortened so timer-driven behavior is observable quickly.
const FRESHNESS_TIMER: u64 = 1;
// Seconds between calls to `PeerManager::timer_tick_occurred` (ping/disconnect cycle).
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Seconds between scorer persistence attempts.
const SCORER_PERSIST_TIMER: u64 = 30;
const SCORER_PERSIST_TIMER: u64 = 1;
// Seconds after startup before the first network-graph prune, so short-lived clients
// still prune at least once (see the comment in `define_run_body!`).
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// Type parameters: `P` dereferences to a P2P gossip sync, `R` to a rapid gossip sync,
// and `G` to the `NetworkGraph` both of them operate on.
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
where U::Target: UtxoLookup, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	/// Rapid gossip sync from a trusted server.
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
> GossipSync<P, R, G, U, L>
where U::Target: UtxoLookup, L::Target: Logger {
	// Returns the network graph backing this gossip sync, if any (`None` for `GossipSync::None`).
	fn network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
	// Returns the network graph only if it is currently safe to prune it: rapid gossip
	// sync must not have its graph pruned until the initial sync has completed, or
	// not-yet-applied channels could be wrongly considered stale.
	fn prunable_network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
			GossipSync::None => None,
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
// Convenience impl: fixes the unused `R` (rapid sync) parameter to a concrete reference
// type so callers constructing a P2P variant need not name it.
impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
	U::Target: UtxoLookup,
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
// Convenience impl: fixes the unused `P` (P2P sync) and `U` (UTXO lookup) parameters to
// concrete reference/trait-object types so callers constructing a rapid variant need not
// name them.
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
		&'a (dyn UtxoLookup + Send + Sync),
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
// Convenience impl: fixes every unused type parameter to a concrete reference/trait-object
// type so callers opting out of gossip sync need not name any of them.
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a (dyn UtxoLookup + Send + Sync),
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
// Applies the network update carried by `event`, if any, to `network_graph`. Only
// `PaymentPathFailed` events that failed on-path with an attached `network_update`
// affect the graph; every other event is ignored.
fn handle_network_graph_update<L: Deref>(
	network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
	if let Event::PaymentPathFailed {
		failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
		network_graph.handle_network_update(upd);
// Feeds a payment/probe outcome `event` into the scorer so that future route-finding
// learns from observed channel liquidity. Takes the scorer's lock for the duration.
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event
	let mut score = scorer.lock();
		// A specific hop failed: penalize that channel.
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_failed(&path, *scid);
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		Event::PaymentPathSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_successful(&path);
		Event::ProbeSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_failed(&path, *scid);
// Shared main-loop body for both the threaded (`BackgroundProcessor::start`) and async
// (`process_events_async`) drivers. The caller supplies the event-processing expressions,
// the loop-exit check, the awaiting primitive and the timer implementation, so the same
// timer/persistence logic serves both std-thread and futures execution models.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();
		// Per-task countdowns; each is reset to its interval after firing.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;
			$process_channel_manager_events;
			$process_chain_monitor_events;
			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			$peer_manager.process_events();
			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);
			// The ChannelManager signalled that it has state changes needing persistence.
			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					// With `std` we can compute wall-clock time, so prune here; without it the
					// user must prune manually (see the warning below).
					#[cfg(feature = "std")] {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking();
					#[cfg(not(feature = "std"))] {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					// Graph persistence failure is logged but non-fatal; we retry next cycle.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
				last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// Scorer persistence failure is likewise logged but non-fatal.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;
		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
387 #[cfg(feature = "futures")]
388 use core::future::Future;
389 #[cfg(feature = "futures")]
390 use core::task::Poll;
391 #[cfg(feature = "futures")]
393 #[cfg(feature = "futures")]
394 use core::marker::Unpin;
395 #[cfg(feature = "futures")]
// Races two futures: `a` (the ChannelManager's "persistable update" notification) against
// `b` (the caller-supplied sleep/exit future), resolving with whichever finishes first.
struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
#[cfg(feature = "futures")]
// Which branch of the `Selector` completed; `B` carries the sleeper's exit flag.
enum SelectorOutput {
#[cfg(feature = "futures")]
impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
	type Output = SelectorOutput;
	fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
		// Poll `a` first: a pending ChannelManager persistence takes priority over sleeping.
		match Pin::new(&mut self.a).poll(ctx) {
			Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
		match Pin::new(&mut self.b).poll(ctx) {
			Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
/// Processes background events in a future.
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
) -> Result<(), lightning::io::Error>
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	// Set to `true` only when the sleeper future reports that processing should exit; the
	// run-body loop checks it each iteration. It must start out `false` — initializing it
	// to `true` would make the loop terminate (and persist + return) on its very first
	// iteration, so the background processor would never actually run.
	let mut should_break = false;
	// Wrap the user's handler so graph and scorer updates happen before user code sees the
	// event.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			if let Some(ref scorer) = scorer {
				update_scorer(scorer, &event);
			event_handler(event).await;
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Race "ChannelManager needs persisting" against the 100ms sleeper; the sleeper's
			// output (`exit`) is latched into `should_break` for the loop-exit check above.
				a: channel_manager.get_persistable_update_future(),
				b: sleeper(Duration::from_millis(100)),
				SelectorOutput::A => true,
				SelectorOutput::B(exit) => {
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// Poll once with a no-op waker purely to check elapsed-ness; we never wait here.
			let mut waker = task::noop_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	/// # Data Persistence
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	/// # Rapid Gossip Sync
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
		UL: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for <'b> WriteableScore<'b>,
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
		// The stop flag is shared between this handle and the spawned thread; the thread
		// polls it (Acquire) each loop iteration, `stop()` sets it (Release).
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Decorate the user's handler: apply graph and scorer updates before user code
			// sees the event (mirrors the async path in `process_events_async`).
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				if let Some(ref scorer) = scorer {
					update_scorer(scorer, &event);
				event_handler.handle_event(event);
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	// Signals the thread to stop (Release pairs with the loop's Acquire load) and joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
	// Joins the thread if a handle is still present, propagating the thread's persistence
	// result; panics (unwrap) if the thread itself panicked.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
		// Best-effort shutdown on drop. Note the `unwrap()`: a persistence error here
		// panics, since `drop` cannot return it — call `stop()`/`join()` explicitly to
		// handle errors instead.
		self.stop_and_join_thread().unwrap();
692 #[cfg(all(feature = "std", test))]
694 use bitcoin::blockdata::block::BlockHeader;
695 use bitcoin::blockdata::constants::genesis_block;
696 use bitcoin::blockdata::locktime::PackedLockTime;
697 use bitcoin::blockdata::transaction::{Transaction, TxOut};
698 use bitcoin::network::constants::Network;
699 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
700 use lightning::chain::{BestBlock, Confirm, chainmonitor};
701 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
702 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
703 use lightning::chain::transaction::OutPoint;
704 use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
705 use lightning::get_event_msg;
706 use lightning::ln::PaymentHash;
707 use lightning::ln::channelmanager;
708 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
709 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
710 use lightning::ln::msgs::{ChannelMessageHandler, Init};
711 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
712 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
713 use lightning::routing::router::{DefaultRouter, RouteHop};
714 use lightning::routing::scoring::{ChannelUsage, Score};
715 use lightning::util::config::UserConfig;
716 use lightning::util::ser::Writeable;
717 use lightning::util::test_utils;
718 use lightning::util::persist::KVStorePersister;
719 use lightning_persister::FilesystemPersister;
720 use std::collections::VecDeque;
722 use std::path::PathBuf;
723 use std::sync::{Arc, Mutex};
724 use std::sync::mpsc::SyncSender;
725 use std::time::Duration;
726 use bitcoin::hashes::Hash;
727 use bitcoin::TxMerkleNode;
728 use lightning_rapid_gossip_sync::RapidGossipSync;
729 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
	// How long tests wait for an expected event, scaled off FRESHNESS_TIMER — presumably
	// seconds, matching the timer constants; TODO confirm against the waiting callers.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
	#[derive(Clone, Hash, PartialEq, Eq)]
	// Dummy socket descriptor for the test PeerManager; performs no real I/O.
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
		fn disconnect_socket(&mut self) {}
	// Concrete type aliases wiring the generic LDK types to test-utils implementations.
	type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
		// Full set of components making up one simulated node under test.
		node: Arc<ChannelManager>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<TestScorer>>,
	// Helpers producing each `GossipSync` variant from the node's shared components.
	fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
		GossipSync::P2P(self.p2p_gossip_sync.clone())
	fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
		GossipSync::Rapid(self.rapid_gossip_sync.clone())
	fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			// Best-effort cleanup of the on-disk test data; failure is only logged so a
			// cleanup problem never fails the test itself.
			let data_dir = self.persister.get_data_dir();
			match fs::remove_dir_all(data_dir.clone()) {
				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
		// Optional injected failures, one per persisted object kind; `None` means the
		// underlying filesystem persister is used unmodified.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, a notification is sent each time the network graph is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
		// Creates a persister with no injected failures, backed by `data_dir` on disk.
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir);
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
		// Builder-style setters, each consuming `self` and injecting one failure/notifier.
		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }
		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }
		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }
		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
	impl KVStorePersister for Persister {
		// Dispatches on the persistence key: fires any injected error/notification for
		// that object kind, then falls through to the real filesystem persister.
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();
				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
			self.filesystem_persister.persist(key, object)
848 event_expectations: Option<VecDeque<TestResult>>,
// Expected scorer callback outcomes, one variant per Score trait method.
853 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
854 PaymentSuccess { path: Vec<RouteHop> },
855 ProbeFailure { path: Vec<RouteHop> },
856 ProbeSuccess { path: Vec<RouteHop> },
861 Self { event_expectations: None }
// Queues an expected scoring event; lazily creates the queue on first use.
864 fn expect(&mut self, expectation: TestResult) {
865 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
// TestScorer serializes to nothing — persistence of its contents is irrelevant to the tests.
869 impl lightning::util::ser::Writeable for TestScorer {
870 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
// Score impl that asserts each callback matches the next queued TestResult expectation.
873 impl Score for TestScorer {
// Penalty lookups are never expected in these tests, so any call is a failure.
874 fn channel_penalty_msat(
875 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
876 ) -> u64 { unimplemented!(); }
// Pops the next expectation; only PaymentFailure with a matching path and scid is accepted.
878 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
879 if let Some(expectations) = &mut self.event_expectations {
880 match expectations.pop_front().unwrap() {
881 TestResult::PaymentFailure { path, short_channel_id } => {
882 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
883 assert_eq!(actual_short_channel_id, short_channel_id);
// Any other queued expectation means the wrong callback fired.
885 TestResult::PaymentSuccess { path } => {
886 panic!("Unexpected successful payment path: {:?}", path)
888 TestResult::ProbeFailure { path } => {
889 panic!("Unexpected probe failure: {:?}", path)
891 TestResult::ProbeSuccess { path } => {
892 panic!("Unexpected probe success: {:?}", path)
// Pops the next expectation; only PaymentSuccess with a matching path is accepted.
898 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
899 if let Some(expectations) = &mut self.event_expectations {
900 match expectations.pop_front().unwrap() {
901 TestResult::PaymentFailure { path, .. } => {
902 panic!("Unexpected payment path failure: {:?}", path)
904 TestResult::PaymentSuccess { path } => {
905 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
907 TestResult::ProbeFailure { path } => {
908 panic!("Unexpected probe failure: {:?}", path)
910 TestResult::ProbeSuccess { path } => {
911 panic!("Unexpected probe success: {:?}", path)
// Pops the next expectation; only ProbeFailure with a matching path is accepted.
917 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
918 if let Some(expectations) = &mut self.event_expectations {
919 match expectations.pop_front().unwrap() {
920 TestResult::PaymentFailure { path, .. } => {
921 panic!("Unexpected payment path failure: {:?}", path)
923 TestResult::PaymentSuccess { path } => {
924 panic!("Unexpected payment path success: {:?}", path)
926 TestResult::ProbeFailure { path } => {
927 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
929 TestResult::ProbeSuccess { path } => {
930 panic!("Unexpected probe success: {:?}", path)
// Pops the next expectation; only ProbeSuccess with a matching path is accepted.
935 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
936 if let Some(expectations) = &mut self.event_expectations {
937 match expectations.pop_front().unwrap() {
938 TestResult::PaymentFailure { path, .. } => {
939 panic!("Unexpected payment path failure: {:?}", path)
941 TestResult::PaymentSuccess { path } => {
942 panic!("Unexpected payment path success: {:?}", path)
944 TestResult::ProbeFailure { path } => {
945 panic!("Unexpected probe failure: {:?}", path)
947 TestResult::ProbeSuccess { path } => {
948 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
// On drop, fail the test if any queued expectations were never consumed — unless we are
// already unwinding from another panic, in which case a second panic would abort.
955 impl Drop for TestScorer {
957 if std::thread::panicking() {
961 if let Some(event_expectations) = &self.event_expectations {
962 if !event_expectations.is_empty() {
963 panic!("Unsatisfied event expectations: {:?}", event_expectations);
// Joins a directory and filename into a single path string using PathBuf.
969 fn get_full_filepath(filepath: String, filename: String) -> String {
970 let mut path = PathBuf::from(filepath);
972 path.to_str().unwrap().to_string()
// Builds `num_nodes` fully-wired test Nodes (channel manager, chain monitor, gossip syncs,
// peer manager, scorer, filesystem persister under `persist_dir`) on Testnet, then connects
// every pair of nodes as peers.
975 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
976 let mut nodes = Vec::new();
977 for i in 0..num_nodes {
978 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
979 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
// Per-node logger id so log assertions can distinguish nodes.
980 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
981 let network = Network::Testnet;
982 let genesis_block = genesis_block(network);
983 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
984 let scorer = Arc::new(Mutex::new(TestScorer::new()));
// Deterministic per-node seed (node index repeated) for keys and router randomness.
985 let seed = [i as u8; 32];
986 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
987 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
// Each node persists to its own directory: "<persist_dir>_persister_<i>".
988 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
989 let now = Duration::from_secs(genesis_block.header.time as u64);
990 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
991 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
992 let best_block = BestBlock::from_network(network);
993 let params = ChainParameters { network, best_block };
994 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
995 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
996 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
997 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
998 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
999 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Connect every distinct pair of nodes as peers, once in each direction
// (outbound from i's perspective, inbound from j's).
1003 for i in 0..num_nodes {
1004 for j in (i+1)..num_nodes {
1005 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
1006 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
// Drives a full channel open between two nodes: negotiation, funding-generation event
// handling, and funding exchange. Consumes the FundingGenerationReady event itself.
1013 macro_rules! open_channel {
1014 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1015 begin_open_channel!($node_a, $node_b, $channel_value);
1016 let events = $node_a.node.get_and_clear_pending_events();
1017 assert_eq!(events.len(), 1);
1018 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1019 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of channel open: create_channel on node_a, then exchange
// open_channel/accept_channel messages between the two nodes.
1024 macro_rules! begin_open_channel {
1025 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
// 100 msat push, user_channel_id 42 (asserted later in handle_funding_generation_ready!).
1026 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
1027 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1028 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Validates a FundingGenerationReady event and builds a matching funding transaction;
// yields (temporary_channel_id, tx) for use by end_open_channel!.
1032 macro_rules! handle_funding_generation_ready {
1033 ($event: expr, $channel_value: expr) => {{
1035 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1036 assert_eq!(channel_value_satoshis, $channel_value);
// 42 is the user_channel_id passed in begin_open_channel!'s create_channel call.
1037 assert_eq!(user_channel_id, 42);
// Funding tx with a single output paying the requested value to the provided script.
1039 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1040 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1042 (temporary_channel_id, tx)
1044 _ => panic!("Unexpected event"),
// Second half of channel open: hand the funding tx to node_a, then exchange
// funding_created/funding_signed messages between the two nodes.
1049 macro_rules! end_open_channel {
1050 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
1051 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
1052 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1053 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Mines `depth` synthetic blocks on top of the node's best block, confirming `tx` and
// notifying both the channel manager and chain monitor of the new chain state.
1057 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1058 for i in 1..=depth {
1059 let prev_blockhash = node.best_block.block_hash();
1060 let height = node.best_block.height() + 1;
// Header with dummy PoW fields; `time` is set to the height for determinism.
1061 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1062 let txdata = vec![(0, tx)];
1063 node.best_block = BestBlock::new(header.block_hash(), height);
1066 node.node.transactions_confirmed(&header, &txdata, height);
1067 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
// At the final block, also signal the new best block to both components.
1069 x if x == depth => {
1070 node.node.best_block_updated(&header, height);
1071 node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` to ANTI_REORG_DELAY depth, the point at which reorgs no longer matter.
1077 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1078 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1082 fn test_background_processor() {
1083 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1084 // updates. Also test that when new updates are available, the manager signals that it needs
1085 // re-persistence and is successfully re-persisted.
1086 let nodes = create_nodes(2, "test_background_processor".to_string());
1088 // Go through the channel creation process so that each node has something to persist. Since
1089 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1090 // avoid a race with processing events.
1091 let tx = open_channel!(nodes[0], nodes[1], 100000);
1093 // Initiate the background processors to watch each node.
1094 let data_dir = nodes[0].persister.get_data_dir();
1095 let persister = Arc::new(Persister::new(data_dir));
1096 let event_handler = |_: _| {};
1097 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Polls until the on-disk bytes at $filepath match $node's serialization.
1099 macro_rules! check_persisted_data {
1100 ($node: expr, $filepath: expr) => {
1101 let mut expected_bytes = Vec::new();
1103 expected_bytes.clear();
1104 match $node.write(&mut expected_bytes) {
1106 match std::fs::read($filepath) {
1108 if bytes == expected_bytes {
1117 Err(e) => panic!("Unexpected error: {}", e)
1123 // Check that the initial channel manager data is persisted as expected.
1124 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1125 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait until the persistence condvar has been reset by the background processor.
1128 if !nodes[0].node.get_persistence_condvar_value() { break }
1131 // Force-close the channel.
1132 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1134 // Check that the force-close updates are persisted.
1135 check_persisted_data!(nodes[0].node, filepath.clone());
1137 if !nodes[0].node.get_persistence_condvar_value() { break }
1140 // Check network graph is persisted
1141 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1142 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1144 // Check scorer is persisted
1145 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1146 check_persisted_data!(nodes[0].scorer, filepath.clone());
1148 assert!(bg_processor.stop().is_ok());
1152 fn test_timer_tick_called() {
1153 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1154 // `FRESHNESS_TIMER`.
1155 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1156 let data_dir = nodes[0].persister.get_data_dir();
1157 let persister = Arc::new(Persister::new(data_dir));
1158 let event_handler = |_: _| {};
1159 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the logger for both tick log lines emitted by the background processor loop.
1161 let log_entries = nodes[0].logger.lines.lock().unwrap();
1162 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1163 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1164 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1165 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1170 assert!(bg_processor.stop().is_ok());
1174 fn test_channel_manager_persist_error() {
1175 // Test that if we encounter an error during manager persistence, the thread panics.
1176 let nodes = create_nodes(2, "test_persist_error".to_string());
1177 open_channel!(nodes[0], nodes[1], 100000);
1179 let data_dir = nodes[0].persister.get_data_dir();
// Inject a manager-persistence failure via the test Persister builder.
1180 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1181 let event_handler = |_: _| {};
1182 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// join() should surface the injected io error, not succeed.
1183 match bg_processor.join() {
1184 Ok(_) => panic!("Expected error persisting manager"),
1186 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1187 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1193 fn test_network_graph_persist_error() {
1194 // Test that if we encounter an error during network graph persistence, an error gets returned.
1195 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1196 let data_dir = nodes[0].persister.get_data_dir();
// Inject a graph-persistence failure via the test Persister builder.
1197 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1198 let event_handler = |_: _| {};
1199 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike the manager-error case, the graph error surfaces from stop(), not join().
1201 match bg_processor.stop() {
1202 Ok(_) => panic!("Expected error persisting network graph"),
1204 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1205 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1211 fn test_scorer_persist_error() {
1212 // Test that if we encounter an error during scorer persistence, an error gets returned.
1213 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1214 let data_dir = nodes[0].persister.get_data_dir();
// Inject a scorer-persistence failure via the test Persister builder.
1215 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1216 let event_handler = |_: _| {};
1217 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1219 match bg_processor.stop() {
1220 Ok(_) => panic!("Expected error persisting scorer"),
1222 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1223 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies the background processor delivers FundingGenerationReady and SpendableOutputs
// events to the supplied event handler within EVENT_DEADLINE.
1229 fn test_background_event_handling() {
1230 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1231 let channel_value = 100000;
1232 let data_dir = nodes[0].persister.get_data_dir();
1233 let persister = Arc::new(Persister::new(data_dir.clone()));
1235 // Set up a background event handler for FundingGenerationReady events.
1236 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1237 let event_handler = move |event: Event| match event {
1238 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1239 Event::ChannelReady { .. } => {},
1240 _ => panic!("Unexpected event: {:?}", event),
1243 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1245 // Open a channel and check that the FundingGenerationReady event was handled.
1246 begin_open_channel!(nodes[0], nodes[1], channel_value);
1247 let (temporary_channel_id, funding_tx) = receiver
1248 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1249 .expect("FundingGenerationReady not handled within deadline");
1250 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1252 // Confirm the funding transaction.
1253 confirm_transaction(&mut nodes[0], &funding_tx);
1254 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1255 confirm_transaction(&mut nodes[1], &funding_tx);
1256 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready messages both ways so the channel becomes usable on both nodes.
1257 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1258 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1259 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1260 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1262 assert!(bg_processor.stop().is_ok());
1264 // Set up a background event handler for SpendableOutputs events.
1265 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1266 let event_handler = move |event: Event| match event {
1267 Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1268 Event::ChannelReady { .. } => {},
1269 Event::ChannelClosed { .. } => {},
1270 _ => panic!("Unexpected event: {:?}", event),
// Restart a fresh background processor with the new handler.
1272 let persister = Arc::new(Persister::new(data_dir));
1273 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1275 // Force close the channel and check that the SpendableOutputs event was handled.
1276 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1277 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Confirm the commitment tx deep enough (BREAKDOWN_TIMEOUT) for outputs to be spendable.
1278 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1280 let event = receiver
1281 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1282 .expect("Events not handled within deadline");
1284 Event::SpendableOutputs { .. } => {},
1285 _ => panic!("Unexpected event: {:?}", event),
1288 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer, by polling the
// logger for the "Persisting scorer" log line.
1292 fn test_scorer_persistence() {
1293 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1294 let data_dir = nodes[0].persister.get_data_dir();
1295 let persister = Arc::new(Persister::new(data_dir));
1296 let event_handler = |_: _| {};
1297 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1300 let log_entries = nodes[0].logger.lines.lock().unwrap();
1301 let expected_log = "Persisting scorer".to_string();
1302 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1307 assert!(bg_processor.stop().is_ok());
// Verifies the network graph is not pruned until rapid gossip sync completes: a partial
// (unconfirmed) channel announcement survives until RGS finishes, after which pruning
// removes all channels lacking full announcements.
1311 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1312 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1313 let data_dir = nodes[0].persister.get_data_dir();
// Notifier channel fires when the graph is persisted, signalling pruning has happened.
1314 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1315 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1316 let network_graph = nodes[0].network_graph.clone();
1317 let features = ChannelFeatures::empty();
// Seed the graph with a partial announcement for scid 42 so there is something prunable.
1318 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1319 .expect("Failed to update channel from partial announcement");
1320 let original_graph_description = network_graph.to_string();
1321 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1322 assert_eq!(network_graph.read_only().channels().len(), 1);
1324 let event_handler = |_: _| {};
1325 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1328 let log_entries = nodes[0].logger.lines.lock().unwrap();
1329 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1330 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1333 // Wait until the loop has gone around at least twice.
// Serialized rapid-gossip-sync snapshot containing two channel announcements.
1338 let initialization_input = vec![
1339 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1340 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1341 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1342 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1343 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1344 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1345 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1346 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1347 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1348 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1349 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1350 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1351 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1353 nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1355 // this should have added two channels
1356 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait (generously: 5x the first-prune timer) for the post-sync prune + persist.
1359 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1360 .expect("Network graph not pruned within deadline");
1362 background_processor.stop().unwrap();
1364 // all channels should now be pruned
1365 assert_eq!(network_graph.read_only().channels().len(), 0);
1369 fn test_payment_path_scoring() {
1370 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1371 // that we update the scorer upon a payment path succeeding (note that the channel must be
1372 // public or else we won't score it).
1373 // Set up a background event handler for FundingGenerationReady events.
1374 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// Forward every scoring-relevant event to the test's receiver for inspection.
1375 let event_handler = move |event: Event| match event {
1376 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1377 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1378 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1379 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1380 _ => panic!("Unexpected event: {:?}", event),
1383 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1384 let data_dir = nodes[0].persister.get_data_dir();
1385 let persister = Arc::new(Persister::new(data_dir));
1386 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1388 let scored_scid = 4242;
1389 let secp_ctx = Secp256k1::new();
1390 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1391 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
// Single-hop path over the scored channel, used by every pushed event below.
1393 let path = vec![RouteHop {
1395 node_features: NodeFeatures::empty(),
1396 short_channel_id: scored_scid,
1397 channel_features: ChannelFeatures::empty(),
1399 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
// Case 1: transient path failure with a known scid -> scorer sees PaymentFailure.
1402 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1403 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1405 payment_hash: PaymentHash([42; 32]),
1406 payment_failed_permanently: false,
1407 failure: PathFailure::OnPath { network_update: None },
1409 short_channel_id: Some(scored_scid),
1411 let event = receiver
1412 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1413 .expect("PaymentPathFailed not handled within deadline");
1415 Event::PaymentPathFailed { .. } => {},
1416 _ => panic!("Unexpected event"),
1419 // Ensure we'll score payments that were explicitly failed back by the destination as
// Case 2: permanent failure at the destination is scored as a probe success.
1421 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1422 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1424 payment_hash: PaymentHash([42; 32]),
1425 payment_failed_permanently: true,
1426 failure: PathFailure::OnPath { network_update: None },
1428 short_channel_id: None,
1430 let event = receiver
1431 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1432 .expect("PaymentPathFailed not handled within deadline");
1434 Event::PaymentPathFailed { .. } => {},
1435 _ => panic!("Unexpected event"),
// Case 3: successful payment path -> scorer sees PaymentSuccess.
1438 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1439 nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1440 payment_id: PaymentId([42; 32]),
1444 let event = receiver
1445 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1446 .expect("PaymentPathSuccessful not handled within deadline");
1448 Event::PaymentPathSuccessful { .. } => {},
1449 _ => panic!("Unexpected event"),
// Case 4: successful probe -> scorer sees ProbeSuccess.
1452 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1453 nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1454 payment_id: PaymentId([42; 32]),
1455 payment_hash: PaymentHash([42; 32]),
1458 let event = receiver
1459 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1460 .expect("ProbeSuccessful not handled within deadline");
1462 Event::ProbeSuccessful { .. } => {},
1463 _ => panic!("Unexpected event"),
// Case 5: failed probe -> scorer sees ProbeFailure.
1466 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1467 nodes[0].node.push_pending_event(Event::ProbeFailed {
1468 payment_id: PaymentId([42; 32]),
1469 payment_hash: PaymentHash([42; 32]),
1471 short_channel_id: Some(scored_scid),
1473 let event = receiver
1474 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1475 .expect("ProbeFailure not handled within deadline");
1477 Event::ProbeFailed { .. } => {},
1478 _ => panic!("Unexpected event"),
1481 assert!(bg_processor.stop().is_ok());