1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{Score, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 use lightning_rapid_gossip_sync::RapidGossipSync;
44 use core::time::Duration;
46 #[cfg(feature = "std")]
48 #[cfg(feature = "std")]
49 use core::sync::atomic::{AtomicBool, Ordering};
50 #[cfg(feature = "std")]
51 use std::thread::{self, JoinHandle};
52 #[cfg(feature = "std")]
53 use std::time::Instant;
55 #[cfg(not(feature = "std"))]
58 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
59 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
60 /// responsibilities are:
61 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
62 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
63 /// writing it to disk/backups by invoking the callback given to it at startup.
64 /// [`ChannelManager`] persistence should be done in the background.
65 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
66 /// at the appropriate intervals.
67 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
68 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
70 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
71 /// upon as doing so may result in high latency.
75 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
76 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
77 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
78 /// unilateral chain closure fees are at risk.
80 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
81 /// [`Event`]: lightning::events::Event
82 #[cfg(feature = "std")]
83 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
84 pub struct BackgroundProcessor {
85 stop_thread: Arc<AtomicBool>,
86 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
90 const FRESHNESS_TIMER: u64 = 60;
92 const FRESHNESS_TIMER: u64 = 1;
94 #[cfg(all(not(test), not(debug_assertions)))]
95 const PING_TIMER: u64 = 10;
96 /// Signature operations take a lot longer without compiler optimisations.
97 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
98 /// timeout is reached.
99 #[cfg(all(not(test), debug_assertions))]
100 const PING_TIMER: u64 = 30;
102 const PING_TIMER: u64 = 1;
104 /// Prune the network graph of stale entries hourly.
105 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
108 const SCORER_PERSIST_TIMER: u64 = 30;
110 const SCORER_PERSIST_TIMER: u64 = 1;
113 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
115 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
117 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
119 P: Deref<Target = P2PGossipSync<G, U, L>>,
120 R: Deref<Target = RapidGossipSync<G, L>>,
121 G: Deref<Target = NetworkGraph<L>>,
125 where U::Target: UtxoLookup, L::Target: Logger {
126 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
128 /// Rapid gossip sync from a trusted server.
135 P: Deref<Target = P2PGossipSync<G, U, L>>,
136 R: Deref<Target = RapidGossipSync<G, L>>,
137 G: Deref<Target = NetworkGraph<L>>,
140 > GossipSync<P, R, G, U, L>
141 where U::Target: UtxoLookup, L::Target: Logger {
142 fn network_graph(&self) -> Option<&G> {
144 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
145 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
146 GossipSync::None => None,
150 fn prunable_network_graph(&self) -> Option<&G> {
152 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
153 GossipSync::Rapid(gossip_sync) => {
154 if gossip_sync.is_initial_sync_complete() {
155 Some(gossip_sync.network_graph())
160 GossipSync::None => None,
165 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
166 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
167 GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
169 U::Target: UtxoLookup,
172 /// Initializes a new [`GossipSync::P2P`] variant.
173 pub fn p2p(gossip_sync: P) -> Self {
174 GossipSync::P2P(gossip_sync)
178 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
179 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
181 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
184 &'a (dyn UtxoLookup + Send + Sync),
190 /// Initializes a new [`GossipSync::Rapid`] variant.
191 pub fn rapid(gossip_sync: R) -> Self {
192 GossipSync::Rapid(gossip_sync)
196 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
199 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
200 &RapidGossipSync<&'a NetworkGraph<L>, L>,
202 &'a (dyn UtxoLookup + Send + Sync),
208 /// Initializes a new [`GossipSync::None`] variant.
209 pub fn none() -> Self {
214 fn handle_network_graph_update<L: Deref>(
215 network_graph: &NetworkGraph<L>, event: &Event
216 ) where L::Target: Logger {
217 if let Event::PaymentPathFailed {
218 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
220 network_graph.handle_network_update(upd);
224 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
225 scorer: &'a S, event: &Event
227 let mut score = scorer.lock();
229 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
230 let path = path.iter().collect::<Vec<_>>();
231 score.payment_path_failed(&path, *scid);
233 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
234 // Reached if the destination explicitly failed it back. We treat this as a successful probe
235 // because the payment made it all the way to the destination with sufficient liquidity.
236 let path = path.iter().collect::<Vec<_>>();
237 score.probe_successful(&path);
239 Event::PaymentPathSuccessful { path, .. } => {
240 let path = path.iter().collect::<Vec<_>>();
241 score.payment_path_successful(&path);
243 Event::ProbeSuccessful { path, .. } => {
244 let path = path.iter().collect::<Vec<_>>();
245 score.probe_successful(&path);
247 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
248 let path = path.iter().collect::<Vec<_>>();
249 score.probe_failed(&path, *scid);
255 macro_rules! define_run_body {
256 ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
257 $channel_manager: ident, $process_channel_manager_events: expr,
258 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
259 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
261 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
262 $channel_manager.timer_tick_occurred();
264 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
265 let mut last_ping_call = $get_timer(PING_TIMER);
266 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
267 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
268 let mut have_pruned = false;
271 $process_channel_manager_events;
272 $process_chain_monitor_events;
274 // Note that the PeerManager::process_events may block on ChannelManager's locks,
275 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
276 // we want to ensure we get into `persist_manager` as quickly as we can, especially
277 // without running the normal event processing above and handing events to users.
279 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
280 // processing a message effectively at any point during this loop. In order to
281 // minimize the time between such processing completing and persisting the updated
282 // ChannelManager, we want to minimize methods blocking on a ChannelManager
283 // generally, and as a fallback place such blocking only immediately before
285 $peer_manager.process_events();
287 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
288 // see `await_start`'s use below.
289 let mut await_start = $get_timer(1);
290 let updates_available = $await;
291 let await_slow = $timer_elapsed(&mut await_start, 1);
293 if updates_available {
294 log_trace!($logger, "Persisting ChannelManager...");
295 $persister.persist_manager(&*$channel_manager)?;
296 log_trace!($logger, "Done persisting ChannelManager.");
298 // Exit the loop if the background processor was requested to stop.
299 if $loop_exit_check {
300 log_trace!($logger, "Terminating background processor.");
303 if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
304 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
305 $channel_manager.timer_tick_occurred();
306 last_freshness_call = $get_timer(FRESHNESS_TIMER);
309 // On various platforms, we may be starved of CPU cycles for several reasons.
310 // E.g. on iOS, if we've been in the background, we will be entirely paused.
311 // Similarly, if we're on a desktop platform and the device has been asleep, we
312 // may not get any cycles.
313 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
314 // full second, at which point we assume sockets may have been killed (they
315 // appear to be at least on some platforms, even if it has only been a second).
316 // Note that we have to take care to not get here just because user event
317 // processing was slow at the top of the loop. For example, the sample client
318 // may call Bitcoin Core RPCs during event handling, which very often takes
319 // more than a handful of seconds to complete, and shouldn't disconnect all our
321 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
322 $peer_manager.disconnect_all_peers();
323 last_ping_call = $get_timer(PING_TIMER);
324 } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
325 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
326 $peer_manager.timer_tick_occurred();
327 last_ping_call = $get_timer(PING_TIMER);
330 // Note that we want to run a graph prune once not long after startup before
331 // falling back to our usual hourly prunes. This avoids short-lived clients never
332 // pruning their network graph. We run once 60 seconds after startup before
333 // continuing our normal cadence.
334 if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
335 // The network graph must not be pruned while rapid sync completion is pending
336 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
337 #[cfg(feature = "std")] {
338 log_trace!($logger, "Pruning and persisting network graph.");
339 network_graph.remove_stale_channels_and_tracking();
341 #[cfg(not(feature = "std"))] {
342 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
343 log_trace!($logger, "Persisting network graph.");
346 if let Err(e) = $persister.persist_graph(network_graph) {
347 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
352 last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
355 if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
356 if let Some(ref scorer) = $scorer {
357 log_trace!($logger, "Persisting scorer");
358 if let Err(e) = $persister.persist_scorer(&scorer) {
359 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
362 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
366 // After we exit, ensure we persist the ChannelManager one final time - this avoids
367 // some races where users quit while channel updates were in-flight, with
368 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
369 $persister.persist_manager(&*$channel_manager)?;
371 // Persist Scorer on exit
372 if let Some(ref scorer) = $scorer {
373 $persister.persist_scorer(&scorer)?;
376 // Persist NetworkGraph on exit
377 if let Some(network_graph) = $gossip_sync.network_graph() {
378 $persister.persist_graph(network_graph)?;
385 #[cfg(feature = "futures")]
386 pub(crate) mod futures_util {
387 use core::future::Future;
388 use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
390 use core::marker::Unpin;
391 pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
395 pub(crate) enum SelectorOutput {
399 impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
400 type Output = SelectorOutput;
401 fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
402 match Pin::new(&mut self.a).poll(ctx) {
403 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
406 match Pin::new(&mut self.b).poll(ctx) {
407 Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
414 // If we want to poll a future without an async context to figure out if it has completed or
415 // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
416 // but sadly there's a good bit of boilerplate here.
417 fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
418 fn dummy_waker_action(_: *const ()) { }
420 const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
421 dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
422 pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
424 #[cfg(feature = "futures")]
425 use futures_util::{Selector, SelectorOutput, dummy_waker};
426 #[cfg(feature = "futures")]
429 /// Processes background events in a future.
431 /// `sleeper` should return a future which completes in the given amount of time and returns a
432 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
433 /// future which outputs true, the loop will exit and this function's future will complete.
435 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
437 /// Requires the `futures` feature. Note that while this method is available without the `std`
438 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
439 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
440 /// manually instead.
441 #[cfg(feature = "futures")]
442 pub async fn process_events_async<
444 UL: 'static + Deref + Send + Sync,
445 CF: 'static + Deref + Send + Sync,
446 CW: 'static + Deref + Send + Sync,
447 T: 'static + Deref + Send + Sync,
448 ES: 'static + Deref + Send + Sync,
449 NS: 'static + Deref + Send + Sync,
450 SP: 'static + Deref + Send + Sync,
451 F: 'static + Deref + Send + Sync,
452 R: 'static + Deref + Send + Sync,
453 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
454 L: 'static + Deref + Send + Sync,
455 P: 'static + Deref + Send + Sync,
456 Descriptor: 'static + SocketDescriptor + Send + Sync,
457 CMH: 'static + Deref + Send + Sync,
458 RMH: 'static + Deref + Send + Sync,
459 OMH: 'static + Deref + Send + Sync,
460 EventHandlerFuture: core::future::Future<Output = ()>,
461 EventHandler: Fn(Event) -> EventHandlerFuture,
462 PS: 'static + Deref + Send,
463 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
464 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
465 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
466 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
467 UMH: 'static + Deref + Send + Sync,
468 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
469 S: 'static + Deref<Target = SC> + Send + Sync,
470 SC: for<'b> WriteableScore<'b>,
471 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
472 Sleeper: Fn(Duration) -> SleepFuture
474 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
475 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
477 ) -> Result<(), lightning::io::Error>
479 UL::Target: 'static + UtxoLookup,
480 CF::Target: 'static + chain::Filter,
481 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
482 T::Target: 'static + BroadcasterInterface,
483 ES::Target: 'static + EntropySource,
484 NS::Target: 'static + NodeSigner,
485 SP::Target: 'static + SignerProvider,
486 F::Target: 'static + FeeEstimator,
487 R::Target: 'static + Router,
488 L::Target: 'static + Logger,
489 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
490 CMH::Target: 'static + ChannelMessageHandler,
491 OMH::Target: 'static + OnionMessageHandler,
492 RMH::Target: 'static + RoutingMessageHandler,
493 UMH::Target: 'static + CustomMessageHandler,
494 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
496 let mut should_break = true;
497 let async_event_handler = |event| {
498 let network_graph = gossip_sync.network_graph();
499 let event_handler = &event_handler;
500 let scorer = &scorer;
502 if let Some(network_graph) = network_graph {
503 handle_network_graph_update(network_graph, &event)
505 if let Some(ref scorer) = scorer {
506 update_scorer(scorer, &event);
508 event_handler(event).await;
511 define_run_body!(persister,
512 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
513 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
514 gossip_sync, peer_manager, logger, scorer, should_break, {
516 a: channel_manager.get_persistable_update_future(),
517 b: sleeper(Duration::from_millis(100)),
520 SelectorOutput::A => true,
521 SelectorOutput::B(exit) => {
526 }, |t| sleeper(Duration::from_secs(t)),
527 |fut: &mut SleepFuture, _| {
528 let mut waker = dummy_waker();
529 let mut ctx = task::Context::from_waker(&mut waker);
530 core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
534 #[cfg(feature = "std")]
535 impl BackgroundProcessor {
536 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
539 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
540 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
541 /// either [`join`] or [`stop`].
543 /// # Data Persistence
545 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
546 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
547 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
548 /// provided implementation.
550 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
551 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
552 /// See the `lightning-persister` crate for LDK's provided implementation.
554 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
555 /// error or call [`join`] and handle any error that may arise. For the latter case,
556 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
560 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
561 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
562 /// functionality implemented by other handlers.
563 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
565 /// # Rapid Gossip Sync
567 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
568 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
569 /// until the [`RapidGossipSync`] instance completes its first sync.
571 /// [top-level documentation]: BackgroundProcessor
572 /// [`join`]: Self::join
573 /// [`stop`]: Self::stop
574 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
575 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
576 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
577 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
578 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
579 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
582 UL: 'static + Deref + Send + Sync,
583 CF: 'static + Deref + Send + Sync,
584 CW: 'static + Deref + Send + Sync,
585 T: 'static + Deref + Send + Sync,
586 ES: 'static + Deref + Send + Sync,
587 NS: 'static + Deref + Send + Sync,
588 SP: 'static + Deref + Send + Sync,
589 F: 'static + Deref + Send + Sync,
590 R: 'static + Deref + Send + Sync,
591 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
592 L: 'static + Deref + Send + Sync,
593 P: 'static + Deref + Send + Sync,
594 Descriptor: 'static + SocketDescriptor + Send + Sync,
595 CMH: 'static + Deref + Send + Sync,
596 OMH: 'static + Deref + Send + Sync,
597 RMH: 'static + Deref + Send + Sync,
598 EH: 'static + EventHandler + Send,
599 PS: 'static + Deref + Send,
600 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
601 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
602 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
603 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
604 UMH: 'static + Deref + Send + Sync,
605 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
606 S: 'static + Deref<Target = SC> + Send + Sync,
607 SC: for <'b> WriteableScore<'b>,
609 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
610 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
613 UL::Target: 'static + UtxoLookup,
614 CF::Target: 'static + chain::Filter,
615 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
616 T::Target: 'static + BroadcasterInterface,
617 ES::Target: 'static + EntropySource,
618 NS::Target: 'static + NodeSigner,
619 SP::Target: 'static + SignerProvider,
620 F::Target: 'static + FeeEstimator,
621 R::Target: 'static + Router,
622 L::Target: 'static + Logger,
623 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
624 CMH::Target: 'static + ChannelMessageHandler,
625 OMH::Target: 'static + OnionMessageHandler,
626 RMH::Target: 'static + RoutingMessageHandler,
627 UMH::Target: 'static + CustomMessageHandler,
628 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
630 let stop_thread = Arc::new(AtomicBool::new(false));
631 let stop_thread_clone = stop_thread.clone();
632 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
633 let event_handler = |event| {
634 let network_graph = gossip_sync.network_graph();
635 if let Some(network_graph) = network_graph {
636 handle_network_graph_update(network_graph, &event)
638 if let Some(ref scorer) = scorer {
639 update_scorer(scorer, &event);
641 event_handler.handle_event(event);
643 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
644 channel_manager, channel_manager.process_pending_events(&event_handler),
645 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
646 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
647 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
649 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
652 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
653 /// [`ChannelManager`].
657 /// This function panics if the background thread has panicked such as while persisting or
660 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
661 pub fn join(mut self) -> Result<(), std::io::Error> {
662 assert!(self.thread_handle.is_some());
666 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
667 /// [`ChannelManager`].
671 /// This function panics if the background thread has panicked such as while persisting or
674 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
675 pub fn stop(mut self) -> Result<(), std::io::Error> {
676 assert!(self.thread_handle.is_some());
677 self.stop_and_join_thread()
680 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
681 self.stop_thread.store(true, Ordering::Release);
685 fn join_thread(&mut self) -> Result<(), std::io::Error> {
686 match self.thread_handle.take() {
687 Some(handle) => handle.join().unwrap(),
693 #[cfg(feature = "std")]
694 impl Drop for BackgroundProcessor {
696 self.stop_and_join_thread().unwrap();
700 #[cfg(all(feature = "std", test))]
702 use bitcoin::blockdata::block::BlockHeader;
703 use bitcoin::blockdata::constants::genesis_block;
704 use bitcoin::blockdata::locktime::PackedLockTime;
705 use bitcoin::blockdata::transaction::{Transaction, TxOut};
706 use bitcoin::network::constants::Network;
707 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
708 use lightning::chain::{BestBlock, Confirm, chainmonitor};
709 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
710 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
711 use lightning::chain::transaction::OutPoint;
712 use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
713 use lightning::get_event_msg;
714 use lightning::ln::PaymentHash;
715 use lightning::ln::channelmanager;
716 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
717 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
718 use lightning::ln::msgs::{ChannelMessageHandler, Init};
719 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
720 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
721 use lightning::routing::router::{DefaultRouter, RouteHop};
722 use lightning::routing::scoring::{ChannelUsage, Score};
723 use lightning::util::config::UserConfig;
724 use lightning::util::ser::Writeable;
725 use lightning::util::test_utils;
726 use lightning::util::persist::KVStorePersister;
727 use lightning_persister::FilesystemPersister;
728 use std::collections::VecDeque;
730 use std::path::PathBuf;
731 use std::sync::{Arc, Mutex};
732 use std::sync::mpsc::SyncSender;
733 use std::time::Duration;
734 use bitcoin::hashes::Hash;
735 use bitcoin::TxMerkleNode;
736 use lightning_rapid_gossip_sync::RapidGossipSync;
737 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
739 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
741 #[derive(Clone, Hash, PartialEq, Eq)]
742 struct TestDescriptor{}
743 impl SocketDescriptor for TestDescriptor {
744 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
748 fn disconnect_socket(&mut self) {}
751 type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
753 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
755 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
756 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
759 node: Arc<ChannelManager>,
760 p2p_gossip_sync: PGS,
761 rapid_gossip_sync: RGS,
762 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
763 chain_monitor: Arc<ChainMonitor>,
764 persister: Arc<FilesystemPersister>,
765 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
766 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
767 logger: Arc<test_utils::TestLogger>,
768 best_block: BestBlock,
769 scorer: Arc<Mutex<TestScorer>>,
773 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
774 GossipSync::P2P(self.p2p_gossip_sync.clone())
777 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
778 GossipSync::Rapid(self.rapid_gossip_sync.clone())
781 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
788 let data_dir = self.persister.get_data_dir();
789 match fs::remove_dir_all(data_dir.clone()) {
790 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
797 graph_error: Option<(std::io::ErrorKind, &'static str)>,
798 graph_persistence_notifier: Option<SyncSender<()>>,
799 manager_error: Option<(std::io::ErrorKind, &'static str)>,
800 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
801 filesystem_persister: FilesystemPersister,
805 fn new(data_dir: String) -> Self {
806 let filesystem_persister = FilesystemPersister::new(data_dir);
807 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
810 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
811 Self { graph_error: Some((error, message)), ..self }
814 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
815 Self { graph_persistence_notifier: Some(sender), ..self }
818 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
819 Self { manager_error: Some((error, message)), ..self }
822 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
823 Self { scorer_error: Some((error, message)), ..self }
827 impl KVStorePersister for Persister {
828 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
829 if key == "manager" {
830 if let Some((error, message)) = self.manager_error {
831 return Err(std::io::Error::new(error, message))
835 if key == "network_graph" {
836 if let Some(sender) = &self.graph_persistence_notifier {
837 sender.send(()).unwrap();
840 if let Some((error, message)) = self.graph_error {
841 return Err(std::io::Error::new(error, message))
846 if let Some((error, message)) = self.scorer_error {
847 return Err(std::io::Error::new(error, message))
851 self.filesystem_persister.persist(key, object)
856 event_expectations: Option<VecDeque<TestResult>>,
861 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
862 PaymentSuccess { path: Vec<RouteHop> },
863 ProbeFailure { path: Vec<RouteHop> },
864 ProbeSuccess { path: Vec<RouteHop> },
869 Self { event_expectations: None }
872 fn expect(&mut self, expectation: TestResult) {
873 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
877 impl lightning::util::ser::Writeable for TestScorer {
878 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
881 impl Score for TestScorer {
882 fn channel_penalty_msat(
883 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
884 ) -> u64 { unimplemented!(); }
886 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
887 if let Some(expectations) = &mut self.event_expectations {
888 match expectations.pop_front().unwrap() {
889 TestResult::PaymentFailure { path, short_channel_id } => {
890 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
891 assert_eq!(actual_short_channel_id, short_channel_id);
893 TestResult::PaymentSuccess { path } => {
894 panic!("Unexpected successful payment path: {:?}", path)
896 TestResult::ProbeFailure { path } => {
897 panic!("Unexpected probe failure: {:?}", path)
899 TestResult::ProbeSuccess { path } => {
900 panic!("Unexpected probe success: {:?}", path)
906 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
907 if let Some(expectations) = &mut self.event_expectations {
908 match expectations.pop_front().unwrap() {
909 TestResult::PaymentFailure { path, .. } => {
910 panic!("Unexpected payment path failure: {:?}", path)
912 TestResult::PaymentSuccess { path } => {
913 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
915 TestResult::ProbeFailure { path } => {
916 panic!("Unexpected probe failure: {:?}", path)
918 TestResult::ProbeSuccess { path } => {
919 panic!("Unexpected probe success: {:?}", path)
925 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
926 if let Some(expectations) = &mut self.event_expectations {
927 match expectations.pop_front().unwrap() {
928 TestResult::PaymentFailure { path, .. } => {
929 panic!("Unexpected payment path failure: {:?}", path)
931 TestResult::PaymentSuccess { path } => {
932 panic!("Unexpected payment path success: {:?}", path)
934 TestResult::ProbeFailure { path } => {
935 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
937 TestResult::ProbeSuccess { path } => {
938 panic!("Unexpected probe success: {:?}", path)
943 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
944 if let Some(expectations) = &mut self.event_expectations {
945 match expectations.pop_front().unwrap() {
946 TestResult::PaymentFailure { path, .. } => {
947 panic!("Unexpected payment path failure: {:?}", path)
949 TestResult::PaymentSuccess { path } => {
950 panic!("Unexpected payment path success: {:?}", path)
952 TestResult::ProbeFailure { path } => {
953 panic!("Unexpected probe failure: {:?}", path)
955 TestResult::ProbeSuccess { path } => {
956 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
963 impl Drop for TestScorer {
965 if std::thread::panicking() {
969 if let Some(event_expectations) = &self.event_expectations {
970 if !event_expectations.is_empty() {
971 panic!("Unsatisfied event expectations: {:?}", event_expectations);
977 fn get_full_filepath(filepath: String, filename: String) -> String {
978 let mut path = PathBuf::from(filepath);
980 path.to_str().unwrap().to_string()
983 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
984 let mut nodes = Vec::new();
985 for i in 0..num_nodes {
986 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
987 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
988 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
989 let network = Network::Testnet;
990 let genesis_block = genesis_block(network);
991 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
992 let scorer = Arc::new(Mutex::new(TestScorer::new()));
993 let seed = [i as u8; 32];
994 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
995 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
996 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
997 let now = Duration::from_secs(genesis_block.header.time as u64);
998 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
999 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1000 let best_block = BestBlock::from_network(network);
1001 let params = ChainParameters { network, best_block };
1002 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
1003 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1004 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1005 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
1006 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
1007 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
1011 for i in 0..num_nodes {
1012 for j in (i+1)..num_nodes {
1013 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
1014 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
1021 macro_rules! open_channel {
1022 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1023 begin_open_channel!($node_a, $node_b, $channel_value);
1024 let events = $node_a.node.get_and_clear_pending_events();
1025 assert_eq!(events.len(), 1);
1026 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1027 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
1032 macro_rules! begin_open_channel {
1033 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1034 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
1035 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1036 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
1040 macro_rules! handle_funding_generation_ready {
1041 ($event: expr, $channel_value: expr) => {{
1043 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1044 assert_eq!(channel_value_satoshis, $channel_value);
1045 assert_eq!(user_channel_id, 42);
1047 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1048 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1050 (temporary_channel_id, tx)
1052 _ => panic!("Unexpected event"),
1057 macro_rules! end_open_channel {
1058 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
1059 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
1060 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1061 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1065 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1066 for i in 1..=depth {
1067 let prev_blockhash = node.best_block.block_hash();
1068 let height = node.best_block.height() + 1;
1069 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1070 let txdata = vec![(0, tx)];
1071 node.best_block = BestBlock::new(header.block_hash(), height);
1074 node.node.transactions_confirmed(&header, &txdata, height);
1075 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1077 x if x == depth => {
1078 node.node.best_block_updated(&header, height);
1079 node.chain_monitor.best_block_updated(&header, height);
1085 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1086 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1090 fn test_background_processor() {
1091 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1092 // updates. Also test that when new updates are available, the manager signals that it needs
1093 // re-persistence and is successfully re-persisted.
1094 let nodes = create_nodes(2, "test_background_processor".to_string());
1096 // Go through the channel creation process so that each node has something to persist. Since
1097 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1098 // avoid a race with processing events.
1099 let tx = open_channel!(nodes[0], nodes[1], 100000);
1101 // Initiate the background processors to watch each node.
1102 let data_dir = nodes[0].persister.get_data_dir();
1103 let persister = Arc::new(Persister::new(data_dir));
1104 let event_handler = |_: _| {};
1105 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1107 macro_rules! check_persisted_data {
1108 ($node: expr, $filepath: expr) => {
1109 let mut expected_bytes = Vec::new();
1111 expected_bytes.clear();
1112 match $node.write(&mut expected_bytes) {
1114 match std::fs::read($filepath) {
1116 if bytes == expected_bytes {
1125 Err(e) => panic!("Unexpected error: {}", e)
1131 // Check that the initial channel manager data is persisted as expected.
1132 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1133 check_persisted_data!(nodes[0].node, filepath.clone());
1136 if !nodes[0].node.get_persistence_condvar_value() { break }
1139 // Force-close the channel.
1140 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1142 // Check that the force-close updates are persisted.
1143 check_persisted_data!(nodes[0].node, filepath.clone());
1145 if !nodes[0].node.get_persistence_condvar_value() { break }
1148 // Check network graph is persisted
1149 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1150 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1152 // Check scorer is persisted
1153 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1154 check_persisted_data!(nodes[0].scorer, filepath.clone());
1156 assert!(bg_processor.stop().is_ok());
1160 fn test_timer_tick_called() {
1161 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1162 // `FRESHNESS_TIMER`.
1163 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1164 let data_dir = nodes[0].persister.get_data_dir();
1165 let persister = Arc::new(Persister::new(data_dir));
1166 let event_handler = |_: _| {};
1167 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1169 let log_entries = nodes[0].logger.lines.lock().unwrap();
1170 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1171 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1172 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1173 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1178 assert!(bg_processor.stop().is_ok());
1182 fn test_channel_manager_persist_error() {
1183 // Test that if we encounter an error during manager persistence, the thread panics.
1184 let nodes = create_nodes(2, "test_persist_error".to_string());
1185 open_channel!(nodes[0], nodes[1], 100000);
1187 let data_dir = nodes[0].persister.get_data_dir();
1188 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1189 let event_handler = |_: _| {};
1190 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1191 match bg_processor.join() {
1192 Ok(_) => panic!("Expected error persisting manager"),
1194 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1195 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1201 fn test_network_graph_persist_error() {
1202 // Test that if we encounter an error during network graph persistence, an error gets returned.
1203 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1204 let data_dir = nodes[0].persister.get_data_dir();
1205 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1206 let event_handler = |_: _| {};
1207 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1209 match bg_processor.stop() {
1210 Ok(_) => panic!("Expected error persisting network graph"),
1212 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1213 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1219 fn test_scorer_persist_error() {
1220 // Test that if we encounter an error during scorer persistence, an error gets returned.
1221 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1222 let data_dir = nodes[0].persister.get_data_dir();
1223 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1224 let event_handler = |_: _| {};
1225 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1227 match bg_processor.stop() {
1228 Ok(_) => panic!("Expected error persisting scorer"),
1230 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1231 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1237 fn test_background_event_handling() {
1238 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1239 let channel_value = 100000;
1240 let data_dir = nodes[0].persister.get_data_dir();
1241 let persister = Arc::new(Persister::new(data_dir.clone()));
1243 // Set up a background event handler for FundingGenerationReady events.
1244 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1245 let event_handler = move |event: Event| match event {
1246 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1247 Event::ChannelReady { .. } => {},
1248 _ => panic!("Unexpected event: {:?}", event),
1251 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1253 // Open a channel and check that the FundingGenerationReady event was handled.
1254 begin_open_channel!(nodes[0], nodes[1], channel_value);
1255 let (temporary_channel_id, funding_tx) = receiver
1256 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1257 .expect("FundingGenerationReady not handled within deadline");
1258 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1260 // Confirm the funding transaction.
1261 confirm_transaction(&mut nodes[0], &funding_tx);
1262 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1263 confirm_transaction(&mut nodes[1], &funding_tx);
1264 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1265 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1266 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1267 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1268 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1270 assert!(bg_processor.stop().is_ok());
1272 // Set up a background event handler for SpendableOutputs events.
1273 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1274 let event_handler = move |event: Event| match event {
1275 Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1276 Event::ChannelReady { .. } => {},
1277 Event::ChannelClosed { .. } => {},
1278 _ => panic!("Unexpected event: {:?}", event),
1280 let persister = Arc::new(Persister::new(data_dir));
1281 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1283 // Force close the channel and check that the SpendableOutputs event was handled.
1284 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1285 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1286 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1288 let event = receiver
1289 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1290 .expect("Events not handled within deadline");
1292 Event::SpendableOutputs { .. } => {},
1293 _ => panic!("Unexpected event: {:?}", event),
1296 assert!(bg_processor.stop().is_ok());
1300 fn test_scorer_persistence() {
1301 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1302 let data_dir = nodes[0].persister.get_data_dir();
1303 let persister = Arc::new(Persister::new(data_dir));
1304 let event_handler = |_: _| {};
1305 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1308 let log_entries = nodes[0].logger.lines.lock().unwrap();
1309 let expected_log = "Persisting scorer".to_string();
1310 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1315 assert!(bg_processor.stop().is_ok());
1319 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1320 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1321 let data_dir = nodes[0].persister.get_data_dir();
1322 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1323 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1324 let network_graph = nodes[0].network_graph.clone();
1325 let features = ChannelFeatures::empty();
1326 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1327 .expect("Failed to update channel from partial announcement");
1328 let original_graph_description = network_graph.to_string();
1329 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1330 assert_eq!(network_graph.read_only().channels().len(), 1);
1332 let event_handler = |_: _| {};
1333 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1336 let log_entries = nodes[0].logger.lines.lock().unwrap();
1337 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1338 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1341 // Wait until the loop has gone around at least twice.
1346 let initialization_input = vec![
1347 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1348 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1349 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1350 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1351 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1352 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1353 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1354 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1355 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1356 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1357 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1358 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1359 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1361 nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1363 // this should have added two channels
1364 assert_eq!(network_graph.read_only().channels().len(), 3);
1367 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1368 .expect("Network graph not pruned within deadline");
1370 background_processor.stop().unwrap();
1372 // all channels should now be pruned
1373 assert_eq!(network_graph.read_only().channels().len(), 0);
1377 fn test_payment_path_scoring() {
1378 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1379 // that we update the scorer upon a payment path succeeding (note that the channel must be
1380 // public or else we won't score it).
1381 // Set up a background event handler for FundingGenerationReady events.
1382 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1383 let event_handler = move |event: Event| match event {
1384 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1385 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1386 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1387 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1388 _ => panic!("Unexpected event: {:?}", event),
1391 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1392 let data_dir = nodes[0].persister.get_data_dir();
1393 let persister = Arc::new(Persister::new(data_dir));
1394 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1396 let scored_scid = 4242;
1397 let secp_ctx = Secp256k1::new();
1398 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1399 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1401 let path = vec![RouteHop {
1403 node_features: NodeFeatures::empty(),
1404 short_channel_id: scored_scid,
1405 channel_features: ChannelFeatures::empty(),
1407 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
1410 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1411 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1413 payment_hash: PaymentHash([42; 32]),
1414 payment_failed_permanently: false,
1415 failure: PathFailure::OnPath { network_update: None },
1417 short_channel_id: Some(scored_scid),
1419 let event = receiver
1420 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1421 .expect("PaymentPathFailed not handled within deadline");
1423 Event::PaymentPathFailed { .. } => {},
1424 _ => panic!("Unexpected event"),
1427 // Ensure we'll score payments that were explicitly failed back by the destination as
1429 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1430 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1432 payment_hash: PaymentHash([42; 32]),
1433 payment_failed_permanently: true,
1434 failure: PathFailure::OnPath { network_update: None },
1436 short_channel_id: None,
1438 let event = receiver
1439 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1440 .expect("PaymentPathFailed not handled within deadline");
1442 Event::PaymentPathFailed { .. } => {},
1443 _ => panic!("Unexpected event"),
1446 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1447 nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1448 payment_id: PaymentId([42; 32]),
1452 let event = receiver
1453 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1454 .expect("PaymentPathSuccessful not handled within deadline");
1456 Event::PaymentPathSuccessful { .. } => {},
1457 _ => panic!("Unexpected event"),
1460 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1461 nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1462 payment_id: PaymentId([42; 32]),
1463 payment_hash: PaymentHash([42; 32]),
1466 let event = receiver
1467 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1468 .expect("ProbeSuccessful not handled within deadline");
1470 Event::ProbeSuccessful { .. } => {},
1471 _ => panic!("Unexpected event"),
1474 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1475 nodes[0].node.push_pending_event(Event::ProbeFailed {
1476 payment_id: PaymentId([42; 32]),
1477 payment_hash: PaymentHash([42; 32]),
1479 short_channel_id: Some(scored_scid),
1481 let event = receiver
1482 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1483 .expect("ProbeFailure not handled within deadline");
1485 Event::ProbeFailed { .. } => {},
1486 _ => panic!("Unexpected event"),
1489 assert!(bg_processor.stop().is_ok());