1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
16 #[cfg(any(test, feature = "std"))]
19 #[cfg(not(feature = "std"))]
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::OnionMessageHandler;
34 use lightning::ln::peer_handler::APeerManager;
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 #[cfg(feature = "std")]
42 use lightning::util::wakers::Sleeper;
43 use lightning_rapid_gossip_sync::RapidGossipSync;
46 use core::time::Duration;
48 #[cfg(feature = "std")]
50 #[cfg(feature = "std")]
51 use core::sync::atomic::{AtomicBool, Ordering};
52 #[cfg(feature = "std")]
53 use std::thread::{self, JoinHandle};
54 #[cfg(feature = "std")]
55 use std::time::Instant;
57 #[cfg(not(feature = "std"))]
60 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
61 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
62 /// responsibilities are:
63 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
64 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
65 /// writing it to disk/backups by invoking the callback given to it at startup.
66 /// [`ChannelManager`] persistence should be done in the background.
67 /// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
68 /// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
69 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
70 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
72 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
73 /// upon as doing so may result in high latency.
77 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
78 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
79 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
80 /// unilateral chain closure fees are at risk.
82 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
83 /// [`Event`]: lightning::events::Event
84 /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
85 /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
86 #[cfg(feature = "std")]
87 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
88 pub struct BackgroundProcessor {
89 stop_thread: Arc<AtomicBool>,
90 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// All timer intervals below are in seconds. Each interval has a production value and a
// 1-second value selected via cfg so tests can exercise the periodic work quickly.
const FRESHNESS_TIMER: u64 = 60;
const FRESHNESS_TIMER: u64 = 1;
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;
// Interval at which we call OnionMessageHandler::timer_tick_occurred.
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Persist the scorer hourly in production builds.
const SCORER_PERSIST_TIMER: u64 = 60 * 60;
const SCORER_PERSIST_TIMER: u64 = 1;
// Run the first network-graph prune shortly after startup so short-lived clients still prune.
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
// Interval at which we ask the ChainMonitor to rebroadcast pending claims.
const REBROADCAST_TIMER: u64 = 30;
const REBROADCAST_TIMER: u64 = 1;
131 #[cfg(feature = "futures")]
132 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
133 const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
134 #[cfg(feature = "futures")]
135 const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
136 min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
138 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
140 P: Deref<Target = P2PGossipSync<G, U, L>>,
141 R: Deref<Target = RapidGossipSync<G, L>>,
142 G: Deref<Target = NetworkGraph<L>>,
146 where U::Target: UtxoLookup, L::Target: Logger {
147 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
149 /// Rapid gossip sync from a trusted server.
156 P: Deref<Target = P2PGossipSync<G, U, L>>,
157 R: Deref<Target = RapidGossipSync<G, L>>,
158 G: Deref<Target = NetworkGraph<L>>,
161 > GossipSync<P, R, G, U, L>
162 where U::Target: UtxoLookup, L::Target: Logger {
	/// Returns the [`NetworkGraph`] backing this gossip sync, if there is one.
	fn network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
	/// Returns the network graph only when it is safe to prune: for rapid gossip sync the graph
	/// must not be pruned before the initial sync completes.
	fn prunable_network_graph(&self) -> Option<&G> {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
			GossipSync::None => None,
186 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
187 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
188 GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
190 U::Target: UtxoLookup,
193 /// Initializes a new [`GossipSync::P2P`] variant.
194 pub fn p2p(gossip_sync: P) -> Self {
195 GossipSync::P2P(gossip_sync)
199 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
200 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
202 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
205 &'a (dyn UtxoLookup + Send + Sync),
211 /// Initializes a new [`GossipSync::Rapid`] variant.
212 pub fn rapid(gossip_sync: R) -> Self {
213 GossipSync::Rapid(gossip_sync)
217 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
220 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
221 &RapidGossipSync<&'a NetworkGraph<L>, L>,
223 &'a (dyn UtxoLookup + Send + Sync),
229 /// Initializes a new [`GossipSync::None`] variant.
230 pub fn none() -> Self {
/// Applies the `NetworkUpdate` carried by a failed-payment event (if any) to `network_graph`.
fn handle_network_graph_update<L: Deref>(
	network_graph: &NetworkGraph<L>, event: &Event
) where L::Target: Logger {
	// Only an on-path failure carries gossip-relevant data; every other event is ignored here.
	if let Event::PaymentPathFailed {
		failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
		network_graph.handle_network_update(upd);
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event
		// A failure partway along the path penalizes the channel that failed.
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_failed(path, *scid);
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let mut score = scorer.write_lock();
			score.probe_successful(path);
		Event::PaymentPathSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_successful(path);
		Event::ProbeSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.probe_successful(path);
		// A failed probe penalizes the failing channel, just like a failed payment attempt.
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.probe_failed(path, *scid);
278 macro_rules! define_run_body {
280 $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
281 $channel_manager: ident, $process_channel_manager_events: expr,
282 $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
283 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
284 $timer_elapsed: expr, $check_slow_await: expr
286 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
287 $channel_manager.timer_tick_occurred();
288 log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
289 $chain_monitor.rebroadcast_pending_claims();
291 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
292 let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
293 let mut last_ping_call = $get_timer(PING_TIMER);
294 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
295 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
296 let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
297 let mut have_pruned = false;
300 $process_channel_manager_events;
301 $process_chain_monitor_events;
302 $process_onion_message_handler_events;
304 // Note that the PeerManager::process_events may block on ChannelManager's locks,
305 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
306 // we want to ensure we get into `persist_manager` as quickly as we can, especially
307 // without running the normal event processing above and handing events to users.
309 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
310 // processing a message effectively at any point during this loop. In order to
311 // minimize the time between such processing completing and persisting the updated
312 // ChannelManager, we want to minimize methods blocking on a ChannelManager
313 // generally, and as a fallback place such blocking only immediately before
315 $peer_manager.as_ref().process_events();
317 // Exit the loop if the background processor was requested to stop.
318 if $loop_exit_check {
319 log_trace!($logger, "Terminating background processor.");
323 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
324 // see `await_start`'s use below.
325 let mut await_start = None;
326 if $check_slow_await { await_start = Some($get_timer(1)); }
328 let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
330 // Exit the loop if the background processor was requested to stop.
331 if $loop_exit_check {
332 log_trace!($logger, "Terminating background processor.");
336 if $channel_manager.get_and_clear_needs_persistence() {
337 log_trace!($logger, "Persisting ChannelManager...");
338 $persister.persist_manager(&*$channel_manager)?;
339 log_trace!($logger, "Done persisting ChannelManager.");
341 if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
342 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
343 $channel_manager.timer_tick_occurred();
344 last_freshness_call = $get_timer(FRESHNESS_TIMER);
346 if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
347 log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
348 $peer_manager.onion_message_handler().timer_tick_occurred();
349 last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
352 // On various platforms, we may be starved of CPU cycles for several reasons.
353 // E.g. on iOS, if we've been in the background, we will be entirely paused.
354 // Similarly, if we're on a desktop platform and the device has been asleep, we
355 // may not get any cycles.
356 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
357 // full second, at which point we assume sockets may have been killed (they
358 // appear to be at least on some platforms, even if it has only been a second).
359 // Note that we have to take care to not get here just because user event
360 // processing was slow at the top of the loop. For example, the sample client
361 // may call Bitcoin Core RPCs during event handling, which very often takes
362 // more than a handful of seconds to complete, and shouldn't disconnect all our
364 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
365 $peer_manager.as_ref().disconnect_all_peers();
366 last_ping_call = $get_timer(PING_TIMER);
367 } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
368 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
369 $peer_manager.as_ref().timer_tick_occurred();
370 last_ping_call = $get_timer(PING_TIMER);
373 // Note that we want to run a graph prune once not long after startup before
374 // falling back to our usual hourly prunes. This avoids short-lived clients never
375 // pruning their network graph. We run once 60 seconds after startup before
376 // continuing our normal cadence. For RGS, since 60 seconds is likely too long,
377 // we prune after an initial sync completes.
378 let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
379 let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
380 let should_prune = match $gossip_sync {
381 GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
382 _ => prune_timer_elapsed,
385 // The network graph must not be pruned while rapid sync completion is pending
386 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
387 #[cfg(feature = "std")] {
388 log_trace!($logger, "Pruning and persisting network graph.");
389 network_graph.remove_stale_channels_and_tracking();
391 #[cfg(not(feature = "std"))] {
392 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
393 log_trace!($logger, "Persisting network graph.");
396 if let Err(e) = $persister.persist_graph(network_graph) {
397 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
402 let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
403 last_prune_call = $get_timer(prune_timer);
406 if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
407 if let Some(ref scorer) = $scorer {
408 log_trace!($logger, "Persisting scorer");
409 if let Err(e) = $persister.persist_scorer(&scorer) {
410 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
413 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
416 if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
417 log_trace!($logger, "Rebroadcasting monitor's pending claims");
418 $chain_monitor.rebroadcast_pending_claims();
419 last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
423 // After we exit, ensure we persist the ChannelManager one final time - this avoids
424 // some races where users quit while channel updates were in-flight, with
425 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
426 $persister.persist_manager(&*$channel_manager)?;
428 // Persist Scorer on exit
429 if let Some(ref scorer) = $scorer {
430 $persister.persist_scorer(&scorer)?;
433 // Persist NetworkGraph on exit
434 if let Some(network_graph) = $gossip_sync.network_graph() {
435 $persister.persist_graph(network_graph)?;
442 #[cfg(feature = "futures")]
443 pub(crate) mod futures_util {
444 use core::future::Future;
445 use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
447 use core::marker::Unpin;
448 pub(crate) struct Selector<
449 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
455 pub(crate) enum SelectorOutput {
460 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
461 > Future for Selector<A, B, C> {
462 type Output = SelectorOutput;
463 fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
464 match Pin::new(&mut self.a).poll(ctx) {
465 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
468 match Pin::new(&mut self.b).poll(ctx) {
469 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
472 match Pin::new(&mut self.c).poll(ctx) {
473 Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
	// If we want to poll a future without an async context to figure out if it has completed or
	// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
	// but sadly there's a good bit of boilerplate here.
	fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
	fn dummy_waker_action(_: *const ()) { }
	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
	/// Builds a no-op [`Waker`] for polling futures outside of an async context.
	// SAFETY: none of the vtable functions dereference the data pointer (it is always null and
	// ignored: clone returns a fresh null-pointer RawWaker, the other entries are no-ops), so the
	// RawWaker contract is trivially upheld.
	pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
490 #[cfg(feature = "futures")]
491 use futures_util::{Selector, SelectorOutput, dummy_waker};
492 #[cfg(feature = "futures")]
495 /// Processes background events in a future.
497 /// `sleeper` should return a future which completes in the given amount of time and returns a
498 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
499 /// future which outputs `true`, the loop will exit and this function's future will complete.
500 /// The `sleeper` future is free to return early after it has triggered the exit condition.
502 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
504 /// Requires the `futures` feature. Note that while this method is available without the `std`
505 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
506 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
507 /// manually instead.
509 /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
510 /// mobile device, where we may need to check for interruption of the application regularly. If you
511 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
512 /// are hundreds or thousands of simultaneous process calls running.
514 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
515 /// could setup `process_events_async` like this:
517 /// # use lightning::io;
518 /// # use std::sync::{Arc, RwLock};
519 /// # use std::sync::atomic::{AtomicBool, Ordering};
520 /// # use lightning_background_processor::{process_events_async, GossipSync};
521 /// # struct MyStore {}
522 /// # impl lightning::util::persist::KVStore for MyStore {
523 /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
524 /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
525 /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
526 /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
528 /// # struct MyEventHandler {}
529 /// # impl MyEventHandler {
530 /// # async fn handle_event(&self, _: lightning::events::Event) {}
532 /// # #[derive(Eq, PartialEq, Clone, Hash)]
533 /// # struct MySocketDescriptor {}
534 /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
535 /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
536 /// # fn disconnect_socket(&mut self) {}
538 /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
539 /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
540 /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
541 /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
542 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
543 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
544 /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
545 /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
546 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
547 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
548 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
549 /// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
551 /// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
552 /// let background_persister = Arc::clone(&my_persister);
553 /// let background_event_handler = Arc::clone(&my_event_handler);
554 /// let background_chain_mon = Arc::clone(&my_chain_monitor);
555 /// let background_chan_man = Arc::clone(&my_channel_manager);
556 /// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
557 /// let background_peer_man = Arc::clone(&my_peer_manager);
558 /// let background_logger = Arc::clone(&my_logger);
559 /// let background_scorer = Arc::clone(&my_scorer);
561 /// // Setup the sleeper.
562 /// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
564 /// let sleeper = move |d| {
565 /// let mut receiver = stop_receiver.clone();
566 /// Box::pin(async move {
568 /// _ = tokio::time::sleep(d) => false,
569 /// _ = receiver.changed() => true,
574 /// let mobile_interruptable_platform = false;
576 /// let handle = tokio::spawn(async move {
577 /// process_events_async(
578 /// background_persister,
579 /// |e| background_event_handler.handle_event(e),
580 /// background_chain_mon,
581 /// background_chan_man,
582 /// background_gossip_sync,
583 /// background_peer_man,
584 /// background_logger,
585 /// Some(background_scorer),
587 /// mobile_interruptable_platform,
590 /// .expect("Failed to process events");
593 /// // Stop the background processing.
594 /// stop_sender.send(()).unwrap();
595 /// handle.await.unwrap();
598 #[cfg(feature = "futures")]
599 pub async fn process_events_async<
601 UL: 'static + Deref + Send + Sync,
602 CF: 'static + Deref + Send + Sync,
603 CW: 'static + Deref + Send + Sync,
604 T: 'static + Deref + Send + Sync,
605 ES: 'static + Deref + Send + Sync,
606 NS: 'static + Deref + Send + Sync,
607 SP: 'static + Deref + Send + Sync,
608 F: 'static + Deref + Send + Sync,
609 R: 'static + Deref + Send + Sync,
610 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
611 L: 'static + Deref + Send + Sync,
612 P: 'static + Deref + Send + Sync,
613 EventHandlerFuture: core::future::Future<Output = ()>,
614 EventHandler: Fn(Event) -> EventHandlerFuture,
615 PS: 'static + Deref + Send,
616 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
617 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
618 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
619 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
620 PM: 'static + Deref + Send + Sync,
621 S: 'static + Deref<Target = SC> + Send + Sync,
622 SC: for<'b> WriteableScore<'b>,
623 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
624 Sleeper: Fn(Duration) -> SleepFuture
626 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
627 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
628 sleeper: Sleeper, mobile_interruptable_platform: bool,
629 ) -> Result<(), lightning::io::Error>
631 UL::Target: 'static + UtxoLookup,
632 CF::Target: 'static + chain::Filter,
633 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
634 T::Target: 'static + BroadcasterInterface,
635 ES::Target: 'static + EntropySource,
636 NS::Target: 'static + NodeSigner,
637 SP::Target: 'static + SignerProvider,
638 F::Target: 'static + FeeEstimator,
639 R::Target: 'static + Router,
640 L::Target: 'static + Logger,
641 P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
642 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
643 PM::Target: APeerManager + Send + Sync,
645 let mut should_break = false;
646 let async_event_handler = |event| {
647 let network_graph = gossip_sync.network_graph();
648 let event_handler = &event_handler;
649 let scorer = &scorer;
650 let logger = &logger;
651 let persister = &persister;
653 if let Some(network_graph) = network_graph {
654 handle_network_graph_update(network_graph, &event)
656 if let Some(ref scorer) = scorer {
657 if update_scorer(scorer, &event) {
658 log_trace!(logger, "Persisting scorer after update");
659 if let Err(e) = persister.persist_scorer(&scorer) {
660 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
664 event_handler(event).await;
668 persister, chain_monitor,
669 chain_monitor.process_pending_events_async(async_event_handler).await,
670 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
671 peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
672 gossip_sync, logger, scorer, should_break, {
674 a: channel_manager.get_event_or_persistence_needed_future(),
675 b: chain_monitor.get_update_future(),
676 c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
679 SelectorOutput::A|SelectorOutput::B => {},
680 SelectorOutput::C(exit) => {
684 }, |t| sleeper(Duration::from_secs(t)),
685 |fut: &mut SleepFuture, _| {
686 let mut waker = dummy_waker();
687 let mut ctx = task::Context::from_waker(&mut waker);
688 match core::pin::Pin::new(fut).poll(&mut ctx) {
689 task::Poll::Ready(exit) => { should_break = exit; true },
690 task::Poll::Pending => false,
692 }, mobile_interruptable_platform
#[cfg(feature = "futures")]
/// Drains pending events from the peer manager's `OnionMessageHandler` so each buffered event
/// can be handed to the async `handler`.
async fn process_onion_message_handler_events_async<
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PM: 'static + Deref + Send + Sync,
	peer_manager: &PM, handler: EventHandler
	PM::Target: APeerManager + Send + Sync,
	use lightning::events::EventsProvider;
	// process_pending_events only accepts a synchronous callback, so buffer the events into a
	// RefCell'd Vec first and run the async handler over them afterwards.
	let events = core::cell::RefCell::new(Vec::new());
	peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
	for event in events.into_inner() {
717 #[cfg(feature = "std")]
718 impl BackgroundProcessor {
719 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
722 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
723 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
724 /// either [`join`] or [`stop`].
726 /// # Data Persistence
728 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
729 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
730 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
731 /// provided implementation.
733 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
734 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
735 /// See the `lightning-persister` crate for LDK's provided implementation.
737 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
738 /// error or call [`join`] and handle any error that may arise. For the latter case,
739 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
743 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
744 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
745 /// functionality implemented by other handlers.
746 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
748 /// # Rapid Gossip Sync
750 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
751 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
752 /// until the [`RapidGossipSync`] instance completes its first sync.
754 /// [top-level documentation]: BackgroundProcessor
755 /// [`join`]: Self::join
756 /// [`stop`]: Self::stop
757 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
758 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
759 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
760 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
761 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
762 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
// Type parameters for `BackgroundProcessor::start`: each handle is a `Deref` to the
// corresponding component (chain monitor, channel manager, gossip sync, peer manager,
// logger, scorer). Concrete trait bounds are pinned down in the `where` clause below.
// All handles are moved into the spawned thread, hence the `Send` (+ `Sync`) bounds.
765 UL: 'static + Deref + Send + Sync,
766 CF: 'static + Deref + Send + Sync,
767 CW: 'static + Deref + Send + Sync,
768 T: 'static + Deref + Send + Sync,
769 ES: 'static + Deref + Send + Sync,
770 NS: 'static + Deref + Send + Sync,
771 SP: 'static + Deref + Send + Sync,
772 F: 'static + Deref + Send + Sync,
773 R: 'static + Deref + Send + Sync,
774 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
775 L: 'static + Deref + Send + Sync,
776 P: 'static + Deref + Send + Sync,
777 EH: 'static + EventHandler + Send,
778 PS: 'static + Deref + Send,
779 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
780 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
781 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
782 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
783 PM: 'static + Deref + Send + Sync,
784 S: 'static + Deref<Target = SC> + Send + Sync,
785 SC: for <'b> WriteableScore<'b>,
787 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
788 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
791 UL::Target: 'static + UtxoLookup,
792 CF::Target: 'static + chain::Filter,
793 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
794 T::Target: 'static + BroadcasterInterface,
795 ES::Target: 'static + EntropySource,
796 NS::Target: 'static + NodeSigner,
797 SP::Target: 'static + SignerProvider,
798 F::Target: 'static + FeeEstimator,
799 R::Target: 'static + Router,
800 L::Target: 'static + Logger,
801 P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
802 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
803 PM::Target: APeerManager + Send + Sync,
// Shutdown flag: written with Release in `stop_and_join_thread`, read with
// Acquire inside the background loop below.
805 let stop_thread = Arc::new(AtomicBool::new(false));
806 let stop_thread_clone = stop_thread.clone();
// Everything is moved into the background thread; the thread's Result is
// surfaced later via `join`/`stop`.
807 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
// Wrap the caller's event handler: first apply any NetworkGraph update implied
// by the event, then update (and persist) the scorer, then run the user handler.
808 let event_handler = |event| {
809 let network_graph = gossip_sync.network_graph();
810 if let Some(network_graph) = network_graph {
811 handle_network_graph_update(network_graph, &event)
813 if let Some(ref scorer) = scorer {
814 if update_scorer(scorer, &event) {
815 log_trace!(logger, "Persisting scorer after update");
816 if let Err(e) = persister.persist_scorer(&scorer) {
// Scorer persistence failures are logged but do not abort the processor.
817 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
821 event_handler.handle_event(event);
// Main processing loop (macro invocation; the macro name line is elided here):
// processes pending events from the chain monitor, channel manager, and the
// peer manager's onion message handler each iteration.
824 persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
825 channel_manager, channel_manager.process_pending_events(&event_handler),
827 peer_manager.onion_message_handler().process_pending_events(&event_handler),
828 gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
// Sleep until either the ChannelManager or ChainMonitor signals work, or 100ms
// elapses, so persistence and timer checks run promptly after new events.
829 { Sleeper::from_two_futures(
830 channel_manager.get_event_or_persistence_needed_future(),
831 chain_monitor.get_update_future()
832 ).wait_timeout(Duration::from_millis(100)); },
// Time source + elapsed-check used by the loop's timers (std clock here).
833 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
836 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
839 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
840 /// [`ChannelManager`].
844 /// This function panics if the background thread has panicked such as while persisting or
847 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
848 pub fn join(mut self) -> Result<(), std::io::Error> {
// `join`/`stop` consume `self`, and the handle is only taken once (in
// `join_thread`), so it must still be present here.
849 assert!(self.thread_handle.is_some());
853 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
854 /// [`ChannelManager`].
858 /// This function panics if the background thread has panicked such as while persisting or
861 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
862 pub fn stop(mut self) -> Result<(), std::io::Error> {
863 assert!(self.thread_handle.is_some());
864 self.stop_and_join_thread()
// Signal the background thread to exit, then wait for it to finish.
867 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
// Release pairs with the Acquire load in the background processing loop.
868 self.stop_thread.store(true, Ordering::Release);
// Wait for the thread and propagate its io::Result; `unwrap` re-raises any
// panic that occurred on the background thread.
872 fn join_thread(&mut self) -> Result<(), std::io::Error> {
873 match self.thread_handle.take() {
874 Some(handle) => handle.join().unwrap(),
880 #[cfg(feature = "std")]
// Ensure the background thread is stopped and joined if the processor is
// dropped without an explicit `stop`/`join`; a thread panic propagates here.
881 impl Drop for BackgroundProcessor {
883 self.stop_and_join_thread().unwrap();
887 #[cfg(all(feature = "std", test))]
889 use bitcoin::blockdata::constants::{genesis_block, ChainHash};
890 use bitcoin::blockdata::locktime::absolute::LockTime;
891 use bitcoin::blockdata::transaction::{Transaction, TxOut};
892 use bitcoin::network::constants::Network;
893 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
894 use lightning::chain::{BestBlock, Confirm, chainmonitor};
895 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
896 use lightning::sign::{InMemorySigner, KeysManager};
897 use lightning::chain::transaction::OutPoint;
898 use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
899 use lightning::{get_event_msg, get_event};
900 use lightning::ln::PaymentHash;
901 use lightning::ln::channelmanager;
902 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
903 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
904 use lightning::ln::functional_test_utils::*;
905 use lightning::ln::msgs::{ChannelMessageHandler, Init};
906 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
907 use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
908 use lightning::routing::router::{DefaultRouter, Path, RouteHop};
909 use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
910 use lightning::util::config::UserConfig;
911 use lightning::util::ser::Writeable;
912 use lightning::util::test_utils;
913 use lightning::util::persist::{KVStore,
914 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
915 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
916 SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
917 use lightning_persister::fs_store::FilesystemStore;
918 use std::collections::VecDeque;
920 use std::path::PathBuf;
921 use std::sync::{Arc, Mutex};
922 use std::sync::mpsc::SyncSender;
923 use std::time::Duration;
924 use lightning_rapid_gossip_sync::RapidGossipSync;
925 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
// Maximum time (in seconds) tests wait for an event to be handled by the
// background processor before giving up.
927 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
929 #[derive(Clone, Hash, PartialEq, Eq)]
// Minimal no-op socket descriptor so a real `PeerManager` can be constructed
// in tests without any actual networking.
930 struct TestDescriptor{}
931 impl SocketDescriptor for TestDescriptor {
932 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
936 fn disconnect_socket(&mut self) {}
// The scorer locking wrapper differs by build flavor (cfg attribute for the
// first alias is elided here); both provide interior locking around TestScorer.
940 type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
941 #[cfg(not(c_bindings))]
942 type LockingWrapper<T> = Mutex<T>;
// Concrete test instantiations of the crate's main generic types.
944 type ChannelManager =
945 channelmanager::ChannelManager<
947 Arc<test_utils::TestBroadcaster>,
951 Arc<test_utils::TestFeeEstimator>,
953 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
954 Arc<test_utils::TestLogger>,
955 Arc<LockingWrapper<TestScorer>>,
959 Arc<test_utils::TestLogger>>;
961 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;
963 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
964 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// Per-node test harness bundling one node's full component stack and handles.
967 node: Arc<ChannelManager>,
968 p2p_gossip_sync: PGS,
969 rapid_gossip_sync: RGS,
970 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
971 chain_monitor: Arc<ChainMonitor>,
972 kv_store: Arc<FilesystemStore>,
973 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
974 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
975 logger: Arc<test_utils::TestLogger>,
976 best_block: BestBlock,
977 scorer: Arc<LockingWrapper<TestScorer>>,
// Helpers returning each of the three `GossipSync` variants for this node
// (P2P, Rapid, or none) so tests can pick their gossip setup.
981 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
982 GossipSync::P2P(self.p2p_gossip_sync.clone())
985 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
986 GossipSync::Rapid(self.rapid_gossip_sync.clone())
989 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
// Drop cleanup: best-effort removal of the node's on-disk test store; failures
// are only printed since drop must not panic during normal teardown.
996 let data_dir = self.kv_store.get_data_dir();
997 match fs::remove_dir_all(data_dir.clone()) {
998 Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
// Test `Persister`: wraps a `FilesystemStore` and can be configured via the
// builder methods below to fail (or send a notification) when a specific
// persistence target (manager / graph / scorer) is written.
1005 graph_error: Option<(std::io::ErrorKind, &'static str)>,
1006 graph_persistence_notifier: Option<SyncSender<()>>,
1007 manager_error: Option<(std::io::ErrorKind, &'static str)>,
1008 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
1009 kv_store: FilesystemStore,
1013 fn new(data_dir: PathBuf) -> Self {
1014 let kv_store = FilesystemStore::new(data_dir);
1015 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
// Builder-style setters: each injects one behavior and leaves the rest intact.
1018 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1019 Self { graph_error: Some((error, message)), ..self }
1022 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
1023 Self { graph_persistence_notifier: Some(sender), ..self }
1026 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1027 Self { manager_error: Some((error, message)), ..self }
1030 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1031 Self { scorer_error: Some((error, message)), ..self }
1035 impl KVStore for Persister {
// Reads pass straight through to the underlying FilesystemStore.
1036 fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
1037 self.kv_store.read(primary_namespace, secondary_namespace, key)
// Writes first check whether the target matches one of the well-known
// namespace/key triples and, if an error (or notifier) was configured for that
// target, fail (or notify) before delegating to the FilesystemStore.
1040 fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
1041 if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
1042 secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
1043 key == CHANNEL_MANAGER_PERSISTENCE_KEY
1045 if let Some((error, message)) = self.manager_error {
1046 return Err(std::io::Error::new(error, message))
1050 if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
1051 secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
1052 key == NETWORK_GRAPH_PERSISTENCE_KEY
// Notify (if configured) that a graph persistence attempt happened; a receiver
// that has already gone away is tolerated rather than failing the write.
1054 if let Some(sender) = &self.graph_persistence_notifier {
1055 match sender.send(()) {
1057 Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
1061 if let Some((error, message)) = self.graph_error {
1062 return Err(std::io::Error::new(error, message))
1066 if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
1067 secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
1068 key == SCORER_PERSISTENCE_KEY
1070 if let Some((error, message)) = self.scorer_error {
1071 return Err(std::io::Error::new(error, message))
// No injected failure matched: perform the real write.
1075 self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
// remove/list pass straight through to the underlying store.
1078 fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
1079 self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
1082 fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
1083 self.kv_store.list(primary_namespace, secondary_namespace)
// TestScorer: records expected score-update callbacks (FIFO) and asserts the
// callbacks that actually fire match them; `None` means "don't check".
1088 event_expectations: Option<VecDeque<TestResult>>,
// The four score-update callbacks a test can expect.
1093 PaymentFailure { path: Path, short_channel_id: u64 },
1094 PaymentSuccess { path: Path },
1095 ProbeFailure { path: Path },
1096 ProbeSuccess { path: Path },
1101 Self { event_expectations: None }
// Queue an expected callback; any expectation left unconsumed causes a panic
// when the scorer is dropped (see `Drop for TestScorer` below).
1104 fn expect(&mut self, expectation: TestResult) {
1105 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1109 impl lightning::util::ser::Writeable for TestScorer {
// Serialization is a no-op: tests only care that persistence is attempted.
1110 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
1113 impl ScoreLookUp for TestScorer {
1114 type ScoreParams = ();
// Lookups are never expected during these tests, hence unimplemented.
1115 fn channel_penalty_msat(
1116 &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams
1117 ) -> u64 { unimplemented!(); }
// Each ScoreUpdate callback pops the next queued expectation (panicking if the
// queue is empty) and asserts it matches the callback that actually fired,
// including the path (and short_channel_id for payment failures).
1120 impl ScoreUpdate for TestScorer {
1121 fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
1122 if let Some(expectations) = &mut self.event_expectations {
1123 match expectations.pop_front().unwrap() {
1124 TestResult::PaymentFailure { path, short_channel_id } => {
1125 assert_eq!(actual_path, &path);
1126 assert_eq!(actual_short_channel_id, short_channel_id);
1128 TestResult::PaymentSuccess { path } => {
1129 panic!("Unexpected successful payment path: {:?}", path)
1131 TestResult::ProbeFailure { path } => {
1132 panic!("Unexpected probe failure: {:?}", path)
1134 TestResult::ProbeSuccess { path } => {
1135 panic!("Unexpected probe success: {:?}", path)
1141 fn payment_path_successful(&mut self, actual_path: &Path) {
1142 if let Some(expectations) = &mut self.event_expectations {
1143 match expectations.pop_front().unwrap() {
1144 TestResult::PaymentFailure { path, .. } => {
1145 panic!("Unexpected payment path failure: {:?}", path)
1147 TestResult::PaymentSuccess { path } => {
1148 assert_eq!(actual_path, &path);
1150 TestResult::ProbeFailure { path } => {
1151 panic!("Unexpected probe failure: {:?}", path)
1153 TestResult::ProbeSuccess { path } => {
1154 panic!("Unexpected probe success: {:?}", path)
1160 fn probe_failed(&mut self, actual_path: &Path, _: u64) {
1161 if let Some(expectations) = &mut self.event_expectations {
1162 match expectations.pop_front().unwrap() {
1163 TestResult::PaymentFailure { path, .. } => {
1164 panic!("Unexpected payment path failure: {:?}", path)
1166 TestResult::PaymentSuccess { path } => {
1167 panic!("Unexpected payment path success: {:?}", path)
1169 TestResult::ProbeFailure { path } => {
1170 assert_eq!(actual_path, &path);
1172 TestResult::ProbeSuccess { path } => {
1173 panic!("Unexpected probe success: {:?}", path)
1178 fn probe_successful(&mut self, actual_path: &Path) {
1179 if let Some(expectations) = &mut self.event_expectations {
1180 match expectations.pop_front().unwrap() {
1181 TestResult::PaymentFailure { path, .. } => {
1182 panic!("Unexpected payment path failure: {:?}", path)
1184 TestResult::PaymentSuccess { path } => {
1185 panic!("Unexpected payment path success: {:?}", path)
1187 TestResult::ProbeFailure { path } => {
1188 panic!("Unexpected probe failure: {:?}", path)
1190 TestResult::ProbeSuccess { path } => {
1191 assert_eq!(actual_path, &path);
// Marker impl tying TestScorer into the combined `Score` trait.
// NOTE(review): likely cfg-gated (attribute line not visible here) — confirm.
1199 impl lightning::routing::scoring::Score for TestScorer {}
1201 impl Drop for TestScorer {
1202 fn drop(&mut self) {
// Avoid double-panicking while unwinding from an earlier test failure.
1203 if std::thread::panicking() {
// Fail the test if any queued expectation was never consumed.
1207 if let Some(event_expectations) = &self.event_expectations {
1208 if !event_expectations.is_empty() {
1209 panic!("Unsatisfied event expectations: {:?}", event_expectations);
// Join `filename` onto `filepath` via PathBuf and return the result as a
// String (unwrap: test paths are valid UTF-8).
1215 fn get_full_filepath(filepath: String, filename: String) -> String {
1216 let mut path = PathBuf::from(filepath);
1217 path.push(filename);
1218 path.to_str().unwrap().to_string()
// Build `num_nodes` complete node stacks (broadcaster, fee estimator, network
// graph, scorer, router, chain monitor, channel manager, gossip syncs, peer
// manager), each persisting under its own directory beneath a temp dir named
// after `persist_dir`, then connect every pair of nodes. Returns the persist
// dir path and the constructed nodes.
1221 fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
1222 let persist_temp_path = env::temp_dir().join(persist_dir);
1223 let persist_dir = persist_temp_path.to_string_lossy().to_string();
1224 let network = Network::Bitcoin;
1225 let mut nodes = Vec::new();
1226 for i in 0..num_nodes {
1227 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1228 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
1229 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1230 let genesis_block = genesis_block(network);
1231 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1232 let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
// Deterministic per-node seed so keys and router randomness are reproducible.
1233 let seed = [i as u8; 32];
1234 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
1235 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
// Each node gets its own on-disk store directory, indexed by node number.
1236 let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
1237 let now = Duration::from_secs(genesis_block.header.time as u64);
1238 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1239 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
1240 let best_block = BestBlock::from_network(network);
1241 let params = ChainParameters { network, best_block };
1242 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
1243 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1244 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1245 let msg_handler = MessageHandler {
1246 chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
1247 route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
1248 onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
1250 let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
1251 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
// Mark every pair of nodes as connected peers, in both directions.
1255 for i in 0..num_nodes {
1256 for j in (i+1)..num_nodes {
1257 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init {
1258 features: nodes[j].node.init_features(), networks: None, remote_network_address: None
1260 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init {
1261 features: nodes[i].node.init_features(), networks: None, remote_network_address: None
1266 (persist_dir, nodes)
// Drive a full channel-open handshake between $node_a and $node_b for
// $channel_value sats, consuming the involved events; evaluates to the funding
// transaction.
1269 macro_rules! open_channel {
1270 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1271 begin_open_channel!($node_a, $node_b, $channel_value);
1272 let events = $node_a.node.get_and_clear_pending_events();
1273 assert_eq!(events.len(), 1);
1274 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1275 $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
1276 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1277 get_event!($node_b, Event::ChannelPending);
1278 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1279 get_event!($node_a, Event::ChannelPending);
// Start the open-channel handshake: create_channel, then exchange
// open_channel / accept_channel messages between the two nodes.
1284 macro_rules! begin_open_channel {
1285 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1286 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap();
1287 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1288 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Destructure a FundingGenerationReady event, assert the expected value and
// user_channel_id (42, set in begin_open_channel!), and build a dummy funding
// transaction paying the requested output script; evaluates to
// (temporary_channel_id, tx).
1292 macro_rules! handle_funding_generation_ready {
1293 ($event: expr, $channel_value: expr) => {{
1295 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1296 assert_eq!(channel_value_satoshis, $channel_value);
1297 assert_eq!(user_channel_id, 42);
1299 let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
1300 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1302 (temporary_channel_id, tx)
1304 _ => panic!("Unexpected event"),
// Mine `depth` dummy blocks on top of the node's best block, confirming `tx`
// and notifying both the ChannelManager and the ChainMonitor; the final block
// additionally triggers best_block_updated on both.
1309 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1310 for i in 1..=depth {
1311 let prev_blockhash = node.best_block.block_hash();
1312 let height = node.best_block.height() + 1;
1313 let header = create_dummy_header(prev_blockhash, height);
1314 let txdata = vec![(0, tx)];
1315 node.best_block = BestBlock::new(header.block_hash(), height);
// Match arms on the block index (the match line itself is elided here):
// the tx is reported confirmed, and the last block updates the best block.
1318 node.node.transactions_confirmed(&header, &txdata, height);
1319 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1321 x if x == depth => {
1322 node.node.best_block_updated(&header, height);
1323 node.chain_monitor.best_block_updated(&header, height);
// Confirm `tx` deeply enough (ANTI_REORG_DELAY) to be considered irreversible.
1329 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1330 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1334 fn test_background_processor() {
1335 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1336 // updates. Also test that when new updates are available, the manager signals that it needs
1337 // re-persistence and is successfully re-persisted.
1338 let (persist_dir, nodes) = create_nodes(2, "test_background_processor");
1340 // Go through the channel creation process so that each node has something to persist. Since
1341 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1342 // avoid a race with processing events.
1343 let tx = open_channel!(nodes[0], nodes[1], 100000);
1345 // Initiate the background processors to watch each node.
1346 let data_dir = nodes[0].kv_store.get_data_dir();
1347 let persister = Arc::new(Persister::new(data_dir));
1348 let event_handler = |_: _| {};
1349 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll until the bytes persisted at $filepath match $node's serialization
// (the processor persists asynchronously, so this loops until they agree).
1351 macro_rules! check_persisted_data {
1352 ($node: expr, $filepath: expr) => {
1353 let mut expected_bytes = Vec::new();
1355 expected_bytes.clear();
1356 match $node.write(&mut expected_bytes) {
1358 match std::fs::read($filepath) {
1360 if bytes == expected_bytes {
1369 Err(e) => panic!("Unexpected error: {}", e)
1375 // Check that the initial channel manager data is persisted as expected.
1376 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
1377 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait until the manager no longer signals pending persistence/events.
1380 if !nodes[0].node.get_event_or_persist_condvar_value() { break }
1383 // Force-close the channel.
1384 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1386 // Check that the force-close updates are persisted.
1387 check_persisted_data!(nodes[0].node, filepath.clone());
1389 if !nodes[0].node.get_event_or_persist_condvar_value() { break }
1392 // Check network graph is persisted
1393 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
1394 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1396 // Check scorer is persisted
1397 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
1398 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Skip stop() while panicking to avoid masking the original test failure.
1400 if !std::thread::panicking() {
1401 bg_processor.stop().unwrap();
// Verifies all periodic timer ticks fire by waiting for each component's
// distinctive log line to appear.
1406 fn test_timer_tick_called() {
1408 // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1409 // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
1410 // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
1411 // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
1412 let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1413 let data_dir = nodes[0].kv_store.get_data_dir();
1414 let persister = Arc::new(Persister::new(data_dir));
1415 let event_handler = |_: _| {};
1416 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Loop (header elided here) until all four expected log lines are observed.
1418 let log_entries = nodes[0].logger.lines.lock().unwrap();
1419 let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1420 let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1421 let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1422 let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
1423 if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
1424 log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
1425 log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
1426 log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
// Skip stop() while panicking to avoid masking the original test failure.
1431 if !std::thread::panicking() {
1432 bg_processor.stop().unwrap();
1437 fn test_channel_manager_persist_error() {
1438 // Test that if we encounter an error during manager persistence, the thread panics.
1439 let (_, nodes) = create_nodes(2, "test_persist_error");
1440 open_channel!(nodes[0], nodes[1], 100000);
1442 let data_dir = nodes[0].kv_store.get_data_dir();
// Inject a failure on every ChannelManager persistence attempt.
1443 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1444 let event_handler = |_: _| {};
1445 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The injected error must surface from join() with the original kind/message.
1446 match bg_processor.join() {
1447 Ok(_) => panic!("Expected error persisting manager"),
1449 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1450 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1456 #[cfg(feature = "futures")]
// Async variant of the manager-persist-error test, driving process_events_async
// with a tokio sleeper instead of the std background thread.
1457 async fn test_channel_manager_persist_error_async() {
1458 // Test that if we encounter an error during manager persistence, the thread panics.
// NOTE(review): dir name says "sync" although this is the async variant —
// presumably just to keep it distinct from the sync test's dir; confirm.
1459 let (_, nodes) = create_nodes(2, "test_persist_error_sync");
1460 open_channel!(nodes[0], nodes[1], 100000);
1462 let data_dir = nodes[0].kv_store.get_data_dir();
1463 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1465 let bp_future = super::process_events_async(
1466 persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1467 nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1468 Some(nodes[0].scorer.clone()), move |dur: Duration| {
1469 Box::pin(async move {
1470 tokio::time::sleep(dur).await;
// The injected error must surface from the processor future itself.
1475 match bp_future.await {
1476 Ok(_) => panic!("Expected error persisting manager"),
1478 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1479 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1485 fn test_network_graph_persist_error() {
1486 // Test that if we encounter an error during network graph persistence, an error gets returned.
1487 let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
1488 let data_dir = nodes[0].kv_store.get_data_dir();
// Inject a failure on every NetworkGraph persistence attempt.
1489 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1490 let event_handler = |_: _| {};
1491 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Graph persistence happens on shutdown, so stop() should surface the error.
1493 match bg_processor.stop() {
1494 Ok(_) => panic!("Expected error persisting network graph"),
1496 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1497 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1503 fn test_scorer_persist_error() {
1504 // Test that if we encounter an error during scorer persistence, an error gets returned.
1505 let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
1506 let data_dir = nodes[0].kv_store.get_data_dir();
// Inject a failure on every scorer persistence attempt.
1507 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1508 let event_handler = |_: _| {};
1509 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Scorer persistence happens on shutdown, so stop() should surface the error.
1511 match bg_processor.stop() {
1512 Ok(_) => panic!("Expected error persisting scorer"),
1514 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1515 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// End-to-end check that the background processor routes events to the
// user-supplied handler: first FundingGenerationReady/ChannelPending during
// channel open, then SpendableOutputs after a force-close confirms.
1521 fn test_background_event_handling() {
1522 let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
1523 let channel_value = 100000;
1524 let data_dir = nodes[0].kv_store.get_data_dir();
1525 let persister = Arc::new(Persister::new(data_dir.clone()));
1527 // Set up a background event handler for FundingGenerationReady events.
1528 let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
1529 let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
// Channel the relevant events back to the test thread; anything else is a bug.
1530 let event_handler = move |event: Event| match event {
1531 Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1532 Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1533 Event::ChannelReady { .. } => {},
1534 _ => panic!("Unexpected event: {:?}", event),
1537 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1539 // Open a channel and check that the FundingGenerationReady event was handled.
1540 begin_open_channel!(nodes[0], nodes[1], channel_value);
1541 let (temporary_channel_id, funding_tx) = funding_generation_recv
1542 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1543 .expect("FundingGenerationReady not handled within deadline");
1544 nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
1545 nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
1546 get_event!(nodes[1], Event::ChannelPending);
1547 nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
1548 let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1549 .expect("ChannelPending not handled within deadline");
1551 // Confirm the funding transaction.
1552 confirm_transaction(&mut nodes[0], &funding_tx);
1553 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1554 confirm_transaction(&mut nodes[1], &funding_tx);
1555 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1556 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1557 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1558 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1559 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before starting the second one below; skipped
// while panicking to avoid masking the original failure.
1561 if !std::thread::panicking() {
1562 bg_processor.stop().unwrap();
1565 // Set up a background event handler for SpendableOutputs events.
1566 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1567 let event_handler = move |event: Event| match event {
1568 Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1569 Event::ChannelReady { .. } => {},
1570 Event::ChannelClosed { .. } => {},
1571 _ => panic!("Unexpected event: {:?}", event),
1573 let persister = Arc::new(Persister::new(data_dir));
1574 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1576 // Force close the channel and check that the SpendableOutputs event was handled.
1577 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1578 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// BREAKDOWN_TIMEOUT confirmations make the to_self output claimable.
1579 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1581 let event = receiver
1582 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1583 .expect("Events not handled within deadline");
1585 Event::SpendableOutputs { .. } => {},
1586 _ => panic!("Unexpected event: {:?}", event),
1589 if !std::thread::panicking() {
1590 bg_processor.stop().unwrap();
// Verifies that a running `BackgroundProcessor` periodically persists the scorer:
// start a processor with a scorer attached and watch the test logger for the
// scorer-persistence log line. (Interior lines of this test are elided in this
// excerpt — the log check below is presumably wrapped in a retry loop that sleeps
// between polls; confirm against the full file.)
1595 fn test_scorer_persistence() {
1596 let (_, nodes) = create_nodes(2, "test_scorer_persistence");
1597 let data_dir = nodes[0].kv_store.get_data_dir();
1598 let persister = Arc::new(Persister::new(data_dir));
// No events are expected in this test, so a no-op handler suffices.
1599 let event_handler = |_: _| {};
1600 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Success condition: the background processor logged that it persisted the scorer.
1603 let log_entries = nodes[0].logger.lines.lock().unwrap();
1604 let expected_log = "Persisting scorer".to_string();
1605 if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
// Don't try to stop the processor while unwinding from an earlier test failure —
// `stop()` unwraps and would turn a failure into a double panic.
1610 if !std::thread::panicking() {
1611 bg_processor.stop().unwrap();
// Shared body for the sync and async variants of the "don't prune the network graph
// until rapid gossip sync completes" tests. `$nodes` supplies the test nodes,
// `$receive` blocks until the persister's graph-persistence notifier fires, and
// `$sleep` yields between polls of the logger. (Some interior lines — loop scaffolding
// and closing braces — are elided in this excerpt.)
1615 macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
1616 ($nodes: expr, $receive: expr, $sleep: expr) => {
// Seed the graph with a stub channel (scid 42) that the eventual prune should remove.
1617 let features = ChannelFeatures::empty();
1618 $nodes[0].network_graph.add_channel_from_partial_announcement(
1619 42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
1620 ).expect("Failed to update channel from partial announcement");
1621 let original_graph_description = $nodes[0].network_graph.to_string();
1622 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1623 assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
// Inspect the test logger's per-line counters to confirm the background-processor
// loop is actually running before feeding in the gossip data.
1627 let log_entries = $nodes[0].logger.lines.lock().unwrap();
1628 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1629 if *log_entries.get(&("lightning_background_processor", loop_counter))
1632 // Wait until the loop has gone around at least twice.
// Canned rapid-gossip-sync payload (note the leading 76, 68, 75 = "LDK" prefix
// bytes); applying it marks the initial graph sync as complete.
1637 let initialization_input = vec![
1638 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1639 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1640 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1641 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1642 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1643 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1644 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1645 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1646 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1647 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1648 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1649 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1650 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// The second argument is presumably the "current" UNIX time used for staleness
// decisions (fixed here so the test is deterministic) — confirm against the
// `update_network_graph_no_std` API docs.
1652 $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1654 // this should have added two channels and pruned the previous one.
1655 assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
// Block until the background processor persists the graph post-sync.
1657 $receive.expect("Network graph not pruned within deadline");
1659 // all channels should now be pruned
1660 assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
// Sync (std-thread) variant: run the shared macro against a `BackgroundProcessor`,
// using an mpsc channel wired into the persister as the graph-persistence notifier.
1665 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1666 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1668 let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
1669 let data_dir = nodes[0].kv_store.get_data_dir();
// The persister signals `sender` whenever it persists the network graph.
1670 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1672 let event_handler = |_: _| {};
1673 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Generous deadline: five times the first-network-prune timer.
1675 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
1676 receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
1677 std::thread::sleep(Duration::from_millis(1)));
1679 background_processor.stop().unwrap();
1683 #[cfg(feature = "futures")]
// Async variant of the test above, driving `process_events_async` on a tokio
// runtime instead of spawning a `BackgroundProcessor` thread. (Some interior
// lines — `tokio::select!` scaffolding, loop braces — are elided in this excerpt.)
1684 async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
1685 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1687 let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
1688 let data_dir = nodes[0].kv_store.get_data_dir();
1689 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
// Watch channel used to tell the background-processor future to exit.
1691 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
1692 let bp_future = super::process_events_async(
1693 persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1694 nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
// The supplied sleeper resolves to `false` on timeout (keep running) and `true`
// when the exit watch fires (shut down).
1695 Some(nodes[0].scorer.clone()), move |dur: Duration| {
1696 let mut exit_receiver = exit_receiver.clone();
1697 Box::pin(async move {
1699 _ = tokio::time::sleep(dur) => false,
1700 _ = exit_receiver.changed() => true,
// Run the processor and the test driver concurrently; the driver polls the
// notifier channel instead of blocking on it so the runtime stays live.
1706 let t1 = tokio::spawn(bp_future);
1707 let t2 = tokio::spawn(async move {
1708 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
1711 tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
1712 if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
1716 }, tokio::time::sleep(Duration::from_millis(1)).await);
1717 exit_sender.send(()).unwrap();
// Both tasks must finish cleanly; the BP future's own Result is also unwrapped.
1719 let (r1, r2) = tokio::join!(t1, t2);
1720 r1.unwrap().unwrap();
// Shared body for the sync and async payment-path-scoring tests below. Pushes
// scorer-relevant events (`PaymentPathFailed`/`Successful`, `ProbeSuccessful`/
// `ProbeFailed`) through a running background processor and, via the test scorer's
// expectation queue (`write_lock().expect(..)`), asserts that each event reaches the
// scorer with the expected result. `$receive` awaits the next event seen by the
// event handler. (Some interior lines are elided in this excerpt.)
1724 macro_rules! do_test_payment_path_scoring {
1725 ($nodes: expr, $receive: expr) => {
1726 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1727 // that we update the scorer upon a payment path succeeding (note that the channel must be
1728 // public or else we won't score it).
1729 // A background event handler for FundingGenerationReady events must be hooked up to a
1730 // running background processor.
1731 let scored_scid = 4242;
1732 let secp_ctx = Secp256k1::new();
1733 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1734 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
// Single-hop path over the channel under test (scid 4242).
1736 let path = Path { hops: vec![RouteHop {
1738 node_features: NodeFeatures::empty(),
1739 short_channel_id: scored_scid,
1740 channel_features: ChannelFeatures::empty(),
1742 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
1743 maybe_announced_channel: true,
1744 }], blinded_tail: None };
// A non-permanent path failure carrying a short_channel_id is scored as a payment
// failure at that channel.
1746 $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1747 $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1749 payment_hash: PaymentHash([42; 32]),
1750 payment_failed_permanently: false,
1751 failure: PathFailure::OnPath { network_update: None },
1753 short_channel_id: Some(scored_scid),
1755 let event = $receive.expect("PaymentPathFailed not handled within deadline");
1757 Event::PaymentPathFailed { .. } => {},
1758 _ => panic!("Unexpected event"),
1761 // Ensure we'll score payments that were explicitly failed back by the destination as
// a ProbeSuccess (the expectation below) — the path reached the destination, which
// proves it works even though the payment itself was rejected.
1763 $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
1764 $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1766 payment_hash: PaymentHash([42; 32]),
1767 payment_failed_permanently: true,
1768 failure: PathFailure::OnPath { network_update: None },
1770 short_channel_id: None,
1772 let event = $receive.expect("PaymentPathFailed not handled within deadline");
1774 Event::PaymentPathFailed { .. } => {},
1775 _ => panic!("Unexpected event"),
// A successful payment path is scored as a payment success.
1778 $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
1779 $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1780 payment_id: PaymentId([42; 32]),
1784 let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
1786 Event::PaymentPathSuccessful { .. } => {},
1787 _ => panic!("Unexpected event"),
// Probe outcomes are scored directly as probe success / failure.
1790 $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
1791 $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1792 payment_id: PaymentId([42; 32]),
1793 payment_hash: PaymentHash([42; 32]),
1796 let event = $receive.expect("ProbeSuccessful not handled within deadline");
1798 Event::ProbeSuccessful { .. } => {},
1799 _ => panic!("Unexpected event"),
1802 $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
1803 $nodes[0].node.push_pending_event(Event::ProbeFailed {
1804 payment_id: PaymentId([42; 32]),
1805 payment_hash: PaymentHash([42; 32]),
1807 short_channel_id: Some(scored_scid),
1809 let event = $receive.expect("ProbeFailure not handled within deadline");
1811 Event::ProbeFailed { .. } => {},
1812 _ => panic!("Unexpected event"),
// Sync variant: forward each scorer-relevant event through an mpsc channel so the
// shared macro can block on it, then confirm the scorer was persisted once per event.
1818 fn test_payment_path_scoring() {
1819 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
// Only the four scoring-related event kinds are expected; anything else is a bug.
1820 let event_handler = move |event: Event| match event {
1821 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1822 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1823 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1824 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1825 _ => panic!("Unexpected event: {:?}", event),
1828 let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
1829 let data_dir = nodes[0].kv_store.get_data_dir();
1830 let persister = Arc::new(Persister::new(data_dir));
1831 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1833 do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
// Avoid a double panic if the macro body already failed the test.
1835 if !std::thread::panicking() {
1836 bg_processor.stop().unwrap();
// Five events were driven through the processor above, so the scorer should have
// been persisted exactly five times.
1839 let log_entries = nodes[0].logger.lines.lock().unwrap();
1840 let expected_log = "Persisting scorer after update".to_string();
1841 assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
1845 #[cfg(feature = "futures")]
// Async variant of `test_payment_path_scoring`, driving `process_events_async` on a
// tokio runtime with a tokio mpsc channel instead of a std one. (Some interior
// lines — the handler's async block and `tokio::select!` scaffolding — are elided
// in this excerpt.)
1846 async fn test_payment_path_scoring_async() {
1847 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
// The async handler clones the sender per invocation so each event can be awaited
// onto the channel; unexpected event kinds fail the test.
1848 let event_handler = move |event: Event| {
1849 let sender_ref = sender.clone();
1852 Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
1853 Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1854 Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1855 Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
1856 _ => panic!("Unexpected event: {:?}", event),
1861 let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
1862 let data_dir = nodes[0].kv_store.get_data_dir();
1863 let persister = Arc::new(Persister::new(data_dir));
// Watch channel used to tell the background-processor future to exit.
1865 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
1867 let bp_future = super::process_events_async(
1868 persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1869 nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
// Sleeper resolves `false` on timeout (keep running), `true` when told to exit.
1870 Some(nodes[0].scorer.clone()), move |dur: Duration| {
1871 let mut exit_receiver = exit_receiver.clone();
1872 Box::pin(async move {
1874 _ = tokio::time::sleep(dur) => false,
1875 _ = exit_receiver.changed() => true,
1880 let t1 = tokio::spawn(bp_future);
1881 let t2 = tokio::spawn(async move {
1882 do_test_payment_path_scoring!(nodes, receiver.recv().await);
1883 exit_sender.send(()).unwrap();
// Five events were driven through the processor, so the scorer should have been
// persisted exactly five times (matches the sync variant's assertion).
1885 let log_entries = nodes[0].logger.lines.lock().unwrap();
1886 let expected_log = "Persisting scorer after update".to_string();
1887 assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
// Both spawned tasks must finish cleanly; the BP future's Result is also unwrapped.
1890 let (r1, r2) = tokio::join!(t1, t2);
1891 r1.unwrap().unwrap();