1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{Score, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 use lightning_rapid_gossip_sync::RapidGossipSync;
44 use core::time::Duration;
46 #[cfg(feature = "std")]
48 #[cfg(feature = "std")]
49 use core::sync::atomic::{AtomicBool, Ordering};
50 #[cfg(feature = "std")]
51 use std::thread::{self, JoinHandle};
52 #[cfg(feature = "std")]
53 use std::time::Instant;
55 #[cfg(not(feature = "std"))]
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Set to `true` by `stop`/`Drop` to ask the background thread to exit its run loop.
	stop_thread: Arc<AtomicBool>,
	// Taken and joined by `join`/`stop` to surface any persistence error from the thread.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// NOTE(review): each timer below appears twice (a production value and a fast value for
// tests); the `#[cfg]` attributes selecting between them are elided in this listing — confirm
// against the full file.
/// How often (in seconds) `ChannelManager::timer_tick_occurred` is called (production value).
const FRESHNESS_TIMER: u64 = 60;
/// Test-build variant of `FRESHNESS_TIMER`.
const FRESHNESS_TIMER: u64 = 1;
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
/// Test-build variant of `PING_TIMER`.
const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
/// How often (in seconds) the scorer is persisted (production value).
const SCORER_PERSIST_TIMER: u64 = 30;
/// Test-build variant of `SCORER_PERSIST_TIMER`.
const SCORER_PERSIST_TIMER: u64 = 1;
/// Delay (in seconds) before the first network-graph prune after startup (production value).
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
/// Test-build variant of `FIRST_NETWORK_PRUNE_TIMER`.
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// NOTE(review): the `pub enum GossipSync<...>` header, variant lines, `impl<...>` opener, the
// `match self {` lines and several closing braces are elided in this listing.
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
where U::Target: UtxoLookup, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	/// Rapid gossip sync from a trusted server.
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
> GossipSync<P, R, G, U, L>
where U::Target: UtxoLookup, L::Target: Logger {
	// Returns the network graph held by whichever sync variant is active, if any.
	fn network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
	// Returns the network graph only when it is safe to prune it: always for P2P sync, but for
	// rapid sync only once the initial sync has completed (pruning mid-sync could drop data the
	// sync is about to reference).
	fn prunable_network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
			GossipSync::None => None,
// NOTE(review): several impl-header, where-clause and closing-brace lines of these three
// constructor impls are elided in this listing.
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
	U::Target: UtxoLookup,
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
		&'a (dyn UtxoLookup + Send + Sync),
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a (dyn UtxoLookup + Send + Sync),
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
/// Applies a gossip network update carried by a failed-payment event to the given network
/// graph. Only `PaymentPathFailed` events whose failure is `OnPath` with a `network_update`
/// are acted upon; all other events are ignored. (Braces around the body are elided in this
/// listing.)
fn handle_network_graph_update<L: Deref>(
	network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
	if let Event::PaymentPathFailed {
		failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
		network_graph.handle_network_update(upd);
/// Feeds payment- and probe-outcome events into the user's scorer so that future route-finding
/// can learn from them. Takes the scorer lock for the duration of the update.
/// (The `match event {` opener, arm/ match closers and the catch-all arm are elided in this
/// listing.)
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event
	let mut score = scorer.lock();
		// A hop on the path failed with a known failing channel: penalize that channel.
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_failed(&path, *scid);
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		// Full-path success: reward every hop on the path.
		Event::PaymentPathSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_successful(&path);
		Event::ProbeSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_failed(&path, *scid);
// The shared run loop used by both the threaded (`BackgroundProcessor::start`) and async
// (`process_events_async`) drivers. The caller supplies, as macro arguments, how to process
// events, how to check for loop exit, how to wait for updates, and how to create/check timers,
// so the same loop body serves both blocking and async execution.
// NOTE(review): the `=> { ... }` expansion braces, the `loop {` opener, `break`s and several
// closing braces are elided in this listing.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();
		// Per-task countdown timers; each is reset after its action fires.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;
			$process_channel_manager_events;
			$process_chain_monitor_events;
			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			$peer_manager.process_events();
			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);
			// The ChannelManager signalled it needs persisting: write it out now.
			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					#[cfg(feature = "std")] {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking();
					#[cfg(not(feature = "std"))] {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					// Graph persistence failure is logged but non-fatal: we keep running.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
				last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// Scorer persistence failure is likewise logged but non-fatal.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;
		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
#[cfg(feature = "futures")]
pub(crate) mod futures_util {
	use core::future::Future;
	use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
	use core::marker::Unpin;
	/// Races future `a` (unit-returning) against future `b` (bool-returning); used by
	/// `process_events_async` to race the ChannelManager persistence notification against the
	/// user-supplied sleep. (Field declarations are elided in this listing.)
	pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
	/// Which of the two raced futures completed first. (Variant declarations are elided in
	/// this listing.)
	pub(crate) enum SelectorOutput {
	impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
		type Output = SelectorOutput;
		// Polls `a` first, then `b`; whichever is ready first decides the output. (The
		// `Poll::Pending` fall-through arms and final `Poll::Pending` are elided here.)
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
			match Pin::new(&mut self.a).poll(ctx) {
				Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
			match Pin::new(&mut self.b).poll(ctx) {
				Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
	// If we want to poll a future without an async context to figure out if it has completed or
	// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
	// but sadly there's a good bit of boilerplate here.
	fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
	fn dummy_waker_action(_: *const ()) { }
	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
	// The returned Waker does nothing on wake; it exists only so callers can build a
	// `task::Context` for one-off, non-async polling.
	pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
424 #[cfg(feature = "futures")]
425 use futures_util::{Selector, SelectorOutput, dummy_waker};
426 #[cfg(feature = "futures")]
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), lightning::io::Error>
where
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// The run loop's exit check reads this flag, so it MUST start out `false` — initializing
	// it to `true` (as the previous revision did) makes the loop exit on its very first
	// iteration, turning the background processor into a single persist-and-quit pass. The
	// only writer is the `SelectorOutput::B(exit)` arm below, which copies the user-supplied
	// `sleeper` future's exit signal into it.
	let mut should_break = false;
	// Wrap the user's async event handler so that, for every event, the network graph and the
	// scorer are updated before the user's handler runs.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				update_scorer(scorer, &event);
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Race the ChannelManager's "needs persistence" notification against a 100ms
			// user-provided sleep; the sleep also doubles as the exit-signal channel.
			let fut = Selector {
				a: channel_manager.get_persistable_update_future(),
				b: sleeper(Duration::from_millis(100)),
			};
			match fut.await {
				SelectorOutput::A => true,
				SelectorOutput::B(exit) => {
					should_break = exit;
					false
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// Poll the timer future exactly once with a no-op waker to check whether it has
			// elapsed, without needing an async context.
			let mut waker = dummy_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
		})
}
#[cfg(feature = "std")]
impl BackgroundProcessor {
	// NOTE(review): the `pub fn start<...>(` opener, `) -> Self where` lines, several doc
	// lines and closing braces of this impl are elided in this listing.
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	/// # Data Persistence
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	/// # Rapid Gossip Sync
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
		UL: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for <'b> WriteableScore<'b>,
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
		// Shared flag: the spawned thread polls it each loop iteration; `stop`/`Drop` set it.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Decorate the user's handler: apply network-graph and scorer updates per event
			// before handing the event to the user.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				if let Some(ref scorer) = scorer {
					update_scorer(scorer, &event);
				event_handler.handle_event(event);
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	// Signals the thread to exit via the shared flag, then joins it (tail elided here).
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
	// Joins the background thread if it has not been joined yet, propagating its result.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
		// Stop and join the background thread on drop; unwraps, so a persistence error or a
		// panicked thread will panic here. (The `fn drop(&mut self)` line is elided in this
		// listing.)
		self.stop_and_join_thread().unwrap();
700 #[cfg(all(feature = "std", test))]
702 use bitcoin::blockdata::block::BlockHeader;
703 use bitcoin::blockdata::constants::genesis_block;
704 use bitcoin::blockdata::locktime::PackedLockTime;
705 use bitcoin::blockdata::transaction::{Transaction, TxOut};
706 use bitcoin::network::constants::Network;
707 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
708 use lightning::chain::{BestBlock, Confirm, chainmonitor};
709 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
710 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
711 use lightning::chain::transaction::OutPoint;
712 use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
713 use lightning::{get_event_msg, get_event};
714 use lightning::ln::PaymentHash;
715 use lightning::ln::channelmanager;
716 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
717 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
718 use lightning::ln::msgs::{ChannelMessageHandler, Init};
719 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
720 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
721 use lightning::routing::router::{DefaultRouter, RouteHop};
722 use lightning::routing::scoring::{ChannelUsage, Score};
723 use lightning::util::config::UserConfig;
724 use lightning::util::ser::Writeable;
725 use lightning::util::test_utils;
726 use lightning::util::persist::KVStorePersister;
727 use lightning_persister::FilesystemPersister;
728 use std::collections::VecDeque;
730 use std::path::PathBuf;
731 use std::sync::{Arc, Mutex};
732 use std::sync::mpsc::SyncSender;
733 use std::time::Duration;
734 use bitcoin::hashes::Hash;
735 use bitcoin::TxMerkleNode;
736 use lightning_rapid_gossip_sync::RapidGossipSync;
737 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
739 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
	// No-op socket descriptor used only to satisfy PeerManager's type parameters in tests;
	// it never sends data and its disconnect is a no-op.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
		fn disconnect_socket(&mut self) {}
	// Concrete aliases instantiating the generic lightning types with the test doubles from
	// `lightning::util::test_utils`, so test code doesn't repeat the long generic lists.
	type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
		// Fields of the per-test node harness bundling a ChannelManager with its supporting
		// components. (The `struct Node {` header and the `impl Node {` / `impl Drop` openers
		// are elided in this listing.)
		node: Arc<ChannelManager>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<TestScorer>>,
		// Wraps this node's P2P gossip sync in the GossipSync enum.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		// Wraps this node's rapid gossip sync in the GossipSync enum.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		// Returns the GossipSync::None variant (body elided in this listing).
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
		// Drop fragment: best-effort removal of this node's on-disk test data directory,
		// logging (not panicking) on failure.
		let data_dir = self.persister.get_data_dir();
		match fs::remove_dir_all(data_dir.clone()) {
			Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
		// Test persister wrapping FilesystemPersister with injectable failures so tests can
		// exercise the background processor's error paths. (The `struct Persister {` header
		// and `impl Persister {` opener are elided in this listing.)
		// When set, persisting the network graph fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, notified every time the network graph is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, persisting the ChannelManager fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, persisting the scorer fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
		// Constructs a Persister with no injected failures, persisting under `data_dir`.
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir);
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
		// Builder-style setters injecting each failure mode.
		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }
		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }
		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }
		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
impl KVStorePersister for Persister {
	// Intercepts the "manager", "network_graph" and "scorer" keys to inject the
	// configured errors/notifications, then delegates to the real filesystem
	// persister.
	fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
		if key == "manager" {
			if let Some((error, message)) = self.manager_error {
				return Err(std::io::Error::new(error, message))

		if key == "network_graph" {
			if let Some(sender) = &self.graph_persistence_notifier {
				sender.send(()).unwrap();

			if let Some((error, message)) = self.graph_error {
				return Err(std::io::Error::new(error, message))

		// NOTE(review): presumably guarded by `key == "scorer"`; the guard is not
		// visible here — confirm against the full file.
		if let Some((error, message)) = self.scorer_error {
			return Err(std::io::Error::new(error, message))

		self.filesystem_persister.persist(key, object)
// FIFO queue of expected scorer callbacks; `None` means "no expectations set",
// in which case callbacks are ignored.
event_expectations: Option<VecDeque<TestResult>>,

// One expected `Score` callback, matched in FIFO order.
PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
PaymentSuccess { path: Vec<RouteHop> },
ProbeFailure { path: Vec<RouteHop> },
ProbeSuccess { path: Vec<RouteHop> },
// New scorer with no expectations: scorer callbacks are then no-ops.
Self { event_expectations: None }

// Queue an expected scorer callback; consumed in FIFO order by the `Score`
// callbacks below.
fn expect(&mut self, expectation: TestResult) {
	self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
impl lightning::util::ser::Writeable for TestScorer {
	// Serializes to nothing — the persisted bytes are irrelevant to these tests,
	// only the fact that persistence was attempted matters.
	fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
impl Score for TestScorer {
	// Routing is never exercised by these tests, so computing a penalty here
	// would indicate a bug — panic via `unimplemented!`.
	fn channel_penalty_msat(
		&self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
	) -> u64 { unimplemented!(); }
// Pops the next expectation and asserts it is a `PaymentFailure` matching the
// reported path and short channel id; any other queued variant fails the test.
fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
	if let Some(expectations) = &mut self.event_expectations {
		match expectations.pop_front().unwrap() {
			TestResult::PaymentFailure { path, short_channel_id } => {
				assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
				assert_eq!(actual_short_channel_id, short_channel_id);
			TestResult::PaymentSuccess { path } => {
				panic!("Unexpected successful payment path: {:?}", path)
			TestResult::ProbeFailure { path } => {
				panic!("Unexpected probe failure: {:?}", path)
			TestResult::ProbeSuccess { path } => {
				panic!("Unexpected probe success: {:?}", path)
// Pops the next expectation and asserts it is a `PaymentSuccess` with a
// matching path; any other queued variant fails the test.
fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
	if let Some(expectations) = &mut self.event_expectations {
		match expectations.pop_front().unwrap() {
			TestResult::PaymentFailure { path, .. } => {
				panic!("Unexpected payment path failure: {:?}", path)
			TestResult::PaymentSuccess { path } => {
				assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
			TestResult::ProbeFailure { path } => {
				panic!("Unexpected probe failure: {:?}", path)
			TestResult::ProbeSuccess { path } => {
				panic!("Unexpected probe success: {:?}", path)
// Pops the next expectation and asserts it is a `ProbeFailure` with a matching
// path; any other queued variant fails the test.
fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
	if let Some(expectations) = &mut self.event_expectations {
		match expectations.pop_front().unwrap() {
			TestResult::PaymentFailure { path, .. } => {
				panic!("Unexpected payment path failure: {:?}", path)
			TestResult::PaymentSuccess { path } => {
				panic!("Unexpected payment path success: {:?}", path)
			TestResult::ProbeFailure { path } => {
				assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
			TestResult::ProbeSuccess { path } => {
				panic!("Unexpected probe success: {:?}", path)
// Pops the next expectation and asserts it is a `ProbeSuccess` with a matching
// path; any other queued variant fails the test.
fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
	if let Some(expectations) = &mut self.event_expectations {
		match expectations.pop_front().unwrap() {
			TestResult::PaymentFailure { path, .. } => {
				panic!("Unexpected payment path failure: {:?}", path)
			TestResult::PaymentSuccess { path } => {
				panic!("Unexpected payment path success: {:?}", path)
			TestResult::ProbeFailure { path } => {
				panic!("Unexpected probe failure: {:?}", path)
			TestResult::ProbeSuccess { path } => {
				assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
impl Drop for TestScorer {
	// If the thread is already unwinding from another panic, don't double-panic
	// here (that would abort and hide the original failure).
	if std::thread::panicking() {

	// Fail the test if queued expectations were never consumed.
	if let Some(event_expectations) = &self.event_expectations {
		if !event_expectations.is_empty() {
			panic!("Unsatisfied event expectations: {:?}", event_expectations);
/// Joins `filepath` (a directory) and `filename` into a single platform-native
/// path string, e.g. `("dir", "manager")` -> `"dir/manager"` on Unix.
///
/// Fix: the previous body never appended `filename` — `PathBuf::from(filepath)`
/// was converted straight back to a string, so the returned path pointed at the
/// directory itself and the `filename` argument was silently ignored.
fn get_full_filepath(filepath: String, filename: String) -> String {
	let mut path = PathBuf::from(filepath);
	path.push(filename);
	path.to_str().unwrap().to_string()
}
// Builds `num_nodes` fully-wired test `Node`s (channel manager, chain monitor,
// gossip syncs, peer manager, …) persisting under `{persist_dir}_persister_{i}`,
// then connects every pair of nodes as peers.
fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
	let mut nodes = Vec::new();
	for i in 0..num_nodes {
		let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
		let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
		let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
		let network = Network::Testnet;
		let genesis_block = genesis_block(network);
		let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
		let scorer = Arc::new(Mutex::new(TestScorer::new()));
		// Deterministic per-node seed so test runs are reproducible.
		let seed = [i as u8; 32];
		let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
		let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
		let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
		let now = Duration::from_secs(genesis_block.header.time as u64);
		let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
		let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
		let best_block = BestBlock::from_network(network);
		let params = ChainParameters { network, best_block };
		let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
		let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
		let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
		let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
		let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
		let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

	// Mark every pair of nodes as connected peers (in both directions).
	for i in 0..num_nodes {
		for j in (i+1)..num_nodes {
			nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
			nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
// Runs the full open-channel handshake from $node_a to $node_b, returning the
// funding transaction.
macro_rules! open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		begin_open_channel!($node_a, $node_b, $channel_value);
		let events = $node_a.node.get_and_clear_pending_events();
		assert_eq!(events.len(), 1);
		let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
		end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of channel opening: create_channel plus the open/accept message
// exchange, up to (but not including) funding generation.
macro_rules! begin_open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
		$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
		$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Consumes a `FundingGenerationReady` event, builds a matching funding
// transaction, and yields `(temporary_channel_id, tx)`.
macro_rules! handle_funding_generation_ready {
	($event: expr, $channel_value: expr) => {{
		Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
			assert_eq!(channel_value_satoshis, $channel_value);
			// 42 is the user_channel_id passed to create_channel in begin_open_channel!.
			assert_eq!(user_channel_id, 42);

			let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
				value: channel_value_satoshis, script_pubkey: output_script.clone(),
			(temporary_channel_id, tx)

		_ => panic!("Unexpected event"),
// Second half of channel opening: hand the funding tx to $node_a and drive the
// funding_created/funding_signed exchange; both sides see `ChannelPending`.
macro_rules! end_open_channel {
	($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
		$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
		$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
		get_event!($node_b, Event::ChannelPending);

		$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		get_event!($node_a, Event::ChannelPending);
// Connects `depth` blocks on top of `node`'s current best block, confirming
// `tx` in the first one, feeding both the ChannelManager and the ChainMonitor.
fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
	for i in 1..=depth {
		let prev_blockhash = node.best_block.block_hash();
		let height = node.best_block.height() + 1;
		// Dummy header: only hash-chaining and height matter for these tests.
		let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
		let txdata = vec![(0, tx)];
		node.best_block = BestBlock::new(header.block_hash(), height);
		// (arm of a `match i` whose header is elided here) i == 1: the block
		// containing the transaction.
		node.node.transactions_confirmed(&header, &txdata, height);
		node.chain_monitor.transactions_confirmed(&header, &txdata, height);

		// Final block: advance both components' best-block pointers.
		x if x == depth => {
			node.node.best_block_updated(&header, height);
			node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` with `ANTI_REORG_DELAY` confirmations (enough to be considered
// irrevocably confirmed by the monitor).
fn confirm_transaction(node: &mut Node, tx: &Transaction) {
	confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
fn test_background_processor() {
	// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
	// updates. Also test that when new updates are available, the manager signals that it needs
	// re-persistence and is successfully re-persisted.
	let nodes = create_nodes(2, "test_background_processor".to_string());

	// Go through the channel creation process so that each node has something to persist. Since
	// open_channel consumes events, it must complete before starting BackgroundProcessor to
	// avoid a race with processing events.
	let tx = open_channel!(nodes[0], nodes[1], 100000);

	// Initiate the background processors to watch each node.
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Polls until the bytes on disk at $filepath match $node's current
	// serialization (the background processor persists asynchronously).
	macro_rules! check_persisted_data {
		($node: expr, $filepath: expr) => {
			let mut expected_bytes = Vec::new();
			expected_bytes.clear();
			match $node.write(&mut expected_bytes) {
				match std::fs::read($filepath) {
					if bytes == expected_bytes {
				Err(e) => panic!("Unexpected error: {}", e)

	// Check that the initial channel manager data is persisted as expected.
	let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
	check_persisted_data!(nodes[0].node, filepath.clone());

	// Wait for the persistence-needed flag to clear.
	if !nodes[0].node.get_persistence_condvar_value() { break }

	// Force-close the channel.
	nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

	// Check that the force-close updates are persisted.
	check_persisted_data!(nodes[0].node, filepath.clone());
	if !nodes[0].node.get_persistence_condvar_value() { break }

	// Check network graph is persisted
	let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
	check_persisted_data!(nodes[0].network_graph, filepath.clone());

	// Check scorer is persisted
	let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
	check_persisted_data!(nodes[0].scorer, filepath.clone());

	assert!(bg_processor.stop().is_ok());
fn test_timer_tick_called() {
	// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
	// `FRESHNESS_TIMER`.
	let nodes = create_nodes(1, "test_timer_tick_called".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Busy-wait until both tick log lines have been emitted.
	let log_entries = nodes[0].logger.lines.lock().unwrap();
	let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
	let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
	if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
		log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {

	assert!(bg_processor.stop().is_ok());
fn test_channel_manager_persist_error() {
	// Test that if we encounter an error during manager persistence, the thread panics.
	let nodes = create_nodes(2, "test_persist_error".to_string());
	open_channel!(nodes[0], nodes[1], 100000);

	let data_dir = nodes[0].persister.get_data_dir();
	// Inject a failure specifically on the "manager" key.
	let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
	match bg_processor.join() {
		Ok(_) => panic!("Expected error persisting manager"),
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
fn test_network_graph_persist_error() {
	// Test that if we encounter an error during network graph persistence, an error gets returned.
	let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	// Inject a failure specifically on the "network_graph" key.
	let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	match bg_processor.stop() {
		Ok(_) => panic!("Expected error persisting network graph"),
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
fn test_scorer_persist_error() {
	// Test that if we encounter an error during scorer persistence, an error gets returned.
	let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	// Inject a failure specifically on the "scorer" key.
	let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	match bg_processor.stop() {
		Ok(_) => panic!("Expected error persisting scorer"),
			assert_eq!(e.kind(), std::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies the background processor forwards ChannelManager/ChainMonitor events
// (FundingGenerationReady, then SpendableOutputs after a force-close) to the
// user-supplied event handler.
fn test_background_event_handling() {
	let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
	let channel_value = 100000;
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir.clone()));

	// Set up a background event handler for FundingGenerationReady events.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| match event {
		Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
		Event::ChannelReady { .. } => {},
		_ => panic!("Unexpected event: {:?}", event),

	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Open a channel and check that the FundingGenerationReady event was handled.
	begin_open_channel!(nodes[0], nodes[1], channel_value);
	let (temporary_channel_id, funding_tx) = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("FundingGenerationReady not handled within deadline");
	end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

	// Confirm the funding transaction.
	confirm_transaction(&mut nodes[0], &funding_tx);
	let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
	confirm_transaction(&mut nodes[1], &funding_tx);
	let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
	nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
	let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
	let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

	assert!(bg_processor.stop().is_ok());

	// Set up a background event handler for SpendableOutputs events.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| match event {
		Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
		Event::ChannelReady { .. } => {},
		Event::ChannelClosed { .. } => {},
		_ => panic!("Unexpected event: {:?}", event),

	let persister = Arc::new(Persister::new(data_dir));
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Force close the channel and check that the SpendableOutputs event was handled.
	nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
	let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("Events not handled within deadline");
		Event::SpendableOutputs { .. } => {},
		_ => panic!("Unexpected event: {:?}", event),

	assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer (checked
// via the "Persisting scorer" log line).
fn test_scorer_persistence() {
	let nodes = create_nodes(2, "test_scorer_persistence".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Busy-wait for the scorer-persistence log line.
	let log_entries = nodes[0].logger.lines.lock().unwrap();
	let expected_log = "Persisting scorer".to_string();
	if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {

	assert!(bg_processor.stop().is_ok());
// Verifies the background processor does not prune the network graph before a
// rapid-gossip-sync completes, and does prune it afterwards.
fn test_not_pruning_network_graph_until_graph_sync_completion() {
	let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
	let network_graph = nodes[0].network_graph.clone();
	// Seed the graph with one (unannounced-details) channel so pruning has
	// something to remove.
	let features = ChannelFeatures::empty();
	network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
		.expect("Failed to update channel from partial announcement");
	let original_graph_description = network_graph.to_string();
	assert!(original_graph_description.contains("42: features: 0000, node_one:"));
	assert_eq!(network_graph.read_only().channels().len(), 1);

	let event_handler = |_: _| {};
	let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	let log_entries = nodes[0].logger.lines.lock().unwrap();
	let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
	if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
	// Wait until the loop has gone around at least twice.

	// Serialized rapid-gossip-sync snapshot (adds two channels when applied).
	let initialization_input = vec![
		76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
		79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
		0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
		187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
		157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
		88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
		204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
		181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
		110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
		76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
		226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
		0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
		0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,

	nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();

	// this should have added two channels
	assert_eq!(network_graph.read_only().channels().len(), 3);

	// Wait for the graph to be persisted after the sync completes.
		.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
		.expect("Network graph not pruned within deadline");

	background_processor.stop().unwrap();

	// all channels should now be pruned
	assert_eq!(network_graph.read_only().channels().len(), 0);
fn test_payment_path_scoring() {
	// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
	// that we update the scorer upon a payment path succeeding (note that the channel must be
	// public or else we won't score it).
	// Set up a background event handler for FundingGenerationReady events.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| match event {
		Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
		Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
		Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
		Event::ProbeFailed { .. } => sender.send(event).unwrap(),
		_ => panic!("Unexpected event: {:?}", event),

	let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
	let data_dir = nodes[0].persister.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Single-hop path over a fixed SCID so the scorer expectations can match it.
	let scored_scid = 4242;
	let secp_ctx = Secp256k1::new();
	let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
	let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

	let path = vec![RouteHop {
		node_features: NodeFeatures::empty(),
		short_channel_id: scored_scid,
		channel_features: ChannelFeatures::empty(),
		cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,

	// A non-permanent on-path failure should score as PaymentFailure.
	nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
	nodes[0].node.push_pending_event(Event::PaymentPathFailed {
		payment_hash: PaymentHash([42; 32]),
		payment_failed_permanently: false,
		failure: PathFailure::OnPath { network_update: None },
		short_channel_id: Some(scored_scid),

	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("PaymentPathFailed not handled within deadline");
		Event::PaymentPathFailed { .. } => {},
		_ => panic!("Unexpected event"),

	// Ensure we'll score payments that were explicitly failed back by the destination as
	// ProbeSuccess.
	nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
	nodes[0].node.push_pending_event(Event::PaymentPathFailed {
		payment_hash: PaymentHash([42; 32]),
		payment_failed_permanently: true,
		failure: PathFailure::OnPath { network_update: None },
		short_channel_id: None,

	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("PaymentPathFailed not handled within deadline");
		Event::PaymentPathFailed { .. } => {},
		_ => panic!("Unexpected event"),

	// A successful payment path should score as PaymentSuccess.
	nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
	nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
		payment_id: PaymentId([42; 32]),

	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("PaymentPathSuccessful not handled within deadline");
		Event::PaymentPathSuccessful { .. } => {},
		_ => panic!("Unexpected event"),

	// A successful probe should score as ProbeSuccess.
	nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
	nodes[0].node.push_pending_event(Event::ProbeSuccessful {
		payment_id: PaymentId([42; 32]),
		payment_hash: PaymentHash([42; 32]),

	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("ProbeSuccessful not handled within deadline");
		Event::ProbeSuccessful { .. } => {},
		_ => panic!("Unexpected event"),

	// A failed probe should score as ProbeFailure.
	nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
	nodes[0].node.push_pending_event(Event::ProbeFailed {
		payment_id: PaymentId([42; 32]),
		payment_hash: PaymentHash([42; 32]),
		short_channel_id: Some(scored_scid),

	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("ProbeFailure not handled within deadline");
		Event::ProbeFailed { .. } => {},
		_ => panic!("Unexpected event"),

	assert!(bg_processor.stop().is_ok());