1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{Score, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 #[cfg(feature = "std")]
42 use lightning::util::wakers::Sleeper;
43 use lightning_rapid_gossip_sync::RapidGossipSync;
46 use core::time::Duration;
48 #[cfg(feature = "std")]
50 #[cfg(feature = "std")]
51 use core::sync::atomic::{AtomicBool, Ordering};
52 #[cfg(feature = "std")]
53 use std::thread::{self, JoinHandle};
54 #[cfg(feature = "std")]
55 use std::time::Instant;
57 #[cfg(not(feature = "std"))]
60 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
61 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
62 /// responsibilities are:
63 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
64 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
65 /// writing it to disk/backups by invoking the callback given to it at startup.
66 /// [`ChannelManager`] persistence should be done in the background.
67 /// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
68 /// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
69 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
70 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
72 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
73 /// upon as doing so may result in high latency.
77 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
78 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
79 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
80 /// unilateral chain closure fees are at risk.
82 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
83 /// [`Event`]: lightning::events::Event
84 #[cfg(feature = "std")]
85 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
86 pub struct BackgroundProcessor {
87 stop_thread: Arc<AtomicBool>,
88 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
92 const FRESHNESS_TIMER: u64 = 60;
94 const FRESHNESS_TIMER: u64 = 1;
96 #[cfg(all(not(test), not(debug_assertions)))]
97 const PING_TIMER: u64 = 10;
98 /// Signature operations take a lot longer without compiler optimisations.
99 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
100 /// timeout is reached.
101 #[cfg(all(not(test), debug_assertions))]
102 const PING_TIMER: u64 = 30;
104 const PING_TIMER: u64 = 1;
106 /// Prune the network graph of stale entries hourly.
107 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
110 const SCORER_PERSIST_TIMER: u64 = 60 * 60;
112 const SCORER_PERSIST_TIMER: u64 = 1;
115 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
117 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
120 const REBROADCAST_TIMER: u64 = 30;
122 const REBROADCAST_TIMER: u64 = 1;
124 #[cfg(feature = "futures")]
/// Returns the smaller of `a` and `b`.
///
/// `core::cmp::min` is not currently const, so this trivial (and equivalent) replacement is
/// defined for use where a constant expression is required.
const fn min_u64(a: u64, b: u64) -> u64 {
	match a < b {
		true => a,
		false => b,
	}
}
#[cfg(feature = "futures")]
/// The shortest of the freshness, ping, scorer-persist, first-network-prune and rebroadcast
/// timer intervals, in seconds.
const FASTEST_TIMER: u64 = min_u64(
	min_u64(FRESHNESS_TIMER, min_u64(PING_TIMER, SCORER_PERSIST_TIMER)),
	min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER),
);
131 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
133 P: Deref<Target = P2PGossipSync<G, U, L>>,
134 R: Deref<Target = RapidGossipSync<G, L>>,
135 G: Deref<Target = NetworkGraph<L>>,
139 where U::Target: UtxoLookup, L::Target: Logger {
140 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
142 /// Rapid gossip sync from a trusted server.
149 P: Deref<Target = P2PGossipSync<G, U, L>>,
150 R: Deref<Target = RapidGossipSync<G, L>>,
151 G: Deref<Target = NetworkGraph<L>>,
154 > GossipSync<P, R, G, U, L>
155 where U::Target: UtxoLookup, L::Target: Logger {
/// Returns the network graph of the wrapped gossip sync.
///
/// Both the [`GossipSync::P2P`] and [`GossipSync::Rapid`] variants yield their graph
/// unconditionally; [`GossipSync::None`] yields `None`.
156 fn network_graph(&self) -> Option<&G> {
158 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
159 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
160 GossipSync::None => None,
/// Returns the network graph only when it may currently be pruned.
///
/// A [`GossipSync::P2P`] graph is always returned. A [`GossipSync::Rapid`] graph is returned
/// once `is_initial_sync_complete()` reports `true`.
/// NOTE(review): the branch taken before the initial rapid sync completes is not visible
/// here -- presumably it yields `None`; confirm.
/// [`GossipSync::None`] yields `None`.
164 fn prunable_network_graph(&self) -> Option<&G> {
166 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
167 GossipSync::Rapid(gossip_sync) => {
// Only hand out the graph once the rapid sync reports its initial sync as complete.
168 if gossip_sync.is_initial_sync_complete() {
169 Some(gossip_sync.network_graph())
174 GossipSync::None => None,
179 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
180 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
181 GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
183 U::Target: UtxoLookup,
186 /// Initializes a new [`GossipSync::P2P`] variant.
187 pub fn p2p(gossip_sync: P) -> Self {
188 GossipSync::P2P(gossip_sync)
192 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
193 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
195 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
198 &'a (dyn UtxoLookup + Send + Sync),
204 /// Initializes a new [`GossipSync::Rapid`] variant.
205 pub fn rapid(gossip_sync: R) -> Self {
206 GossipSync::Rapid(gossip_sync)
210 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
213 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
214 &RapidGossipSync<&'a NetworkGraph<L>, L>,
216 &'a (dyn UtxoLookup + Send + Sync),
222 /// Initializes a new [`GossipSync::None`] variant.
223 pub fn none() -> Self {
/// Applies the network update carried by a payment-path failure event to `network_graph`.
///
/// The update is forwarded to [`NetworkGraph::handle_network_update`] when `event` is an
/// [`Event::PaymentPathFailed`] whose `failure` is [`PathFailure::OnPath`] with a
/// `network_update` present.
/// NOTE(review): no handling for other events is visible here -- presumably they are
/// ignored; confirm.
228 fn handle_network_graph_update<L: Deref>(
229 network_graph: &NetworkGraph<L>, event: &Event
230 ) where L::Target: Logger {
231 if let Event::PaymentPathFailed {
232 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
234 network_graph.handle_network_update(upd);
238 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
240 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
241 scorer: &'a S, event: &Event
// Lock the scorer so the event can be applied to it.
243 let mut score = scorer.lock();
// A payment failed at a known channel (`short_channel_id` is present): record the failure
// against that channel. This arm is listed before the permanent-failure arm below, so it
// takes precedence when both patterns would match.
245 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
246 score.payment_path_failed(path, *scid);
248 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
249 // Reached if the destination explicitly failed it back. We treat this as a successful probe
250 // because the payment made it all the way to the destination with sufficient liquidity.
251 score.probe_successful(path);
// A payment succeeded along `path`: record the success.
253 Event::PaymentPathSuccessful { path, .. } => {
254 score.payment_path_successful(path);
// A probe succeeded along `path`: record it as such.
256 Event::ProbeSuccessful { path, .. } => {
257 score.probe_successful(path);
// A probe failed at a known channel: record the failure against that channel.
259 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
260 score.probe_failed(path, *scid);
267 macro_rules! define_run_body {
268 ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
269 $channel_manager: ident, $process_channel_manager_events: expr,
270 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
271 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
272 $check_slow_await: expr)
274 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
275 $channel_manager.timer_tick_occurred();
276 log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
277 $chain_monitor.rebroadcast_pending_claims();
279 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
280 let mut last_ping_call = $get_timer(PING_TIMER);
281 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
282 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
283 let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
284 let mut have_pruned = false;
287 $process_channel_manager_events;
288 $process_chain_monitor_events;
290 // Note that the PeerManager::process_events may block on ChannelManager's locks,
291 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
292 // we want to ensure we get into `persist_manager` as quickly as we can, especially
293 // without running the normal event processing above and handing events to users.
295 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
296 // processing a message effectively at any point during this loop. In order to
297 // minimize the time between such processing completing and persisting the updated
298 // ChannelManager, we want to minimize methods blocking on a ChannelManager
299 // generally, and as a fallback place such blocking only immediately before
301 $peer_manager.process_events();
303 // Exit the loop if the background processor was requested to stop.
304 if $loop_exit_check {
305 log_trace!($logger, "Terminating background processor.");
309 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
310 // see `await_start`'s use below.
311 let mut await_start = None;
312 if $check_slow_await { await_start = Some($get_timer(1)); }
313 let updates_available = $await;
314 let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
316 // Exit the loop if the background processor was requested to stop.
317 if $loop_exit_check {
318 log_trace!($logger, "Terminating background processor.");
322 if updates_available {
323 log_trace!($logger, "Persisting ChannelManager...");
324 $persister.persist_manager(&*$channel_manager)?;
325 log_trace!($logger, "Done persisting ChannelManager.");
327 if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
328 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
329 $channel_manager.timer_tick_occurred();
330 last_freshness_call = $get_timer(FRESHNESS_TIMER);
333 // On various platforms, we may be starved of CPU cycles for several reasons.
334 // E.g. on iOS, if we've been in the background, we will be entirely paused.
335 // Similarly, if we're on a desktop platform and the device has been asleep, we
336 // may not get any cycles.
337 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
338 // full second, at which point we assume sockets may have been killed (they
339 // appear to be at least on some platforms, even if it has only been a second).
340 // Note that we have to take care to not get here just because user event
341 // processing was slow at the top of the loop. For example, the sample client
342 // may call Bitcoin Core RPCs during event handling, which very often takes
343 // more than a handful of seconds to complete, and shouldn't disconnect all our
345 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
346 $peer_manager.disconnect_all_peers();
347 last_ping_call = $get_timer(PING_TIMER);
348 } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
349 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
350 $peer_manager.timer_tick_occurred();
351 last_ping_call = $get_timer(PING_TIMER);
354 // Note that we want to run a graph prune once not long after startup before
355 // falling back to our usual hourly prunes. This avoids short-lived clients never
356 // pruning their network graph. We run once 60 seconds after startup before
357 // continuing our normal cadence. For RGS, since 60 seconds is likely too long,
358 // we prune after an initial sync completes.
359 let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
360 let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
361 let should_prune = match $gossip_sync {
362 GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
363 _ => prune_timer_elapsed,
366 // The network graph must not be pruned while rapid sync completion is pending
367 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
368 #[cfg(feature = "std")] {
369 log_trace!($logger, "Pruning and persisting network graph.");
370 network_graph.remove_stale_channels_and_tracking();
372 #[cfg(not(feature = "std"))] {
373 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
374 log_trace!($logger, "Persisting network graph.");
377 if let Err(e) = $persister.persist_graph(network_graph) {
378 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
383 let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
384 last_prune_call = $get_timer(prune_timer);
387 if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
388 if let Some(ref scorer) = $scorer {
389 log_trace!($logger, "Persisting scorer");
390 if let Err(e) = $persister.persist_scorer(&scorer) {
391 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
394 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
397 if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
398 log_trace!($logger, "Rebroadcasting monitor's pending claims");
399 $chain_monitor.rebroadcast_pending_claims();
400 last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
404 // After we exit, ensure we persist the ChannelManager one final time - this avoids
405 // some races where users quit while channel updates were in-flight, with
406 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
407 $persister.persist_manager(&*$channel_manager)?;
409 // Persist Scorer on exit
410 if let Some(ref scorer) = $scorer {
411 $persister.persist_scorer(&scorer)?;
414 // Persist NetworkGraph on exit
415 if let Some(network_graph) = $gossip_sync.network_graph() {
416 $persister.persist_graph(network_graph)?;
423 #[cfg(feature = "futures")]
424 pub(crate) mod futures_util {
425 use core::future::Future;
426 use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
428 use core::marker::Unpin;
429 pub(crate) struct Selector<
430 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
436 pub(crate) enum SelectorOutput {
441 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
442 > Future for Selector<A, B, C> {
443 type Output = SelectorOutput;
444 fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
445 match Pin::new(&mut self.a).poll(ctx) {
446 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
449 match Pin::new(&mut self.b).poll(ctx) {
450 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
453 match Pin::new(&mut self.c).poll(ctx) {
454 Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
461 // If we want to poll a future without an async context to figure out if it has completed or
462 // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
463 // but sadly there's a good bit of boilerplate here.
/// Builds the raw waker shared by [`dummy_waker`] and the vtable's clone slot: a null data
/// pointer paired with [`DUMMY_WAKER_VTABLE`].
fn dummy_raw_waker() -> RawWaker {
	RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
}

/// Clone slot of the dummy vtable: ignores the data pointer and returns a fresh dummy raw waker.
fn dummy_waker_clone(_: *const ()) -> RawWaker {
	dummy_raw_waker()
}

/// No-op shared by the wake, wake-by-ref and drop slots of the dummy vtable.
fn dummy_waker_action(_: *const ()) {}

/// Vtable for a waker that does nothing when woken or dropped.
const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
	dummy_waker_clone,
	dummy_waker_action,
	dummy_waker_action,
	dummy_waker_action,
);

/// Returns a [`Waker`] that does nothing, for polling a future outside of an async context.
pub(crate) fn dummy_waker() -> Waker {
	// SAFETY: every function in `DUMMY_WAKER_VTABLE` ignores the data pointer, so the null
	// pointer is never dereferenced and there is no resource to manage; the clone slot returns
	// a raw waker using the same vtable, and the no-op slots are trivially thread-safe.
	unsafe { Waker::from_raw(dummy_raw_waker()) }
}
471 #[cfg(feature = "futures")]
472 use futures_util::{Selector, SelectorOutput, dummy_waker};
473 #[cfg(feature = "futures")]
476 /// Processes background events in a future.
478 /// `sleeper` should return a future which completes in the given amount of time and returns a
479 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
480 /// future which outputs `true`, the loop will exit and this function's future will complete.
481 /// The `sleeper` future is free to return early after it has triggered the exit condition.
483 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
485 /// Requires the `futures` feature. Note that while this method is available without the `std`
486 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
487 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
488 /// manually instead.
490 /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
491 /// mobile device, where we may need to check for interruption of the application regularly. If you
492 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
493 /// are hundreds or thousands of simultaneous process calls running.
495 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
496 /// could set up `process_events_async` like this:
498 /// # struct MyPersister {}
499 /// # impl lightning::util::persist::KVStorePersister for MyPersister {
500 /// # fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
502 /// # struct MyEventHandler {}
503 /// # impl MyEventHandler {
504 /// # async fn handle_event(&self, _: lightning::events::Event) {}
506 /// # #[derive(Eq, PartialEq, Clone, Hash)]
507 /// # struct MySocketDescriptor {}
508 /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
509 /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
510 /// # fn disconnect_socket(&mut self) {}
512 /// # use std::sync::{Arc, Mutex};
513 /// # use std::sync::atomic::{AtomicBool, Ordering};
514 /// # use lightning_background_processor::{process_events_async, GossipSync};
515 /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
516 /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
517 /// # type MyNodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync;
518 /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
519 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
520 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
521 /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
522 /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
523 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
524 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
525 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
526 /// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
528 /// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
529 /// let background_persister = Arc::clone(&my_persister);
530 /// let background_event_handler = Arc::clone(&my_event_handler);
531 /// let background_chain_mon = Arc::clone(&my_chain_monitor);
532 /// let background_chan_man = Arc::clone(&my_channel_manager);
533 /// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
534 /// let background_peer_man = Arc::clone(&my_peer_manager);
535 /// let background_logger = Arc::clone(&my_logger);
536 /// let background_scorer = Arc::clone(&my_scorer);
538 /// // Setup the sleeper.
539 /// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
541 /// let sleeper = move |d| {
542 /// let mut receiver = stop_receiver.clone();
543 /// Box::pin(async move {
545 /// _ = tokio::time::sleep(d) => false,
546 /// _ = receiver.changed() => true,
551 /// let mobile_interruptable_platform = false;
553 /// let handle = tokio::spawn(async move {
554 /// process_events_async(
555 /// background_persister,
556 /// |e| background_event_handler.handle_event(e),
557 /// background_chain_mon,
558 /// background_chan_man,
559 /// background_gossip_sync,
560 /// background_peer_man,
561 /// background_logger,
562 /// Some(background_scorer),
564 /// mobile_interruptable_platform,
567 /// .expect("Failed to process events");
570 /// // Stop the background processing.
571 /// stop_sender.send(()).unwrap();
572 /// handle.await.unwrap();
575 #[cfg(feature = "futures")]
576 pub async fn process_events_async<
578 UL: 'static + Deref + Send + Sync,
579 CF: 'static + Deref + Send + Sync,
580 CW: 'static + Deref + Send + Sync,
581 T: 'static + Deref + Send + Sync,
582 ES: 'static + Deref + Send + Sync,
583 NS: 'static + Deref + Send + Sync,
584 SP: 'static + Deref + Send + Sync,
585 F: 'static + Deref + Send + Sync,
586 R: 'static + Deref + Send + Sync,
587 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
588 L: 'static + Deref + Send + Sync,
589 P: 'static + Deref + Send + Sync,
590 Descriptor: 'static + SocketDescriptor + Send + Sync,
591 CMH: 'static + Deref + Send + Sync,
592 RMH: 'static + Deref + Send + Sync,
593 OMH: 'static + Deref + Send + Sync,
594 EventHandlerFuture: core::future::Future<Output = ()>,
595 EventHandler: Fn(Event) -> EventHandlerFuture,
596 PS: 'static + Deref + Send,
597 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
598 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
599 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
600 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
601 UMH: 'static + Deref + Send + Sync,
602 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
603 S: 'static + Deref<Target = SC> + Send + Sync,
604 SC: for<'b> WriteableScore<'b>,
605 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
606 Sleeper: Fn(Duration) -> SleepFuture
608 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
609 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
610 sleeper: Sleeper, mobile_interruptable_platform: bool,
611 ) -> Result<(), lightning::io::Error>
613 UL::Target: 'static + UtxoLookup,
614 CF::Target: 'static + chain::Filter,
615 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
616 T::Target: 'static + BroadcasterInterface,
617 ES::Target: 'static + EntropySource,
618 NS::Target: 'static + NodeSigner,
619 SP::Target: 'static + SignerProvider,
620 F::Target: 'static + FeeEstimator,
621 R::Target: 'static + Router,
622 L::Target: 'static + Logger,
623 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
624 CMH::Target: 'static + ChannelMessageHandler,
625 OMH::Target: 'static + OnionMessageHandler,
626 RMH::Target: 'static + RoutingMessageHandler,
627 UMH::Target: 'static + CustomMessageHandler,
628 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
630 let mut should_break = false;
631 let async_event_handler = |event| {
632 let network_graph = gossip_sync.network_graph();
633 let event_handler = &event_handler;
634 let scorer = &scorer;
635 let logger = &logger;
636 let persister = &persister;
638 if let Some(network_graph) = network_graph {
639 handle_network_graph_update(network_graph, &event)
641 if let Some(ref scorer) = scorer {
642 if update_scorer(scorer, &event) {
643 log_trace!(logger, "Persisting scorer after update");
644 if let Err(e) = persister.persist_scorer(&scorer) {
645 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
649 event_handler(event).await;
652 define_run_body!(persister,
653 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
654 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
655 gossip_sync, peer_manager, logger, scorer, should_break, {
657 a: channel_manager.get_persistable_update_future(),
658 b: chain_monitor.get_update_future(),
659 c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
662 SelectorOutput::A => true,
663 SelectorOutput::B => false,
664 SelectorOutput::C(exit) => {
669 }, |t| sleeper(Duration::from_secs(t)),
670 |fut: &mut SleepFuture, _| {
671 let mut waker = dummy_waker();
672 let mut ctx = task::Context::from_waker(&mut waker);
673 match core::pin::Pin::new(fut).poll(&mut ctx) {
674 task::Poll::Ready(exit) => { should_break = exit; true },
675 task::Poll::Pending => false,
677 }, mobile_interruptable_platform)
680 #[cfg(feature = "std")]
681 impl BackgroundProcessor {
682 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
685 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
686 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
687 /// either [`join`] or [`stop`].
689 /// # Data Persistence
691 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
692 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
693 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
694 /// provided implementation.
696 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
697 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
698 /// See the `lightning-persister` crate for LDK's provided implementation.
700 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
701 /// error or call [`join`] and handle any error that may arise. For the latter case,
702 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
706 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
707 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
708 /// functionality implemented by other handlers.
709 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
711 /// # Rapid Gossip Sync
713 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
714 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
715 /// until the [`RapidGossipSync`] instance completes its first sync.
717 /// [top-level documentation]: BackgroundProcessor
718 /// [`join`]: Self::join
719 /// [`stop`]: Self::stop
720 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
721 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
722 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
723 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
724 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
725 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
728 UL: 'static + Deref + Send + Sync,
729 CF: 'static + Deref + Send + Sync,
730 CW: 'static + Deref + Send + Sync,
731 T: 'static + Deref + Send + Sync,
732 ES: 'static + Deref + Send + Sync,
733 NS: 'static + Deref + Send + Sync,
734 SP: 'static + Deref + Send + Sync,
735 F: 'static + Deref + Send + Sync,
736 R: 'static + Deref + Send + Sync,
737 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
738 L: 'static + Deref + Send + Sync,
739 P: 'static + Deref + Send + Sync,
740 Descriptor: 'static + SocketDescriptor + Send + Sync,
741 CMH: 'static + Deref + Send + Sync,
742 OMH: 'static + Deref + Send + Sync,
743 RMH: 'static + Deref + Send + Sync,
744 EH: 'static + EventHandler + Send,
745 PS: 'static + Deref + Send,
746 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
747 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
748 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
749 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
750 UMH: 'static + Deref + Send + Sync,
751 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
752 S: 'static + Deref<Target = SC> + Send + Sync,
753 SC: for <'b> WriteableScore<'b>,
755 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
756 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
759 UL::Target: 'static + UtxoLookup,
760 CF::Target: 'static + chain::Filter,
761 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
762 T::Target: 'static + BroadcasterInterface,
763 ES::Target: 'static + EntropySource,
764 NS::Target: 'static + NodeSigner,
765 SP::Target: 'static + SignerProvider,
766 F::Target: 'static + FeeEstimator,
767 R::Target: 'static + Router,
768 L::Target: 'static + Logger,
769 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
770 CMH::Target: 'static + ChannelMessageHandler,
771 OMH::Target: 'static + OnionMessageHandler,
772 RMH::Target: 'static + RoutingMessageHandler,
773 UMH::Target: 'static + CustomMessageHandler,
774 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
776 let stop_thread = Arc::new(AtomicBool::new(false));
777 let stop_thread_clone = stop_thread.clone();
778 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
779 let event_handler = |event| {
780 let network_graph = gossip_sync.network_graph();
781 if let Some(network_graph) = network_graph {
782 handle_network_graph_update(network_graph, &event)
784 if let Some(ref scorer) = scorer {
785 if update_scorer(scorer, &event) {
786 log_trace!(logger, "Persisting scorer after update");
787 if let Err(e) = persister.persist_scorer(&scorer) {
788 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
792 event_handler.handle_event(event);
794 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
795 channel_manager, channel_manager.process_pending_events(&event_handler),
796 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
797 Sleeper::from_two_futures(
798 channel_manager.get_persistable_update_future(),
799 chain_monitor.get_update_future()
800 ).wait_timeout(Duration::from_millis(100)),
801 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
803 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
806 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
807 /// [`ChannelManager`].
811 /// This function panics if the background thread has panicked such as while persisting or
814 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
// Takes the processor by value, so it cannot be used again after joining.
815 pub fn join(mut self) -> Result<(), std::io::Error> {
// Guard: the handle must still be present. In the code visible here it is only removed by
// `join_thread`, through `Option::take`.
816 assert!(self.thread_handle.is_some());
820 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
821 /// [`ChannelManager`].
825 /// This function panics if the background thread has panicked such as while persisting or
828 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
// Takes the processor by value, like `join`.
829 pub fn stop(mut self) -> Result<(), std::io::Error> {
// Guard: the thread handle has not been taken yet.
830 assert!(self.thread_handle.is_some());
// Delegate to the shared helper; its `Result` is this function's return value.
831 self.stop_and_join_thread()
/// Raises the shared `stop_thread` flag that the background loop polls.
834 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
// `Release` store, matching the `Ordering::Acquire` load of the same flag in the loop
// started by `start`.
835 self.stop_thread.store(true, Ordering::Release);
/// Takes the thread handle out of `self` (leaving `None` behind) and joins the thread.
839 fn join_thread(&mut self) -> Result<(), std::io::Error> {
840 match self.thread_handle.take() {
// `JoinHandle::join` returns `Err` only if the thread panicked, so the `unwrap` re-raises
// that panic here; otherwise the arm evaluates to the `Result` the thread closure returned.
841 Some(handle) => handle.join().unwrap(),
// Dropping the processor signals the background thread to stop through
// `stop_and_join_thread`. The helper's `Result` is unwrapped, so an `Err` coming back from
// it becomes a panic in the dropping thread.
847 #[cfg(feature = "std")]
848 impl Drop for BackgroundProcessor {
850 self.stop_and_join_thread().unwrap();
854 #[cfg(all(feature = "std", test))]
856 use bitcoin::blockdata::block::BlockHeader;
857 use bitcoin::blockdata::constants::genesis_block;
858 use bitcoin::blockdata::locktime::PackedLockTime;
859 use bitcoin::blockdata::transaction::{Transaction, TxOut};
860 use bitcoin::network::constants::Network;
861 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
862 use lightning::chain::{BestBlock, Confirm, chainmonitor};
863 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
864 use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
865 use lightning::chain::transaction::OutPoint;
866 use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
867 use lightning::{get_event_msg, get_event};
868 use lightning::ln::PaymentHash;
869 use lightning::ln::channelmanager;
870 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
871 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
872 use lightning::ln::msgs::{ChannelMessageHandler, Init};
873 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
874 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
875 use lightning::routing::router::{DefaultRouter, Path, RouteHop};
876 use lightning::routing::scoring::{ChannelUsage, Score};
877 use lightning::util::config::UserConfig;
878 use lightning::util::ser::Writeable;
879 use lightning::util::test_utils;
880 use lightning::util::persist::KVStorePersister;
881 use lightning_persister::FilesystemPersister;
882 use std::collections::VecDeque;
884 use std::path::PathBuf;
885 use std::sync::{Arc, Mutex};
886 use std::sync::mpsc::SyncSender;
887 use std::time::Duration;
888 use bitcoin::hashes::Hash;
889 use bitcoin::TxMerkleNode;
890 use lightning_rapid_gossip_sync::RapidGossipSync;
891 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
/// Timeout, in seconds, that the tests below pass to `Duration::from_secs` when waiting on a
/// channel for an event to be handled. Defined as 5 x `FRESHNESS_TIMER`.
893 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
/// Stand-in socket descriptor with no state; it fills the `Descriptor` parameter of the
/// `PeerManager` type used by the test `Node`.
895 #[derive(Clone, Hash, PartialEq, Eq)]
896 struct TestDescriptor{}
897 impl SocketDescriptor for TestDescriptor {
// Both arguments are ignored (underscore-prefixed).
898 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
// Nothing to tear down for the stand-in descriptor.
902 fn disconnect_socket(&mut self) {}
/// Concrete `ChannelManager` used by the test nodes: the test `ChainMonitor` alias as chain
/// watcher, `test_utils` broadcaster / fee estimator / logger, a single `KeysManager` type for
/// the three key-related parameters, and a `DefaultRouter` scored by `TestScorer`.
905 type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
/// Concrete `ChainMonitor` used by the test nodes; it signs with `InMemorySigner` and takes a
/// `FilesystemPersister` as its persister parameter.
907 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
/// Shared handle to a `P2PGossipSync` over the test network graph.
909 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
/// Shared handle to a `RapidGossipSync` over the test network graph.
910 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// State of one test node. Every field below is filled in by `create_nodes`.
// `node` is the node's `ChannelManager`.
913 node: Arc<ChannelManager>,
914 p2p_gossip_sync: PGS,
915 rapid_gossip_sync: RGS,
916 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
917 chain_monitor: Arc<ChainMonitor>,
918 persister: Arc<FilesystemPersister>,
919 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
920 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
921 logger: Arc<test_utils::TestLogger>,
// Chain tip as tracked by the tests; advanced by `confirm_transaction_depth`.
922 best_block: BestBlock,
923 scorer: Arc<Mutex<TestScorer>>,
/// Clones this node's P2P gossip sync handle into the `GossipSync::P2P` variant.
927 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
928 GossipSync::P2P(self.p2p_gossip_sync.clone())
/// Clones this node's rapid gossip sync handle into the `GossipSync::Rapid` variant.
931 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
932 GossipSync::Rapid(self.rapid_gossip_sync.clone())
/// Gossip-sync value handed to `BackgroundProcessor::start` by the tests that use neither
/// `p2p_gossip_sync` nor `rapid_gossip_sync`.
935 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
// Cleanup: delete the persister's data directory. A failure is printed, not propagated.
942 let data_dir = self.persister.get_data_dir();
943 match fs::remove_dir_all(data_dir.clone()) {
944 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Test persister: wraps a `FilesystemPersister` and can inject failures per storage key.
/// When set, `persist` fails with this (kind, message) for the "network_graph" key.
951 graph_error: Option<(std::io::ErrorKind, &'static str)>,
/// When set, `persist` sends `()` on it each time the "network_graph" key is written.
952 graph_persistence_notifier: Option<SyncSender<()>>,
/// When set, `persist` fails with this (kind, message) for the "manager" key.
953 manager_error: Option<(std::io::ErrorKind, &'static str)>,
/// When set, `persist` fails with this (kind, message) for the "scorer" key.
954 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
/// Backing store that `persist` falls through to when no injected error applies.
955 filesystem_persister: FilesystemPersister,
/// Creates a persister over `data_dir` with no injected errors and no notifier.
959 fn new(data_dir: String) -> Self {
960 let filesystem_persister = FilesystemPersister::new(data_dir);
961 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
// Builder-style setters: each consumes `self` and returns it with one field replaced.
964 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
965 Self { graph_error: Some((error, message)), ..self }
968 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
969 Self { graph_persistence_notifier: Some(sender), ..self }
972 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
973 Self { manager_error: Some((error, message)), ..self }
976 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
977 Self { scorer_error: Some((error, message)), ..self }
981 impl KVStorePersister for Persister {
/// Persists `object` under `key`, first applying any failure configured for that key.
/// Keys other than "manager", "network_graph" and "scorer" go straight to the filesystem
/// persister.
982 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
983 if key == "manager" {
984 if let Some((error, message)) = self.manager_error {
985 return Err(std::io::Error::new(error, message))
989 if key == "network_graph" {
// The notifier is signalled before the injected graph error is considered.
990 if let Some(sender) = &self.graph_persistence_notifier {
991 match sender.send(()) {
// A dropped receiver is reported on stdout rather than treated as a failure.
993 Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
997 if let Some((error, message)) = self.graph_error {
998 return Err(std::io::Error::new(error, message))
1002 if key == "scorer" {
1003 if let Some((error, message)) = self.scorer_error {
1004 return Err(std::io::Error::new(error, message))
// No injected failure applied: write through to the real persister.
1008 self.filesystem_persister.persist(key, object)
/// Queue of expected scorer callbacks, checked front to back. While it is `None` the `Score`
/// methods on `TestScorer` perform no checks.
1013 event_expectations: Option<VecDeque<TestResult>>,
// Outcomes a test can queue with `expect`; each carries the path the callback must report.
1018 PaymentFailure { path: Path, short_channel_id: u64 },
1019 PaymentSuccess { path: Path },
1020 ProbeFailure { path: Path },
1021 ProbeSuccess { path: Path },
// A fresh scorer has no expectation queue.
1026 Self { event_expectations: None }
/// Appends `expectation` to the queue, creating the queue on first use.
1029 fn expect(&mut self, expectation: TestResult) {
1030 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1034 impl lightning::util::ser::Writeable for TestScorer {
/// Serialization is a no-op: the writer is ignored and `Ok(())` is returned.
1035 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
// `Score` implementation that verifies callbacks against the queued expectations.
// Each update method pops the front expectation when a queue exists; `pop_front().unwrap()`
// panics if the queue is empty. Only the matching variant is accepted, any other one panics.
1038 impl Score for TestScorer {
/// Not needed by these tests; calling it panics via `unimplemented!`.
1039 fn channel_penalty_msat(
1040 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
1041 ) -> u64 { unimplemented!(); }
/// Expects a queued `PaymentFailure` with the same path and short channel id.
1043 fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
1044 if let Some(expectations) = &mut self.event_expectations {
1045 match expectations.pop_front().unwrap() {
1046 TestResult::PaymentFailure { path, short_channel_id } => {
1047 assert_eq!(actual_path, &path);
1048 assert_eq!(actual_short_channel_id, short_channel_id);
1050 TestResult::PaymentSuccess { path } => {
1051 panic!("Unexpected successful payment path: {:?}", path)
1053 TestResult::ProbeFailure { path } => {
1054 panic!("Unexpected probe failure: {:?}", path)
1056 TestResult::ProbeSuccess { path } => {
1057 panic!("Unexpected probe success: {:?}", path)
/// Expects a queued `PaymentSuccess` with the same path.
1063 fn payment_path_successful(&mut self, actual_path: &Path) {
1064 if let Some(expectations) = &mut self.event_expectations {
1065 match expectations.pop_front().unwrap() {
1066 TestResult::PaymentFailure { path, .. } => {
1067 panic!("Unexpected payment path failure: {:?}", path)
1069 TestResult::PaymentSuccess { path } => {
1070 assert_eq!(actual_path, &path);
1072 TestResult::ProbeFailure { path } => {
1073 panic!("Unexpected probe failure: {:?}", path)
1075 TestResult::ProbeSuccess { path } => {
1076 panic!("Unexpected probe success: {:?}", path)
/// Expects a queued `ProbeFailure` with the same path; the short channel id is ignored.
1082 fn probe_failed(&mut self, actual_path: &Path, _: u64) {
1083 if let Some(expectations) = &mut self.event_expectations {
1084 match expectations.pop_front().unwrap() {
1085 TestResult::PaymentFailure { path, .. } => {
1086 panic!("Unexpected payment path failure: {:?}", path)
1088 TestResult::PaymentSuccess { path } => {
1089 panic!("Unexpected payment path success: {:?}", path)
1091 TestResult::ProbeFailure { path } => {
1092 assert_eq!(actual_path, &path);
1094 TestResult::ProbeSuccess { path } => {
1095 panic!("Unexpected probe success: {:?}", path)
/// Expects a queued `ProbeSuccess` with the same path.
1100 fn probe_successful(&mut self, actual_path: &Path) {
1101 if let Some(expectations) = &mut self.event_expectations {
1102 match expectations.pop_front().unwrap() {
1103 TestResult::PaymentFailure { path, .. } => {
1104 panic!("Unexpected payment path failure: {:?}", path)
1106 TestResult::PaymentSuccess { path } => {
1107 panic!("Unexpected payment path success: {:?}", path)
1109 TestResult::ProbeFailure { path } => {
1110 panic!("Unexpected probe failure: {:?}", path)
1112 TestResult::ProbeSuccess { path } => {
1113 assert_eq!(actual_path, &path);
// On drop, any expectation still queued is reported as a test failure.
1120 impl Drop for TestScorer {
1121 fn drop(&mut self) {
// NOTE(review): this guard presumably skips the check while the thread is already
// unwinding, to avoid a panic during a panic; confirm against the branch body.
1122 if std::thread::panicking() {
1126 if let Some(event_expectations) = &self.event_expectations {
1127 if !event_expectations.is_empty() {
1128 panic!("Unsatisfied event expectations: {:?}", event_expectations);
/// Joins `filepath` and `filename` as path components and returns the result as a `String`.
1134 fn get_full_filepath(filepath: String, filename: String) -> String {
1135 let mut path = PathBuf::from(filepath);
1136 path.push(filename);
// Both components come from `String`s, so the joined path is valid UTF-8 and `to_str`
// returns `Some`.
1137 path.to_str().unwrap().to_string()
/// Builds `num_nodes` test nodes on testnet and then registers every pair of nodes as
/// connected peers. Node `i` persists under the directory `"{persist_dir}_persister_{i}"`.
1140 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
1141 let network = Network::Testnet;
1142 let mut nodes = Vec::new();
1143 for i in 0..num_nodes {
1144 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1145 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
// Each node gets a logger tagged with its index.
1146 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1147 let genesis_block = genesis_block(network);
1148 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1149 let scorer = Arc::new(Mutex::new(TestScorer::new()));
// Per-node seed derived from the node index, so runs are reproducible.
1150 let seed = [i as u8; 32];
1151 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
1152 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
1153 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
// The key manager is initialised from the genesis block timestamp, not the wall clock.
1154 let now = Duration::from_secs(genesis_block.header.time as u64);
1155 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1156 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1157 let best_block = BestBlock::from_network(network);
1158 let params = ChainParameters { network, best_block };
// The same `KeysManager` is passed for all three key-related arguments.
1159 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
1160 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1161 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1162 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
1163 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
1164 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Tell each pair (i, j) with i < j about each other, once in each direction.
1168 for i in 0..num_nodes {
1169 for j in (i+1)..num_nodes {
1170 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
1171 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
// Runs the channel-open handshake between `$node_a` and `$node_b` for `$channel_value`, up to
// both sides emitting `Event::ChannelPending`. Callers bind the macro's value to the funding
// transaction.
1178 macro_rules! open_channel {
1179 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1180 begin_open_channel!($node_a, $node_b, $channel_value);
// Exactly one event is expected on the opener; it is handed to
// `handle_funding_generation_ready!`.
1181 let events = $node_a.node.get_and_clear_pending_events();
1182 assert_eq!(events.len(), 1);
1183 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1184 $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
// funding_created goes a -> b, then funding_signed comes back b -> a.
1185 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1186 get_event!($node_b, Event::ChannelPending);
1187 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1188 get_event!($node_a, Event::ChannelPending);
// First half of the handshake: `$node_a` creates the channel, `$node_b` handles the
// open_channel message, and `$node_a` handles the accept_channel reply.
1193 macro_rules! begin_open_channel {
1194 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
// The `42` is the user channel id that `handle_funding_generation_ready!` asserts on.
1195 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
1196 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1197 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Checks an `Event::FundingGenerationReady` and builds a funding transaction for it.
// Evaluates to `(temporary_channel_id, tx)`; any other event panics.
1201 macro_rules! handle_funding_generation_ready {
1202 ($event: expr, $channel_value: expr) => {{
1204 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1205 assert_eq!(channel_value_satoshis, $channel_value);
1206 assert_eq!(user_channel_id, 42);
// Input-less transaction with an output paying the channel value to the script from the event.
1208 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1209 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1211 (temporary_channel_id, tx)
1213 _ => panic!("Unexpected event"),
/// Builds `depth` synthetic block headers on top of `node.best_block`, advancing the tracked
/// tip each time, and drives the node's `ChannelManager` and `ChainMonitor` with them.
1218 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1219 for i in 1..=depth {
1220 let prev_blockhash = node.best_block.block_hash();
1221 let height = node.best_block.height() + 1;
// Apart from `prev_blockhash`, the header uses fixed test values; `time` reuses the height.
1222 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1223 let txdata = vec![(0, tx)];
1224 node.best_block = BestBlock::new(header.block_hash(), height);
// NOTE(review): presumably `tx` is reported as confirmed only in the first block -- confirm
// against the enclosing match arm.
1227 node.node.transactions_confirmed(&header, &txdata, height);
1228 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
// Last block of the run: announce the new best block to both components.
1230 x if x == depth => {
1231 node.node.best_block_updated(&header, height);
1232 node.chain_monitor.best_block_updated(&header, height);
/// Shorthand for `confirm_transaction_depth` with a depth of `ANTI_REORG_DELAY` blocks.
1238 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1239 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1243 fn test_background_processor() {
1244 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1245 // updates. Also test that when new updates are available, the manager signals that it needs
1246 // re-persistence and is successfully re-persisted.
1247 let nodes = create_nodes(2, "test_background_processor".to_string());
1249 // Go through the channel creation process so that each node has something to persist. Since
1250 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1251 // avoid a race with processing events.
1252 let tx = open_channel!(nodes[0], nodes[1], 100000);
1254 // Start a background processor for nodes[0].
1255 let data_dir = nodes[0].persister.get_data_dir();
1256 let persister = Arc::new(Persister::new(data_dir));
1257 let event_handler = |_: _| {};
1258 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes `$node` into a buffer and compares it with the bytes read from `$filepath`.
1260 macro_rules! check_persisted_data {
1261 ($node: expr, $filepath: expr) => {
1262 let mut expected_bytes = Vec::new();
1264 expected_bytes.clear();
1265 match $node.write(&mut expected_bytes) {
1267 match std::fs::read($filepath) {
1269 if bytes == expected_bytes {
1278 Err(e) => panic!("Unexpected error: {}", e)
1284 // Check that the initial channel manager data is persisted as expected.
1285 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1286 check_persisted_data!(nodes[0].node, filepath.clone());
1289 if !nodes[0].node.get_persistence_condvar_value() { break }
1292 // Force-close the channel.
1293 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1295 // Check that the force-close updates are persisted.
1296 check_persisted_data!(nodes[0].node, filepath.clone());
1298 if !nodes[0].node.get_persistence_condvar_value() { break }
1301 // Check network graph is persisted
1302 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1303 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1305 // Check scorer is persisted
1306 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1307 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Only stop explicitly when the test itself has not already failed.
1309 if !std::thread::panicking() {
1310 bg_processor.stop().unwrap();
1315 fn test_timer_tick_called() {
1316 // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1317 // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
1318 // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
1319 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1320 let data_dir = nodes[0].persister.get_data_dir();
1321 let persister = Arc::new(Persister::new(data_dir));
1322 let event_handler = |_: _| {};
1323 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The timers are observed through the test logger: look for one log line per timer, keyed
// by (module, message).
1325 let log_entries = nodes[0].logger.lines.lock().unwrap();
1326 let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1327 let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1328 let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1329 if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
1330 log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
1331 log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
// Only stop explicitly when the test itself has not already failed.
1336 if !std::thread::panicking() {
1337 bg_processor.stop().unwrap();
1342 fn test_channel_manager_persist_error() {
1343 // Test that an error hit while persisting the manager is returned from `join()`.
1344 let nodes = create_nodes(2, "test_persist_error".to_string());
1345 open_channel!(nodes[0], nodes[1], 100000);
1347 let data_dir = nodes[0].persister.get_data_dir();
// Every write of the "manager" key fails with `ErrorKind::Other` / "test".
1348 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1349 let event_handler = |_: _| {};
1350 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// No stop is requested: `join` is expected to return the injected error unchanged.
1351 match bg_processor.join() {
1352 Ok(_) => panic!("Expected error persisting manager"),
1354 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1355 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1361 #[cfg(feature = "futures")]
1362 async fn test_channel_manager_persist_error_async() {
1363 // Test that an error hit while persisting the manager is returned by the awaited future.
1364 let nodes = create_nodes(2, "test_persist_error_sync".to_string());
1365 open_channel!(nodes[0], nodes[1], 100000);
1367 let data_dir = nodes[0].persister.get_data_dir();
// Every write of the "manager" key fails with `ErrorKind::Other` / "test".
1368 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
// The last argument is the sleeper: it returns a boxed future that sleeps for `dur` on the
// tokio timer.
1370 let bp_future = super::process_events_async(
1371 persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1372 nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1373 Some(nodes[0].scorer.clone()), move |dur: Duration| {
1374 Box::pin(async move {
1375 tokio::time::sleep(dur).await;
1380 match bp_future.await {
1381 Ok(_) => panic!("Expected error persisting manager"),
1383 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1384 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1390 fn test_network_graph_persist_error() {
1391 // Test that if we encounter an error during network graph persistence, an error gets returned.
1392 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1393 let data_dir = nodes[0].persister.get_data_dir();
// Every write of the "network_graph" key fails with `ErrorKind::Other` / "test".
1394 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1395 let event_handler = |_: _| {};
1396 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The injected error is expected to come back from `stop`.
1398 match bg_processor.stop() {
1399 Ok(_) => panic!("Expected error persisting network graph"),
1401 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1402 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1408 fn test_scorer_persist_error() {
1409 // Test that if we encounter an error during scorer persistence, an error gets returned.
1410 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1411 let data_dir = nodes[0].persister.get_data_dir();
// Every write of the "scorer" key fails with `ErrorKind::Other` / "test".
1412 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1413 let event_handler = |_: _| {};
1414 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The injected error is expected to come back from `stop`.
1416 match bg_processor.stop() {
1417 Ok(_) => panic!("Expected error persisting scorer"),
1419 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1420 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Checks that events raised while a BackgroundProcessor is running reach the user-supplied
// event handler: first during a channel open, then after a force close.
1426 fn test_background_event_handling() {
1427 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1428 let channel_value = 100000;
1429 let data_dir = nodes[0].persister.get_data_dir();
1430 let persister = Arc::new(Persister::new(data_dir.clone()));
1432 // Set up a background event handler for FundingGenerationReady events.
// The handler runs on the background thread, so results are handed back over channels.
1433 let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
1434 let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1435 let event_handler = move |event: Event| match event {
1436 Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1437 Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1438 Event::ChannelReady { .. } => {},
1439 _ => panic!("Unexpected event: {:?}", event),
1442 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1444 // Open a channel and check that the FundingGenerationReady event was handled.
1445 begin_open_channel!(nodes[0], nodes[1], channel_value);
1446 let (temporary_channel_id, funding_tx) = funding_generation_recv
1447 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1448 .expect("FundingGenerationReady not handled within deadline");
1449 nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
1450 nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
1451 get_event!(nodes[1], Event::ChannelPending);
1452 nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
1453 let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1454 .expect("ChannelPending not handled within deadline");
1456 // Confirm the funding transaction.
1457 confirm_transaction(&mut nodes[0], &funding_tx);
1458 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1459 confirm_transaction(&mut nodes[1], &funding_tx);
1460 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1461 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1462 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1463 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1464 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before a second one with a different handler is started below.
1466 if !std::thread::panicking() {
1467 bg_processor.stop().unwrap();
1470 // Set up a background event handler for SpendableOutputs events.
1471 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1472 let event_handler = move |event: Event| match event {
1473 Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1474 Event::ChannelReady { .. } => {},
1475 Event::ChannelClosed { .. } => {},
1476 _ => panic!("Unexpected event: {:?}", event),
1478 let persister = Arc::new(Persister::new(data_dir));
1479 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1481 // Force close the channel and check that the SpendableOutputs event was handled.
1482 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the most recently broadcast transaction and confirm it `BREAKDOWN_TIMEOUT` blocks deep.
1483 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1484 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1486 let event = receiver
1487 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1488 .expect("Events not handled within deadline");
1490 Event::SpendableOutputs { .. } => {},
1491 _ => panic!("Unexpected event: {:?}", event),
1494 if !std::thread::panicking() {
1495 bg_processor.stop().unwrap();
// Checks that a running BackgroundProcessor logs "Persisting scorer".
1500 fn test_scorer_persistence() {
1501 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1502 let data_dir = nodes[0].persister.get_data_dir();
1503 let persister = Arc::new(Persister::new(data_dir));
1504 let event_handler = |_: _| {};
1505 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Persistence is observed through the test logger, keyed by (module, message).
1508 let log_entries = nodes[0].logger.lines.lock().unwrap();
1509 let expected_log = "Persisting scorer".to_string();
1510 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
// Only stop explicitly when the test itself has not already failed.
1515 if !std::thread::panicking() {
1516 bg_processor.stop().unwrap();
// Shared body for the graph-pruning test. It seeds one channel through a partial
// announcement, applies a rapid gossip sync snapshot, waits on `$receive`, and finally expects
// the graph to be empty.
1520 macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
1521 ($nodes: expr, $receive: expr, $sleep: expr) => {
1522 let features = ChannelFeatures::empty();
1523 $nodes[0].network_graph.add_channel_from_partial_announcement(
1524 42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
1525 ).expect("Failed to update channel from partial announcement");
// Sanity-check the seeded graph: one channel, with short channel id 42.
1526 let original_graph_description = $nodes[0].network_graph.to_string();
1527 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1528 assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
// The ChannelManager timer log line doubles as a counter of background-loop iterations.
1532 let log_entries = $nodes[0].logger.lines.lock().unwrap();
1533 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1534 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1537 // Wait until the loop has gone around at least twice.
// Serialized rapid gossip sync data, applied to the graph right after this literal.
1542 let initialization_input = vec![
1543 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1544 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1545 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1546 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1547 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1548 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1549 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1550 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1551 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1552 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1553 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1554 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1555 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1557 $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1559 // this should have added two channels and pruned the previous one.
1560 assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
// `$receive` is the caller-supplied wait; its failure message shows it signals pruning.
1562 $receive.expect("Network graph not pruned within deadline");
1564 // all channels should now be pruned
1565 assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
// Threaded variant of the graph-pruning test: runs a `BackgroundProcessor` with rapid gossip
// sync and drives `do_test_not_pruning_network_graph_until_graph_sync_completion!` using a
// blocking channel receive and a blocking sleep.
1570 fn test_not_pruning_network_graph_until_graph_sync_completion() {
// Bounded channel (capacity 1) whose sender is handed to the persister as its graph
// persistence notifier.
1571 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1573 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1574 let data_dir = nodes[0].persister.get_data_dir();
1575 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
// The event handler ignores its argument.
1577 let event_handler = |_: _| {};
1578 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `$receive`: wait up to five times `FIRST_NETWORK_PRUNE_TIMER` seconds for a notification.
// `$sleep`: a 1ms blocking sleep.
1580 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
1581 receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
1582 std::thread::sleep(Duration::from_millis(1)));
// NOTE(review): unlike the other tests in this module, the stop here is not guarded by
// `std::thread::panicking()`.
1584 background_processor.stop().unwrap();
// Async variant of the graph-pruning test, compiled only with the `futures` feature. It drives
// `super::process_events_async` on one tokio task and the shared test macro on another.
1588 #[cfg(feature = "futures")]
1589 async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
// Bounded channel (capacity 1) whose sender is handed to the persister as its graph
// persistence notifier.
1590 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1592 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async".to_string());
1593 let data_dir = nodes[0].persister.get_data_dir();
1594 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
// Watch channel used to tell the background future to exit.
1596 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
// The event handler is a closure returning an empty future. The last argument is the sleeper
// closure: its two visible arms yield `false` once `dur` has elapsed and `true` once the exit
// channel changes (presumably wrapped in `tokio::select!`, which is not visible here -- confirm).
1597 let bp_future = super::process_events_async(
1598 persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1599 nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1600 Some(nodes[0].scorer.clone()), move |dur: Duration| {
1601 let mut exit_receiver = exit_receiver.clone();
1602 Box::pin(async move {
1604 _ = tokio::time::sleep(dur) => false,
1605 _ = exit_receiver.changed() => true,
// t1 runs the background processor; t2 runs the test body and then signals exit.
1611 let t1 = tokio::spawn(bp_future);
1612 let t2 = tokio::spawn(async move {
// `$receive`: sleep `FIRST_NETWORK_PRUNE_TIMER` seconds, then poll the notifier without
// blocking and break with `Ok` once a notification has arrived.
// NOTE(review): the enclosing loop and any retry bound are not visible in this view.
// `$sleep`: a 1ms async sleep.
1613 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
1616 tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
1617 if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
1621 }, tokio::time::sleep(Duration::from_millis(1)).await);
1622 exit_sender.send(()).unwrap();
// Wait for both tasks. The first unwrap on `r1` surfaces a task join error and the second the
// background future's own result.
1624 let (r1, r2) = tokio::join!(t1, t2);
1625 r1.unwrap().unwrap();
// Shared body for the sync and async payment-path scoring tests.
//
// Parameters:
// * `$nodes`   - the test nodes; node 0's scorer and channel manager (`node`) are used.
// * `$receive` - expression evaluated after each pushed event; `.expect(..)` is called on its
//                result and the unwrapped value is matched against the expected `Event` variant.
1629 macro_rules! do_test_payment_path_scoring {
1630 ($nodes: expr, $receive: expr) => {
1631 // Ensure that we update the scorer when relevant events are processed. Each step below
1632 // registers the scorer result we expect, pushes a pending event onto node 0, and then
1633 // checks the event that comes back via `$receive` (note that the channel must be public or
1634 // else we won't score it). The events used are PaymentPathFailed, PaymentPathSuccessful,
1635 // ProbeSuccessful and ProbeFailed, so `$receive` must yield those once they are handled.
// Build a single-hop path over `scored_scid`, using a public key derived from a fixed secret key.
1636 let scored_scid = 4242;
1637 let secp_ctx = Secp256k1::new();
1638 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1639 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1641 let path = Path { hops: vec![RouteHop {
1643 node_features: NodeFeatures::empty(),
1644 short_channel_id: scored_scid,
1645 channel_features: ChannelFeatures::empty(),
1647 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
1648 }], blinded_tail: None };
// Step 1: a non-permanent failure on `scored_scid` is expected to be scored as PaymentFailure.
1650 $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1651 $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1653 payment_hash: PaymentHash([42; 32]),
1654 payment_failed_permanently: false,
1655 failure: PathFailure::OnPath { network_update: None },
1657 short_channel_id: Some(scored_scid),
1659 let event = $receive.expect("PaymentPathFailed not handled within deadline");
1661 Event::PaymentPathFailed { .. } => {},
1662 _ => panic!("Unexpected event"),
// Step 2: a permanent failure with no short channel id.
1665 // Ensure we'll score payments that were explicitly failed back by the destination as
// ProbeSuccess.
1667 $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1668 $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1670 payment_hash: PaymentHash([42; 32]),
1671 payment_failed_permanently: true,
1672 failure: PathFailure::OnPath { network_update: None },
1674 short_channel_id: None,
1676 let event = $receive.expect("PaymentPathFailed not handled within deadline");
1678 Event::PaymentPathFailed { .. } => {},
1679 _ => panic!("Unexpected event"),
// Step 3: a successful payment path is expected to be scored as PaymentSuccess.
1682 $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1683 $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1684 payment_id: PaymentId([42; 32]),
1688 let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
1690 Event::PaymentPathSuccessful { .. } => {},
1691 _ => panic!("Unexpected event"),
// Step 4: a successful probe is expected to be scored as ProbeSuccess.
1694 $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1695 $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1696 payment_id: PaymentId([42; 32]),
1697 payment_hash: PaymentHash([42; 32]),
1700 let event = $receive.expect("ProbeSuccessful not handled within deadline");
1702 Event::ProbeSuccessful { .. } => {},
1703 _ => panic!("Unexpected event"),
// Step 5: a failed probe on `scored_scid` is expected to be scored as ProbeFailure.
1706 $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1707 $nodes[0].node.push_pending_event(Event::ProbeFailed {
1708 payment_id: PaymentId([42; 32]),
1709 payment_hash: PaymentHash([42; 32]),
1711 short_channel_id: Some(scored_scid),
1713 let event = $receive.expect("ProbeFailure not handled within deadline");
1715 Event::ProbeFailed { .. } => {},
1716 _ => panic!("Unexpected event"),
// Threaded variant of the payment-path scoring test: runs a `BackgroundProcessor` whose event
// handler forwards scoring-related events over a channel, then drives
// `do_test_payment_path_scoring!` with a blocking receive.
1722 fn test_payment_path_scoring() {
// Bounded channel (capacity 1) carrying handled events back to the test body.
1723 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// Forward the four scoring-related event kinds; any other event fails the test.
1724 let event_handler = move |event: Event| match event {
1725 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1726 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1727 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1728 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1729 _ => panic!("Unexpected event: {:?}", event),
1732 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1733 let data_dir = nodes[0].persister.get_data_dir();
1734 let persister = Arc::new(Persister::new(data_dir));
1735 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `$receive`: wait up to `EVENT_DEADLINE` seconds for each forwarded event.
1737 do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
// Only stop the processor (and unwrap its result) when this thread is not already panicking.
1739 if !std::thread::panicking() {
1740 bg_processor.stop().unwrap();
// The "Persisting scorer after update" message must have been logged exactly 5 times, which
// matches the five events pushed by the macro.
1743 let log_entries = nodes[0].logger.lines.lock().unwrap();
1744 let expected_log = "Persisting scorer after update".to_string();
1745 assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5);
// Async variant of the payment-path scoring test, compiled only with the `futures` feature. It
// drives `super::process_events_async` on one tokio task and the shared test macro on another.
1749 #[cfg(feature = "futures")]
1750 async fn test_payment_path_scoring_async() {
// Tokio channel (capacity 1) carrying handled events back to the test body.
1751 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
// Each invocation clones the sender and forwards the four scoring-related event kinds; any
// other event fails the test.
1752 let event_handler = move |event: Event| {
1753 let sender_ref = sender.clone();
1756 Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
1757 Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1758 Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1759 Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
1760 _ => panic!("Unexpected event: {:?}", event),
1765 let nodes = create_nodes(1, "test_payment_path_scoring_async".to_string());
1766 let data_dir = nodes[0].persister.get_data_dir();
1767 let persister = Arc::new(Persister::new(data_dir));
// Watch channel used to tell the background future to exit.
1769 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
// The last argument is the sleeper closure: its two visible arms yield `false` once `dur` has
// elapsed and `true` once the exit channel changes (presumably wrapped in `tokio::select!`,
// which is not visible here -- confirm).
1771 let bp_future = super::process_events_async(
1772 persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1773 nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1774 Some(nodes[0].scorer.clone()), move |dur: Duration| {
1775 let mut exit_receiver = exit_receiver.clone();
1776 Box::pin(async move {
1778 _ = tokio::time::sleep(dur) => false,
1779 _ = exit_receiver.changed() => true,
// t1 runs the background processor; t2 runs the test body, signals exit and checks the log.
1784 let t1 = tokio::spawn(bp_future);
1785 let t2 = tokio::spawn(async move {
// `$receive`: await the next forwarded event from the tokio channel.
1786 do_test_payment_path_scoring!(nodes, receiver.recv().await);
1787 exit_sender.send(()).unwrap();
// The "Persisting scorer after update" message must have been logged exactly 5 times, which
// matches the five events pushed by the macro.
1789 let log_entries = nodes[0].logger.lines.lock().unwrap();
1790 let expected_log = "Persisting scorer after update".to_string();
1791 assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5);
// Wait for both tasks. The first unwrap on `r1` surfaces a task join error and the second the
// background future's own result.
1794 let (r1, r2) = tokio::join!(t1, t2);
1795 r1.unwrap().unwrap();