1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{Score, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 #[cfg(feature = "std")]
42 use lightning::util::wakers::Sleeper;
43 use lightning_rapid_gossip_sync::RapidGossipSync;
46 use core::time::Duration;
48 #[cfg(feature = "std")]
50 #[cfg(feature = "std")]
51 use core::sync::atomic::{AtomicBool, Ordering};
52 #[cfg(feature = "std")]
53 use std::thread::{self, JoinHandle};
54 #[cfg(feature = "std")]
55 use std::time::Instant;
57 #[cfg(not(feature = "std"))]
60 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
61 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
62 /// responsibilities are:
63 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
64 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
65 /// writing it to disk/backups by invoking the callback given to it at startup.
66 /// [`ChannelManager`] persistence should be done in the background.
67 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
68 /// at the appropriate intervals.
69 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
70 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
72 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
73 /// upon as doing so may result in high latency.
77 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
78 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
79 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
80 /// unilateral chain closure fees are at risk.
82 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
83 /// [`Event`]: lightning::events::Event
84 #[cfg(feature = "std")]
85 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Flag shared with the spawned thread; set to `true` (Release) to ask the run
	// loop to exit (see `stop_and_join_thread`, loaded with Acquire in the loop).
	stop_thread: Arc<AtomicBool>,
	// Handle of the background thread; `None` once joined. The thread returns any
	// persistence error it hit, surfaced to the user via `join`/`stop`.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// NOTE(review): each duplicated constant below is presumably gated by `#[cfg(test)]`/
// `#[cfg(not(test))]` attributes not visible in this view — the second of each pair
// looks like a shortened value so tests run quickly. Confirm against the full file.
// Seconds between `ChannelManager::timer_tick_occurred` calls.
const FRESHNESS_TIMER: u64 = 60;
const FRESHNESS_TIMER: u64 = 1;
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Seconds between scorer persistence attempts in the run loop.
const SCORER_PERSIST_TIMER: u64 = 30;
const SCORER_PERSIST_TIMER: u64 = 1;
// Delay before the *first* network-graph prune after startup, so short-lived clients
// still prune at least once (see the cadence comment in the run-loop body).
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
119 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
121 P: Deref<Target = P2PGossipSync<G, U, L>>,
122 R: Deref<Target = RapidGossipSync<G, L>>,
123 G: Deref<Target = NetworkGraph<L>>,
127 where U::Target: UtxoLookup, L::Target: Logger {
128 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
130 /// Rapid gossip sync from a trusted server.
137 P: Deref<Target = P2PGossipSync<G, U, L>>,
138 R: Deref<Target = RapidGossipSync<G, L>>,
139 G: Deref<Target = NetworkGraph<L>>,
142 > GossipSync<P, R, G, U, L>
143 where U::Target: UtxoLookup, L::Target: Logger {
	/// Returns the [`NetworkGraph`] backing this gossip sync, if any.
	fn network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
	/// Like `network_graph`, but returns `None` for a rapid gossip sync whose initial
	/// download has not completed — the graph must not be pruned mid-sync.
	fn prunable_network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				// Only expose the graph for pruning once the initial RGS sync is done.
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
			GossipSync::None => None,
167 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
168 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
169 GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
171 U::Target: UtxoLookup,
174 /// Initializes a new [`GossipSync::P2P`] variant.
175 pub fn p2p(gossip_sync: P) -> Self {
176 GossipSync::P2P(gossip_sync)
180 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
181 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
183 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
186 &'a (dyn UtxoLookup + Send + Sync),
192 /// Initializes a new [`GossipSync::Rapid`] variant.
193 pub fn rapid(gossip_sync: R) -> Self {
194 GossipSync::Rapid(gossip_sync)
198 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
201 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
202 &RapidGossipSync<&'a NetworkGraph<L>, L>,
204 &'a (dyn UtxoLookup + Send + Sync),
210 /// Initializes a new [`GossipSync::None`] variant.
211 pub fn none() -> Self {
/// Applies the gossip `NetworkUpdate` carried by a payment-path-failure event to the
/// given network graph, if the event contains one; all other events are ignored.
fn handle_network_graph_update<L: Deref>(
	network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
	// Only `PaymentPathFailed` with an on-path failure that includes a
	// `network_update` is relevant here.
	if let Event::PaymentPathFailed {
		failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
		network_graph.handle_network_update(upd);
/// Feeds a payment/probe outcome event into the scorer so that future routing
/// decisions account for the observed channel liquidity.
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event
	let mut score = scorer.lock();
		// Payment failed at a specific hop: penalize that channel.
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_failed(&path, *scid);
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		// Whole path succeeded: reward every hop on it.
		Event::PaymentPathSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.payment_path_successful(&path);
		Event::ProbeSuccessful { path, .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_successful(&path);
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let path = path.iter().collect::<Vec<_>>();
			score.probe_failed(&path, *scid);
// Shared run-loop body used by both the `std` thread-based `BackgroundProcessor` and
// the `futures`-based `process_events_async`. Callers supply how to sleep and how to
// measure elapsed time via the `$await`, `$get_timer` and `$timer_elapsed` expressions,
// so the same loop works in both sync and async contexts.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();
		// Per-task countdown timers; each is reset whenever its action runs.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;
		$process_channel_manager_events;
		$process_chain_monitor_events;
		// Note that the PeerManager::process_events may block on ChannelManager's locks,
		// hence it comes last here. When the ChannelManager finishes whatever it's doing,
		// we want to ensure we get into `persist_manager` as quickly as we can, especially
		// without running the normal event processing above and handing events to users.
		// Specifically, on an *extremely* slow machine, we may see ChannelManager start
		// processing a message effectively at any point during this loop. In order to
		// minimize the time between such processing completing and persisting the updated
		// ChannelManager, we want to minimize methods blocking on a ChannelManager
		// generally, and as a fallback place such blocking only immediately before
		$peer_manager.process_events();
		// We wait up to 100ms, but track how long it takes to detect being put to sleep,
		// see `await_start`'s use below.
		let mut await_start = $get_timer(1);
		let updates_available = $await;
		let await_slow = $timer_elapsed(&mut await_start, 1);
		if updates_available {
			log_trace!($logger, "Persisting ChannelManager...");
			$persister.persist_manager(&*$channel_manager)?;
			log_trace!($logger, "Done persisting ChannelManager.");
		// Exit the loop if the background processor was requested to stop.
		if $loop_exit_check {
			log_trace!($logger, "Terminating background processor.");
		if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
			log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
			$channel_manager.timer_tick_occurred();
			last_freshness_call = $get_timer(FRESHNESS_TIMER);
		// On various platforms, we may be starved of CPU cycles for several reasons.
		// E.g. on iOS, if we've been in the background, we will be entirely paused.
		// Similarly, if we're on a desktop platform and the device has been asleep, we
		// may not get any cycles.
		// We detect this by checking if our max-100ms-sleep, above, ran longer than a
		// full second, at which point we assume sockets may have been killed (they
		// appear to be at least on some platforms, even if it has only been a second).
		// Note that we have to take care to not get here just because user event
		// processing was slow at the top of the loop. For example, the sample client
		// may call Bitcoin Core RPCs during event handling, which very often takes
		// more than a handful of seconds to complete, and shouldn't disconnect all our
			log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
			$peer_manager.disconnect_all_peers();
			last_ping_call = $get_timer(PING_TIMER);
		} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
			log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
			$peer_manager.timer_tick_occurred();
			last_ping_call = $get_timer(PING_TIMER);
		// Note that we want to run a graph prune once not long after startup before
		// falling back to our usual hourly prunes. This avoids short-lived clients never
		// pruning their network graph. We run once 60 seconds after startup before
		// continuing our normal cadence.
		if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
			// The network graph must not be pruned while rapid sync completion is pending
			if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
				#[cfg(feature = "std")] {
					log_trace!($logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking();
				#[cfg(not(feature = "std"))] {
					log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
					log_trace!($logger, "Persisting network graph.");
				// Graph persistence failures are logged rather than propagated (no `?`):
				// the graph can be re-synced, so a failed write shouldn't kill the loop.
				if let Err(e) = $persister.persist_graph(network_graph) {
					log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
			last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
		if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
			if let Some(ref scorer) = $scorer {
				log_trace!($logger, "Persisting scorer");
				// Likewise best-effort: scorer data is advisory, so only log on failure.
				if let Err(e) = $persister.persist_scorer(&scorer) {
					log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
			last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;
		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
387 #[cfg(feature = "futures")]
388 pub(crate) mod futures_util {
389 use core::future::Future;
390 use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
392 use core::marker::Unpin;
393 pub(crate) struct Selector<
394 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
400 pub(crate) enum SelectorOutput {
405 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
406 > Future for Selector<A, B, C> {
407 type Output = SelectorOutput;
408 fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
409 match Pin::new(&mut self.a).poll(ctx) {
410 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
413 match Pin::new(&mut self.b).poll(ctx) {
414 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
417 match Pin::new(&mut self.c).poll(ctx) {
418 Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
425 // If we want to poll a future without an async context to figure out if it has completed or
426 // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
427 // but sadly there's a good bit of boilerplate here.
// "Clone" just re-creates an identical no-op RawWaker over the same (null) data pointer.
fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
// Shared no-op filling the wake, wake_by_ref and drop vtable slots.
fn dummy_waker_action(_: *const ()) { }
const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
	dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
// SAFETY: every vtable entry above ignores the data pointer entirely (it is always null
// and never dereferenced), so the RawWaker/Waker contract is trivially upheld.
pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
435 #[cfg(feature = "futures")]
436 use futures_util::{Selector, SelectorOutput, dummy_waker};
437 #[cfg(feature = "futures")]
440 /// Processes background events in a future.
442 /// `sleeper` should return a future which completes in the given amount of time and returns a
443 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
444 /// future which outputs true, the loop will exit and this function's future will complete.
446 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
448 /// Requires the `futures` feature. Note that while this method is available without the `std`
449 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
450 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
451 /// manually instead.
452 #[cfg(feature = "futures")]
453 pub async fn process_events_async<
455 UL: 'static + Deref + Send + Sync,
456 CF: 'static + Deref + Send + Sync,
457 CW: 'static + Deref + Send + Sync,
458 T: 'static + Deref + Send + Sync,
459 ES: 'static + Deref + Send + Sync,
460 NS: 'static + Deref + Send + Sync,
461 SP: 'static + Deref + Send + Sync,
462 F: 'static + Deref + Send + Sync,
463 R: 'static + Deref + Send + Sync,
464 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
465 L: 'static + Deref + Send + Sync,
466 P: 'static + Deref + Send + Sync,
467 Descriptor: 'static + SocketDescriptor + Send + Sync,
468 CMH: 'static + Deref + Send + Sync,
469 RMH: 'static + Deref + Send + Sync,
470 OMH: 'static + Deref + Send + Sync,
471 EventHandlerFuture: core::future::Future<Output = ()>,
472 EventHandler: Fn(Event) -> EventHandlerFuture,
473 PS: 'static + Deref + Send,
474 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
475 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
476 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
477 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
478 UMH: 'static + Deref + Send + Sync,
479 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
480 S: 'static + Deref<Target = SC> + Send + Sync,
481 SC: for<'b> WriteableScore<'b>,
482 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
483 Sleeper: Fn(Duration) -> SleepFuture
485 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
486 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
488 ) -> Result<(), lightning::io::Error>
490 UL::Target: 'static + UtxoLookup,
491 CF::Target: 'static + chain::Filter,
492 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
493 T::Target: 'static + BroadcasterInterface,
494 ES::Target: 'static + EntropySource,
495 NS::Target: 'static + NodeSigner,
496 SP::Target: 'static + SignerProvider,
497 F::Target: 'static + FeeEstimator,
498 R::Target: 'static + Router,
499 L::Target: 'static + Logger,
500 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
501 CMH::Target: 'static + ChannelMessageHandler,
502 OMH::Target: 'static + OnionMessageHandler,
503 RMH::Target: 'static + RoutingMessageHandler,
504 UMH::Target: 'static + CustomMessageHandler,
505 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
507 let mut should_break = true;
508 let async_event_handler = |event| {
509 let network_graph = gossip_sync.network_graph();
510 let event_handler = &event_handler;
511 let scorer = &scorer;
513 if let Some(network_graph) = network_graph {
514 handle_network_graph_update(network_graph, &event)
516 if let Some(ref scorer) = scorer {
517 update_scorer(scorer, &event);
519 event_handler(event).await;
522 define_run_body!(persister,
523 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
524 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
525 gossip_sync, peer_manager, logger, scorer, should_break, {
527 a: channel_manager.get_persistable_update_future(),
528 b: chain_monitor.get_update_future(),
529 c: sleeper(Duration::from_millis(100)),
532 SelectorOutput::A => true,
533 SelectorOutput::B => false,
534 SelectorOutput::C(exit) => {
539 }, |t| sleeper(Duration::from_secs(t)),
540 |fut: &mut SleepFuture, _| {
541 let mut waker = dummy_waker();
542 let mut ctx = task::Context::from_waker(&mut waker);
543 core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
547 #[cfg(feature = "std")]
548 impl BackgroundProcessor {
549 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
552 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
553 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
554 /// either [`join`] or [`stop`].
556 /// # Data Persistence
558 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
559 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
560 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
561 /// provided implementation.
563 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
564 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
565 /// See the `lightning-persister` crate for LDK's provided implementation.
567 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
568 /// error or call [`join`] and handle any error that may arise. For the latter case,
569 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
573 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
574 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
575 /// functionality implemented by other handlers.
576 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
578 /// # Rapid Gossip Sync
580 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
581 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
582 /// until the [`RapidGossipSync`] instance completes its first sync.
584 /// [top-level documentation]: BackgroundProcessor
585 /// [`join`]: Self::join
586 /// [`stop`]: Self::stop
587 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
588 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
589 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
590 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
591 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
592 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
595 UL: 'static + Deref + Send + Sync,
596 CF: 'static + Deref + Send + Sync,
597 CW: 'static + Deref + Send + Sync,
598 T: 'static + Deref + Send + Sync,
599 ES: 'static + Deref + Send + Sync,
600 NS: 'static + Deref + Send + Sync,
601 SP: 'static + Deref + Send + Sync,
602 F: 'static + Deref + Send + Sync,
603 R: 'static + Deref + Send + Sync,
604 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
605 L: 'static + Deref + Send + Sync,
606 P: 'static + Deref + Send + Sync,
607 Descriptor: 'static + SocketDescriptor + Send + Sync,
608 CMH: 'static + Deref + Send + Sync,
609 OMH: 'static + Deref + Send + Sync,
610 RMH: 'static + Deref + Send + Sync,
611 EH: 'static + EventHandler + Send,
612 PS: 'static + Deref + Send,
613 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
614 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
615 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
616 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
617 UMH: 'static + Deref + Send + Sync,
618 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
619 S: 'static + Deref<Target = SC> + Send + Sync,
620 SC: for <'b> WriteableScore<'b>,
622 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
623 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
626 UL::Target: 'static + UtxoLookup,
627 CF::Target: 'static + chain::Filter,
628 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
629 T::Target: 'static + BroadcasterInterface,
630 ES::Target: 'static + EntropySource,
631 NS::Target: 'static + NodeSigner,
632 SP::Target: 'static + SignerProvider,
633 F::Target: 'static + FeeEstimator,
634 R::Target: 'static + Router,
635 L::Target: 'static + Logger,
636 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
637 CMH::Target: 'static + ChannelMessageHandler,
638 OMH::Target: 'static + OnionMessageHandler,
639 RMH::Target: 'static + RoutingMessageHandler,
640 UMH::Target: 'static + CustomMessageHandler,
641 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
643 let stop_thread = Arc::new(AtomicBool::new(false));
644 let stop_thread_clone = stop_thread.clone();
645 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
646 let event_handler = |event| {
647 let network_graph = gossip_sync.network_graph();
648 if let Some(network_graph) = network_graph {
649 handle_network_graph_update(network_graph, &event)
651 if let Some(ref scorer) = scorer {
652 update_scorer(scorer, &event);
654 event_handler.handle_event(event);
656 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
657 channel_manager, channel_manager.process_pending_events(&event_handler),
658 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
659 Sleeper::from_two_futures(
660 channel_manager.get_persistable_update_future(),
661 chain_monitor.get_update_future()
662 ).wait_timeout(Duration::from_millis(100)),
663 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
665 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
668 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
669 /// [`ChannelManager`].
673 /// This function panics if the background thread has panicked such as while persisting or
676 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
677 pub fn join(mut self) -> Result<(), std::io::Error> {
678 assert!(self.thread_handle.is_some());
682 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
683 /// [`ChannelManager`].
687 /// This function panics if the background thread has panicked such as while persisting or
690 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
691 pub fn stop(mut self) -> Result<(), std::io::Error> {
692 assert!(self.thread_handle.is_some());
693 self.stop_and_join_thread()
696 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
697 self.stop_thread.store(true, Ordering::Release);
701 fn join_thread(&mut self) -> Result<(), std::io::Error> {
702 match self.thread_handle.take() {
703 Some(handle) => handle.join().unwrap(),
709 #[cfg(feature = "std")]
710 impl Drop for BackgroundProcessor {
712 self.stop_and_join_thread().unwrap();
716 #[cfg(all(feature = "std", test))]
718 use bitcoin::blockdata::block::BlockHeader;
719 use bitcoin::blockdata::constants::genesis_block;
720 use bitcoin::blockdata::locktime::PackedLockTime;
721 use bitcoin::blockdata::transaction::{Transaction, TxOut};
722 use bitcoin::network::constants::Network;
723 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
724 use lightning::chain::{BestBlock, Confirm, chainmonitor};
725 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
726 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
727 use lightning::chain::transaction::OutPoint;
728 use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
729 use lightning::get_event_msg;
730 use lightning::ln::PaymentHash;
731 use lightning::ln::channelmanager;
732 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
733 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
734 use lightning::ln::msgs::{ChannelMessageHandler, Init};
735 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
736 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
737 use lightning::routing::router::{DefaultRouter, RouteHop};
738 use lightning::routing::scoring::{ChannelUsage, Score};
739 use lightning::util::config::UserConfig;
740 use lightning::util::ser::Writeable;
741 use lightning::util::test_utils;
742 use lightning::util::persist::KVStorePersister;
743 use lightning_persister::FilesystemPersister;
744 use std::collections::VecDeque;
746 use std::path::PathBuf;
747 use std::sync::{Arc, Mutex};
748 use std::sync::mpsc::SyncSender;
749 use std::time::Duration;
750 use bitcoin::hashes::Hash;
751 use bitcoin::TxMerkleNode;
752 use lightning_rapid_gossip_sync::RapidGossipSync;
753 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
755 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
757 #[derive(Clone, Hash, PartialEq, Eq)]
758 struct TestDescriptor{}
759 impl SocketDescriptor for TestDescriptor {
760 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
764 fn disconnect_socket(&mut self) {}
767 type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
769 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
771 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
772 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
775 node: Arc<ChannelManager>,
776 p2p_gossip_sync: PGS,
777 rapid_gossip_sync: RGS,
778 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
779 chain_monitor: Arc<ChainMonitor>,
780 persister: Arc<FilesystemPersister>,
781 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
782 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
783 logger: Arc<test_utils::TestLogger>,
784 best_block: BestBlock,
785 scorer: Arc<Mutex<TestScorer>>,
789 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
790 GossipSync::P2P(self.p2p_gossip_sync.clone())
793 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
794 GossipSync::Rapid(self.rapid_gossip_sync.clone())
797 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
804 let data_dir = self.persister.get_data_dir();
805 match fs::remove_dir_all(data_dir.clone()) {
806 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Fields of the test `Persister` wrapper: optional injected errors per persisted key
// ("network_graph", "manager", "scorer") plus an optional notifier fired on each
// network-graph persist; all real persistence delegates to `filesystem_persister`.
813 graph_error: Option<(std::io::ErrorKind, &'static str)>,
814 graph_persistence_notifier: Option<SyncSender<()>>,
815 manager_error: Option<(std::io::ErrorKind, &'static str)>,
816 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
817 filesystem_persister: FilesystemPersister,
// Constructor plus builder-style mutators for the failure-injecting Persister.
// `new` starts with no injected errors and no notifier.
821 fn new(data_dir: String) -> Self {
822 let filesystem_persister = FilesystemPersister::new(data_dir);
823 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
// Inject an error to be returned when the network graph is persisted.
826 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
827 Self { graph_error: Some((error, message)), ..self }
// Register a channel notified on every network-graph persist attempt.
830 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
831 Self { graph_persistence_notifier: Some(sender), ..self }
// Inject an error to be returned when the ChannelManager is persisted.
834 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
835 Self { manager_error: Some((error, message)), ..self }
// Inject an error to be returned when the scorer is persisted.
838 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
839 Self { scorer_error: Some((error, message)), ..self }
// KVStorePersister impl: matches on the persistence key, returns any injected error
// (and fires the graph notifier) before falling through to real filesystem persistence.
// NOTE(review): several closing braces / the "scorer" key check are missing from this
// pasted excerpt — the scorer_error arm at line 862 presumably sits under `if key == "scorer"`.
843 impl KVStorePersister for Persister {
844 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
845 if key == "manager" {
846 if let Some((error, message)) = self.manager_error {
847 return Err(std::io::Error::new(error, message))
851 if key == "network_graph" {
// Signal the test (if it registered a notifier) that a graph persist was attempted,
// regardless of whether an error is injected.
852 if let Some(sender) = &self.graph_persistence_notifier {
853 sender.send(()).unwrap();
856 if let Some((error, message)) = self.graph_error {
857 return Err(std::io::Error::new(error, message))
862 if let Some((error, message)) = self.scorer_error {
863 return Err(std::io::Error::new(error, message))
// No injected failure for this key: delegate to the real filesystem persister.
867 self.filesystem_persister.persist(key, object)
// TestScorer state: an optional FIFO of expected scoring callbacks. `None` means the
// test asserts nothing about scoring; once `expect` is called, every Score callback
// must match the queued expectations in order.
872 event_expectations: Option<VecDeque<TestResult>>,
// Variants of the expected-callback enum (its declaration header is not visible in
// this excerpt): one per Score trait callback.
877 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
878 PaymentSuccess { path: Vec<RouteHop> },
879 ProbeFailure { path: Vec<RouteHop> },
880 ProbeSuccess { path: Vec<RouteHop> },
// Constructor body: start with no expectations queued.
885 Self { event_expectations: None }
// Queue an expectation; lazily creates the deque on first use.
888 fn expect(&mut self, expectation: TestResult) {
889 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
// The scorer is persisted by the background processor but its contents are irrelevant
// to these tests, so serialization is a no-op.
893 impl lightning::util::ser::Writeable for TestScorer {
894 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
// Score impl: each callback pops the next queued expectation and asserts it matches the
// callback that actually fired (panicking on any mismatch). NOTE(review): closing braces
// of the match arms are missing from this pasted excerpt.
897 impl Score for TestScorer {
// Routing is never exercised in these tests, so penalties are never requested.
898 fn channel_penalty_msat(
899 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
900 ) -> u64 { unimplemented!(); }
// Expected only when the head of the queue is PaymentFailure with a matching path/scid.
902 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
903 if let Some(expectations) = &mut self.event_expectations {
904 match expectations.pop_front().unwrap() {
905 TestResult::PaymentFailure { path, short_channel_id } => {
906 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
907 assert_eq!(actual_short_channel_id, short_channel_id);
909 TestResult::PaymentSuccess { path } => {
910 panic!("Unexpected successful payment path: {:?}", path)
912 TestResult::ProbeFailure { path } => {
913 panic!("Unexpected probe failure: {:?}", path)
915 TestResult::ProbeSuccess { path } => {
916 panic!("Unexpected probe success: {:?}", path)
// Expected only when the head of the queue is PaymentSuccess with a matching path.
922 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
923 if let Some(expectations) = &mut self.event_expectations {
924 match expectations.pop_front().unwrap() {
925 TestResult::PaymentFailure { path, .. } => {
926 panic!("Unexpected payment path failure: {:?}", path)
928 TestResult::PaymentSuccess { path } => {
929 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
931 TestResult::ProbeFailure { path } => {
932 panic!("Unexpected probe failure: {:?}", path)
934 TestResult::ProbeSuccess { path } => {
935 panic!("Unexpected probe success: {:?}", path)
// Expected only when the head of the queue is ProbeFailure with a matching path.
941 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
942 if let Some(expectations) = &mut self.event_expectations {
943 match expectations.pop_front().unwrap() {
944 TestResult::PaymentFailure { path, .. } => {
945 panic!("Unexpected payment path failure: {:?}", path)
947 TestResult::PaymentSuccess { path } => {
948 panic!("Unexpected payment path success: {:?}", path)
950 TestResult::ProbeFailure { path } => {
951 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
953 TestResult::ProbeSuccess { path } => {
954 panic!("Unexpected probe success: {:?}", path)
// Expected only when the head of the queue is ProbeSuccess with a matching path.
959 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
960 if let Some(expectations) = &mut self.event_expectations {
961 match expectations.pop_front().unwrap() {
962 TestResult::PaymentFailure { path, .. } => {
963 panic!("Unexpected payment path failure: {:?}", path)
965 TestResult::PaymentSuccess { path } => {
966 panic!("Unexpected payment path success: {:?}", path)
968 TestResult::ProbeFailure { path } => {
969 panic!("Unexpected probe failure: {:?}", path)
971 TestResult::ProbeSuccess { path } => {
972 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
// On drop, fail the test if any queued expectations were never consumed — unless the
// thread is already panicking (avoid a double panic that would abort).
979 impl Drop for TestScorer {
981 if std::thread::panicking() {
985 if let Some(event_expectations) = &self.event_expectations {
986 if !event_expectations.is_empty() {
987 panic!("Unsatisfied event expectations: {:?}", event_expectations);
// Join a directory and filename into a full path string. NOTE(review): the
// `path.push(filename)` line (original ~995) is missing from this pasted excerpt;
// without it `filename` would be unused — confirm against the full file.
993 fn get_full_filepath(filepath: String, filename: String) -> String {
994 let mut path = PathBuf::from(filepath);
996 path.to_str().unwrap().to_string()
// Build `num_nodes` fully-wired test Nodes (broadcaster, fee estimator, logger, graph,
// scorer, router, chain monitor, channel manager, both gossip syncs, peer manager),
// each persisting under "<persist_dir>_persister_<i>", then connect every pair of
// nodes as peers. NOTE(review): the `nodes.push(node)` / return lines are missing from
// this pasted excerpt.
999 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
1000 let mut nodes = Vec::new();
1001 for i in 0..num_nodes {
1002 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
1003 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
1004 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1005 let network = Network::Testnet;
1006 let genesis_block = genesis_block(network);
1007 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1008 let scorer = Arc::new(Mutex::new(TestScorer::new()));
// The node index doubles as the key seed so each node gets a distinct, deterministic identity.
1009 let seed = [i as u8; 32];
1010 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
1011 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
1012 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
1013 let now = Duration::from_secs(genesis_block.header.time as u64);
1014 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1015 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1016 let best_block = BestBlock::from_network(network);
1017 let params = ChainParameters { network, best_block };
1018 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
1019 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1020 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1021 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
1022 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
1023 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Mark every pair of nodes as connected peers (both directions; one side inbound, one outbound).
1027 for i in 0..num_nodes {
1028 for j in (i+1)..num_nodes {
1029 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
1030 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
// Drive a full channel open between two nodes: negotiate, consume the
// FundingGenerationReady event, then complete funding. Evaluates to the funding tx
// (returned by the missing tail of this macro in the original file).
1037 macro_rules! open_channel {
1038 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1039 begin_open_channel!($node_a, $node_b, $channel_value);
1040 let events = $node_a.node.get_and_clear_pending_events();
1041 assert_eq!(events.len(), 1);
1042 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1043 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// First half of the channel-open handshake: create_channel on A, then exchange
// open_channel / accept_channel messages (user_channel_id 42, push_msat 100).
1048 macro_rules! begin_open_channel {
1049 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1050 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
1051 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1052 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Turn a FundingGenerationReady event into a dummy funding transaction paying the
// requested script, asserting the event's value and user_channel_id. Yields
// (temporary_channel_id, tx). NOTE(review): the `match $event` line and the TxOut
// closing bracket are missing from this pasted excerpt.
1056 macro_rules! handle_funding_generation_ready {
1057 ($event: expr, $channel_value: expr) => {{
1059 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1060 assert_eq!(channel_value_satoshis, $channel_value);
1061 assert_eq!(user_channel_id, 42);
1063 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1064 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1066 (temporary_channel_id, tx)
1068 _ => panic!("Unexpected event"),
// Second half of the channel open: hand A's funding tx to the manager and exchange
// funding_created / funding_signed messages.
1073 macro_rules! end_open_channel {
1074 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
1075 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
1076 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1077 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Mine `depth` fake blocks on top of the node's best block; the tx is reported as
// confirmed in the first block, and only the final block triggers best_block_updated.
// NOTE(review): the `match i` dispatch between the i==1 and i==depth arms is partially
// missing from this pasted excerpt.
1081 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1082 for i in 1..=depth {
1083 let prev_blockhash = node.best_block.block_hash();
1084 let height = node.best_block.height() + 1;
// Header fields are dummies; `time` is set to the height just to keep headers distinct.
1085 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1086 let txdata = vec![(0, tx)];
1087 node.best_block = BestBlock::new(header.block_hash(), height);
1090 node.node.transactions_confirmed(&header, &txdata, height);
1091 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1093 x if x == depth => {
1094 node.node.best_block_updated(&header, height);
1095 node.chain_monitor.best_block_updated(&header, height);
// Confirm a transaction to the standard reorg-safe depth.
1101 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1102 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
// End-to-end persistence test: the background processor must persist the manager on
// creation and on updates, and also persist the network graph and scorer.
// NOTE(review): the retry loops around check_persisted_data! and the match arms inside
// it are partially missing from this pasted excerpt.
1106 fn test_background_processor() {
1107 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1108 // updates. Also test that when new updates are available, the manager signals that it needs
1109 // re-persistence and is successfully re-persisted.
1110 let nodes = create_nodes(2, "test_background_processor".to_string());
1112 // Go through the channel creation process so that each node has something to persist. Since
1113 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1114 // avoid a race with processing events.
1115 let tx = open_channel!(nodes[0], nodes[1], 100000);
1117 // Initiate the background processors to watch each node.
1118 let data_dir = nodes[0].persister.get_data_dir();
1119 let persister = Arc::new(Persister::new(data_dir));
1120 let event_handler = |_: _| {};
1121 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Polls until the bytes on disk match the serialization of $node.
1123 macro_rules! check_persisted_data {
1124 ($node: expr, $filepath: expr) => {
1125 let mut expected_bytes = Vec::new();
1127 expected_bytes.clear();
1128 match $node.write(&mut expected_bytes) {
1130 match std::fs::read($filepath) {
1132 if bytes == expected_bytes {
1141 Err(e) => panic!("Unexpected error: {}", e)
1147 // Check that the initial channel manager data is persisted as expected.
1148 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1149 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait until the manager no longer signals pending persistence.
1152 if !nodes[0].node.get_persistence_condvar_value() { break }
1155 // Force-close the channel.
1156 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1158 // Check that the force-close updates are persisted.
1159 check_persisted_data!(nodes[0].node, filepath.clone());
1161 if !nodes[0].node.get_persistence_condvar_value() { break }
1164 // Check network graph is persisted
1165 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1166 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1168 // Check scorer is persisted
1169 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1170 check_persisted_data!(nodes[0].scorer, filepath.clone());
1172 assert!(bg_processor.stop().is_ok());
// Verifies the processor's timer loop fires both the ChannelManager's and the
// PeerManager's timer_tick_occurred, by polling the test logger for the two log lines.
// NOTE(review): the enclosing `loop { ... }` around the log check is missing from this
// pasted excerpt.
1176 fn test_timer_tick_called() {
1177 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1178 // `FRESHNESS_TIMER`.
1179 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1180 let data_dir = nodes[0].persister.get_data_dir();
1181 let persister = Arc::new(Persister::new(data_dir));
1182 let event_handler = |_: _| {};
1183 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1185 let log_entries = nodes[0].logger.lines.lock().unwrap();
1186 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1187 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1188 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1189 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1194 assert!(bg_processor.stop().is_ok());
// An injected manager-persist failure must surface through join() as the original
// io::Error (kind Other, message "test").
1198 fn test_channel_manager_persist_error() {
1199 // Test that if we encounter an error during manager persistence, the thread panics.
1200 let nodes = create_nodes(2, "test_persist_error".to_string());
// A channel open forces at least one manager persist attempt.
1201 open_channel!(nodes[0], nodes[1], 100000);
1203 let data_dir = nodes[0].persister.get_data_dir();
1204 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1205 let event_handler = |_: _| {};
1206 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1207 match bg_processor.join() {
1208 Ok(_) => panic!("Expected error persisting manager"),
1210 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1211 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// An injected network-graph-persist failure must be returned by stop() (graph persist
// failures do not tear the processor down mid-run).
1217 fn test_network_graph_persist_error() {
1218 // Test that if we encounter an error during network graph persistence, an error gets returned.
1219 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1220 let data_dir = nodes[0].persister.get_data_dir();
1221 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1222 let event_handler = |_: _| {};
// P2P gossip sync is required so the processor actually persists the graph.
1223 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1225 match bg_processor.stop() {
1226 Ok(_) => panic!("Expected error persisting network graph"),
1228 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1229 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// An injected scorer-persist failure must be returned by stop(), mirroring the
// network-graph error test above.
1235 fn test_scorer_persist_error() {
1236 // Test that if we encounter an error during scorer persistence, an error gets returned.
1237 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1238 let data_dir = nodes[0].persister.get_data_dir();
1239 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1240 let event_handler = |_: _| {};
1241 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1243 match bg_processor.stop() {
1244 Ok(_) => panic!("Expected error persisting scorer"),
1246 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1247 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies the background processor drives the user event handler: first for
// FundingGenerationReady during a channel open, then (with a fresh processor) for
// SpendableOutputs after a force close matures.
1253 fn test_background_event_handling() {
1254 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1255 let channel_value = 100000;
1256 let data_dir = nodes[0].persister.get_data_dir();
1257 let persister = Arc::new(Persister::new(data_dir.clone()));
1259 // Set up a background event handler for FundingGenerationReady events.
1260 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1261 let event_handler = move |event: Event| match event {
1262 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1263 Event::ChannelReady { .. } => {},
1264 _ => panic!("Unexpected event: {:?}", event),
1267 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1269 // Open a channel and check that the FundingGenerationReady event was handled.
1270 begin_open_channel!(nodes[0], nodes[1], channel_value);
1271 let (temporary_channel_id, funding_tx) = receiver
1272 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1273 .expect("FundingGenerationReady not handled within deadline");
1274 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1276 // Confirm the funding transaction.
1277 confirm_transaction(&mut nodes[0], &funding_tx);
1278 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1279 confirm_transaction(&mut nodes[1], &funding_tx);
1280 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1281 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1282 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1283 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1284 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before starting the second against the same node.
1286 assert!(bg_processor.stop().is_ok());
1288 // Set up a background event handler for SpendableOutputs events.
1289 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1290 let event_handler = move |event: Event| match event {
1291 Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1292 Event::ChannelReady { .. } => {},
1293 Event::ChannelClosed { .. } => {},
1294 _ => panic!("Unexpected event: {:?}", event),
1296 let persister = Arc::new(Persister::new(data_dir));
1297 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1299 // Force close the channel and check that the SpendableOutputs event was handled.
1300 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1301 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Confirm past the to_self delay so the output becomes spendable.
1302 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1304 let event = receiver
1305 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1306 .expect("Events not handled within deadline");
1308 Event::SpendableOutputs { .. } => {},
1309 _ => panic!("Unexpected event: {:?}", event),
1312 assert!(bg_processor.stop().is_ok());
// The processor must periodically persist the scorer; detected via the "Persisting
// scorer" log line. NOTE(review): the enclosing polling `loop` is missing from this
// pasted excerpt.
1316 fn test_scorer_persistence() {
1317 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1318 let data_dir = nodes[0].persister.get_data_dir();
1319 let persister = Arc::new(Persister::new(data_dir));
1320 let event_handler = |_: _| {};
1321 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1324 let log_entries = nodes[0].logger.lines.lock().unwrap();
1325 let expected_log = "Persisting scorer".to_string();
1326 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1331 assert!(bg_processor.stop().is_ok());
// The processor must not prune the network graph until rapid gossip sync completes;
// once RGS finishes, the (stale) channels are pruned away. The persister's notifier is
// used as the "graph was persisted post-sync" signal.
1335 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1336 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1337 let data_dir = nodes[0].persister.get_data_dir();
1338 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1339 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1340 let network_graph = nodes[0].network_graph.clone();
1341 let features = ChannelFeatures::empty();
// Seed the graph with one stale channel that pruning would otherwise remove immediately.
1342 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1343 .expect("Failed to update channel from partial announcement");
1344 let original_graph_description = network_graph.to_string();
1345 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1346 assert_eq!(network_graph.read_only().channels().len(), 1);
1348 let event_handler = |_: _| {};
1349 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Use the timer-tick log line as a loop counter to ensure the processor has cycled
// (and thus had a chance to prune) before the sync completes.
1352 let log_entries = nodes[0].logger.lines.lock().unwrap();
1353 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1354 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1357 // Wait until the loop has gone around at least twice.
// Canned RGS payload; presumably adds two channels plus the seeded one (see assert below).
1362 let initialization_input = vec![
1363 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1364 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1365 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1366 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1367 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1368 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1369 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1370 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1371 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1372 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1373 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1374 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1375 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1377 nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1379 // this should have added two channels
1380 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait for the graph-persisted notification, which only fires after a prune+persist cycle.
1383 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1384 .expect("Network graph not pruned within deadline");
1386 background_processor.stop().unwrap();
1388 // all channels should now be pruned
1389 assert_eq!(network_graph.read_only().channels().len(), 0);
// For each payment/probe event pushed into the manager, the background processor must
// invoke the corresponding Score callback; TestScorer's queued expectations enforce
// this, and the mpsc channel confirms the user event handler also ran.
// NOTE(review): several event-struct fields and `match event` headers are missing from
// this pasted excerpt.
1393 fn test_payment_path_scoring() {
1394 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1395 // that we update the scorer upon a payment path succeeding (note that the channel must be
1396 // public or else we won't score it).
1397 // Set up a background event handler for FundingGenerationReady events.
1398 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1399 let event_handler = move |event: Event| match event {
1400 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1401 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1402 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1403 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1404 _ => panic!("Unexpected event: {:?}", event),
1407 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1408 let data_dir = nodes[0].persister.get_data_dir();
1409 let persister = Arc::new(Persister::new(data_dir));
1410 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// A synthetic single-hop path over an arbitrary scid/pubkey — no real channel exists.
1412 let scored_scid = 4242;
1413 let secp_ctx = Secp256k1::new();
1414 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1415 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1417 let path = vec![RouteHop {
1419 node_features: NodeFeatures::empty(),
1420 short_channel_id: scored_scid,
1421 channel_features: ChannelFeatures::empty(),
1423 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
// Case 1: transient path failure -> scorer sees payment_path_failed with the scid.
1426 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1427 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1429 payment_hash: PaymentHash([42; 32]),
1430 payment_failed_permanently: false,
1431 failure: PathFailure::OnPath { network_update: None },
1433 short_channel_id: Some(scored_scid),
1435 let event = receiver
1436 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1437 .expect("PaymentPathFailed not handled within deadline");
1439 Event::PaymentPathFailed { .. } => {},
1440 _ => panic!("Unexpected event"),
1443 // Ensure we'll score payments that were explicitly failed back by the destination as
// Case 2: permanent failure at the destination is scored as a probe success.
1445 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1446 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1448 payment_hash: PaymentHash([42; 32]),
1449 payment_failed_permanently: true,
1450 failure: PathFailure::OnPath { network_update: None },
1452 short_channel_id: None,
1454 let event = receiver
1455 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1456 .expect("PaymentPathFailed not handled within deadline");
1458 Event::PaymentPathFailed { .. } => {},
1459 _ => panic!("Unexpected event"),
// Case 3: successful payment path.
1462 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1463 nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1464 payment_id: PaymentId([42; 32]),
1468 let event = receiver
1469 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1470 .expect("PaymentPathSuccessful not handled within deadline");
1472 Event::PaymentPathSuccessful { .. } => {},
1473 _ => panic!("Unexpected event"),
// Case 4: successful probe.
1476 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1477 nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1478 payment_id: PaymentId([42; 32]),
1479 payment_hash: PaymentHash([42; 32]),
1482 let event = receiver
1483 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1484 .expect("ProbeSuccessful not handled within deadline");
1486 Event::ProbeSuccessful { .. } => {},
1487 _ => panic!("Unexpected event"),
// Case 5: failed probe.
1490 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1491 nodes[0].node.push_pending_event(Event::ProbeFailed {
1492 payment_id: PaymentId([42; 32]),
1493 payment_hash: PaymentHash([42; 32]),
1495 short_channel_id: Some(scored_scid),
1497 let event = receiver
1498 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1499 .expect("ProbeFailure not handled within deadline");
1501 Event::ProbeFailed { .. } => {},
1502 _ => panic!("Unexpected event"),
1505 assert!(bg_processor.stop().is_ok());