1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::router::Router;
26 use lightning::routing::scoring::WriteableScore;
27 use lightning::util::events::{Event, EventHandler, EventsProvider};
28 use lightning::util::logger::Logger;
29 use lightning::util::persist::Persister;
30 use lightning_rapid_gossip_sync::RapidGossipSync;
32 use std::sync::atomic::{AtomicBool, Ordering};
34 use std::thread::JoinHandle;
35 use std::time::{Duration, Instant};
38 #[cfg(feature = "futures")]
39 use futures_util::{select_biased, future::FutureExt, task};
41 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
42 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
43 /// responsibilities are:
44 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
45 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
46 /// writing it to disk/backups by invoking the callback given to it at startup.
47 /// [`ChannelManager`] persistence should be done in the background.
48 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
49 /// at the appropriate intervals.
50 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
51 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
53 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
54 /// upon as doing so may result in high latency.
58 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
59 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
60 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
61 /// unilateral chain closure fees are at risk.
63 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
64 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared shutdown flag: set to `true` (Release) by `stop`/`Drop` and polled
	// (Acquire) by the background thread's loop-exit check.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned background thread. Wrapped in `Option` so `join`/`stop`
	// can `take()` it exactly once; the thread's return value carries any
	// persistence error hit before exit.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// NOTE(review): several constants below appear twice with different values; the
// attributes selecting between them (presumably `#[cfg(not(test))]` / `#[cfg(test)]`)
// are elided from this listing — confirm against the full file.
// Production cadence for `ChannelManager::timer_tick_occurred`: once a minute.
const FRESHNESS_TIMER: u64 = 60;
// Test cadence, shortened so tests complete quickly.
const FRESHNESS_TIMER: u64 = 1;
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
const PING_TIMER: u64 = 1;
/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Persist the scorer every 30 seconds so recent payment/penalty data survives restarts.
const SCORER_PERSIST_TIMER: u64 = 30;
const SCORER_PERSIST_TIMER: u64 = 1;
// First graph prune runs one minute after startup (see the note in `define_run_body!`
// about short-lived clients), before falling back to `NETWORK_PRUNE_TIMER`.
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// NOTE(review): the `pub enum GossipSync<` header line and the trailing `A: Deref,`
// `L: Deref,` generic parameters are elided from this listing — confirm against the
// full file.
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	/// Rapid gossip sync from a trusted server.
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	// Returns the network graph backing this gossip sync, if any.
	fn network_graph(&self) -> Option<&G> {
		GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::None => None,
	// Returns the graph only when it is safe to prune it. A rapid-sync graph must
	// not be pruned until the initial sync completes, since pruning could remove
	// channels the pending sync data would otherwise have refreshed.
	fn prunable_network_graph(&self) -> Option<&G> {
		GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
		GossipSync::Rapid(gossip_sync) => {
			if gossip_sync.is_initial_sync_complete() {
				Some(gossip_sync.network_graph())
		GossipSync::None => None,
// The three constructor impls below concretize the *unused* side of the enum with
// reference types so callers of e.g. `GossipSync::p2p` never have to name the rapid
// sync's type parameter (and vice versa).
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
	A::Target: chain::Access,
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
	&'a (dyn chain::Access + Send + Sync),
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
/// (C-not exported) as the bindings concretize everything and have constructors for us
	&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
	&RapidGossipSync<&'a NetworkGraph<L>, L>,
	&'a (dyn chain::Access + Send + Sync),
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
196 fn handle_network_graph_update<L: Deref>(
197 network_graph: &NetworkGraph<L>, event: &Event
198 ) where L::Target: Logger {
199 if let Event::PaymentPathFailed { ref network_update, .. } = event {
200 if let Some(network_update) = network_update {
201 network_graph.handle_network_update(&network_update);
// Shared event-loop body for both the threaded (`BackgroundProcessor::start`) and the
// async (`process_events_async`) drivers. The caller supplies how to await wakeups
// (`$await`), how to create/check timers (`$get_timer`/`$timer_elapsed`), and how to
// detect a requested shutdown (`$loop_exit_check`).
// NOTE(review): a number of lines (the macro's `=> {`, the `loop {` header, several
// closing braces, and the `if await_slow {` guard before the disconnect branch) are
// elided from this listing — confirm against the full file.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Per-task countdowns; each is reset to its interval after firing.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;

			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);
			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					log_trace!($logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking();
					// Graph persistence failures are logged but do not abort the loop.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
				last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// Fix: this flag must start out `false`. It is the loop-exit check handed to
	// `define_run_body!` and is only overwritten when the sleeper arm of the
	// `select_biased!` below runs. If it starts `true` and the ChannelManager's
	// persistable-update future wins the first race (which it does on startup, right
	// after the initial `timer_tick_occurred` dirties the manager), the loop would
	// persist once and then immediately terminate the background processor.
	let mut should_break = false;
	// Wrap the user's async event handler so gossip-relevant events also update the
	// network graph (when one was supplied) before user code runs.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Race "ChannelManager needs persisting" against a 100ms tick; the former
			// yields `true` (persist now), the latter reports whether the caller asked
			// the background processing to exit.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// A no-op waker suffices here: we only poll to ask "done yet?"; the caller
			// re-polling this whole future drives progress, not wakeups.
			// (`Context::from_waker` takes `&Waker`, so no `mut` binding is needed.)
			let waker = task::noop_waker();
			let mut ctx = task::Context::from_waker(&waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
		})
}
// NOTE(review): various lines of this impl (the `pub fn start<` header, the `'a,`
// generic parameter, `) -> Self`, `where`, assorted braces, `self.join_thread()`
// bodies, and the `None => Ok(())` arm of `join_thread`) are elided from this
// listing — confirm against the full file.
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	/// # Data Persistence
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	/// # Rapid Gossip Sync
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Decorate the user handler: apply gossip-relevant updates to the network
			// graph (if one was supplied) before handing the event to the user.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				event_handler.handle_event(event);
			// `Acquire` here pairs with the `Release` store in `stop_and_join_thread`.
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	// Unlike `stop`, `join` does NOT set the stop flag — it blocks until the thread
	// exits on its own (i.e. after a persistence error).
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	/// This function panics if the background thread has panicked such as while persisting or
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()

	// Signals the thread to exit, then waits for it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);

	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			// Propagate the thread's result; a thread panic becomes a panic here.
			Some(handle) => handle.join().unwrap(),

impl Drop for BackgroundProcessor {
		// Best-effort clean shutdown on drop; panics if the final persistence failed.
		self.stop_and_join_thread().unwrap();
585 use bitcoin::blockdata::block::BlockHeader;
586 use bitcoin::blockdata::constants::genesis_block;
587 use bitcoin::blockdata::locktime::PackedLockTime;
588 use bitcoin::blockdata::transaction::{Transaction, TxOut};
589 use bitcoin::network::constants::Network;
590 use lightning::chain::{BestBlock, Confirm, chainmonitor};
591 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
592 use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner};
593 use lightning::chain::transaction::OutPoint;
594 use lightning::get_event_msg;
595 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
596 use lightning::ln::features::ChannelFeatures;
597 use lightning::ln::msgs::{ChannelMessageHandler, Init};
598 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
599 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
600 use lightning::routing::router::DefaultRouter;
601 use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
602 use lightning::util::config::UserConfig;
603 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
604 use lightning::util::ser::Writeable;
605 use lightning::util::test_utils;
606 use lightning::util::persist::KVStorePersister;
607 use lightning_invoice::payment::{InvoicePayer, Retry};
608 use lightning_persister::FilesystemPersister;
610 use std::path::PathBuf;
611 use std::sync::{Arc, Mutex};
612 use std::sync::mpsc::SyncSender;
613 use std::time::Duration;
614 use bitcoin::hashes::Hash;
615 use bitcoin::TxMerkleNode;
616 use lightning_rapid_gossip_sync::RapidGossipSync;
617 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
	// Upper bound (in seconds) tests will wait for an expected event before failing.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Hash, PartialEq, Eq)]
	// Minimal no-op socket for constructing a PeerManager in tests.
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		// Pretends all writes succeed without sending anything (return value elided
		// from this listing; presumably 0).
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
		fn disconnect_socket(&mut self) {}

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;

	// Per-node test harness bundling everything BackgroundProcessor::start needs.
	// NOTE(review): the `struct Node {` header is elided from this listing.
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,

		// Convenience constructors for each GossipSync variant used by the tests.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())

		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())

		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {

	// Best-effort cleanup of the node's on-disk persistence directory (presumably in
	// a `Drop` impl for `Node`; the header is elided from this listing).
			let data_dir = self.persister.get_data_dir();
			match fs::remove_dir_all(data_dir.clone()) {
				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
	// Test persister wrapping FilesystemPersister, with hooks to inject failures for
	// each persisted object and to signal when the graph gets written.
	// NOTE(review): the `struct Persister {` header and the `impl Persister {` header
	// are elided from this listing.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,

		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }

		// Builder-style helpers to arm a failure/notification for a specific key.
		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }

	impl KVStorePersister for Persister {
		// Dispatches on the persistence key, injecting any armed error (and firing the
		// graph notifier) before delegating to the real filesystem persister.
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))

				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))

			self.filesystem_persister.persist(key, object)

	// Joins a directory and file name into a single path string.
	// NOTE(review): the `path.push(filename);` line appears elided from this listing.
	fn get_full_filepath(filepath: String, filename: String) -> String {
		let mut path = PathBuf::from(filepath);
		path.to_str().unwrap().to_string()
	// Builds `num_nodes` fully-wired test nodes (each with its own chain monitor,
	// channel manager, gossip syncs, peer manager, persister, and scorer) and
	// connects every pair of nodes as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let params = ProbabilisticScoringParameters::default();
			let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
			// Deterministic per-node seed keeps keys/router behavior reproducible.
			let seed = [i as u8; 32];
			let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

		// Mesh-connect all node pairs in both directions.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();

	// Drives a full channel open between two test nodes, returning via the inner
	// macros' side effects; expects exactly one FundingGenerationReady event.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);

	// Performs the open_channel/accept_channel message exchange.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));

	// Extracts the temporary channel id from a FundingGenerationReady event and builds
	// the corresponding (unsigned, input-less) funding transaction.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
				Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					// 42 is the user_channel_id passed to create_channel above.
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					(temporary_channel_id, tx)
				_ => panic!("Unexpected event"),
814 macro_rules! end_open_channel {
815 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
816 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
817 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
818 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
822 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
824 let prev_blockhash = node.best_block.block_hash();
825 let height = node.best_block.height() + 1;
826 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
827 let txdata = vec![(0, tx)];
828 node.best_block = BestBlock::new(header.block_hash(), height);
831 node.node.transactions_confirmed(&header, &txdata, height);
832 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
835 node.node.best_block_updated(&header, height);
836 node.chain_monitor.best_block_updated(&header, height);
842 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
843 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
847 fn test_background_processor() {
848 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
849 // updates. Also test that when new updates are available, the manager signals that it needs
850 // re-persistence and is successfully re-persisted.
851 let nodes = create_nodes(2, "test_background_processor".to_string());
853 // Go through the channel creation process so that each node has something to persist. Since
854 // open_channel consumes events, it must complete before starting BackgroundProcessor to
855 // avoid a race with processing events.
856 let tx = open_channel!(nodes[0], nodes[1], 100000);
858 // Initiate the background processors to watch each node.
859 let data_dir = nodes[0].persister.get_data_dir();
860 let persister = Arc::new(Persister::new(data_dir));
861 let event_handler = |_: _| {};
862 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
864 macro_rules! check_persisted_data {
865 ($node: expr, $filepath: expr) => {
866 let mut expected_bytes = Vec::new();
868 expected_bytes.clear();
869 match $node.write(&mut expected_bytes) {
871 match std::fs::read($filepath) {
873 if bytes == expected_bytes {
882 Err(e) => panic!("Unexpected error: {}", e)
888 // Check that the initial channel manager data is persisted as expected.
889 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
890 check_persisted_data!(nodes[0].node, filepath.clone());
893 if !nodes[0].node.get_persistence_condvar_value() { break }
896 // Force-close the channel.
897 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
899 // Check that the force-close updates are persisted.
900 check_persisted_data!(nodes[0].node, filepath.clone());
902 if !nodes[0].node.get_persistence_condvar_value() { break }
905 // Check network graph is persisted
906 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
907 check_persisted_data!(nodes[0].network_graph, filepath.clone());
909 // Check scorer is persisted
910 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
911 check_persisted_data!(nodes[0].scorer, filepath.clone());
913 assert!(bg_processor.stop().is_ok());
917 fn test_timer_tick_called() {
918 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
919 // `FRESHNESS_TIMER`.
920 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
921 let data_dir = nodes[0].persister.get_data_dir();
922 let persister = Arc::new(Persister::new(data_dir));
923 let event_handler = |_: _| {};
924 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
926 let log_entries = nodes[0].logger.lines.lock().unwrap();
927 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
928 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
929 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
930 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
935 assert!(bg_processor.stop().is_ok());
939 fn test_channel_manager_persist_error() {
940 // Test that if we encounter an error during manager persistence, the thread panics.
941 let nodes = create_nodes(2, "test_persist_error".to_string());
942 open_channel!(nodes[0], nodes[1], 100000);
944 let data_dir = nodes[0].persister.get_data_dir();
945 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
946 let event_handler = |_: _| {};
947 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
948 match bg_processor.join() {
949 Ok(_) => panic!("Expected error persisting manager"),
951 assert_eq!(e.kind(), std::io::ErrorKind::Other);
952 assert_eq!(e.get_ref().unwrap().to_string(), "test");
958 fn test_network_graph_persist_error() {
959 // Test that if we encounter an error during network graph persistence, an error gets returned.
960 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
961 let data_dir = nodes[0].persister.get_data_dir();
962 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
963 let event_handler = |_: _| {};
964 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
966 match bg_processor.stop() {
967 Ok(_) => panic!("Expected error persisting network graph"),
969 assert_eq!(e.kind(), std::io::ErrorKind::Other);
970 assert_eq!(e.get_ref().unwrap().to_string(), "test");
976 fn test_scorer_persist_error() {
977 // Test that if we encounter an error during scorer persistence, an error gets returned.
978 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
979 let data_dir = nodes[0].persister.get_data_dir();
980 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
981 let event_handler = |_: _| {};
982 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
984 match bg_processor.stop() {
985 Ok(_) => panic!("Expected error persisting scorer"),
987 assert_eq!(e.kind(), std::io::ErrorKind::Other);
988 assert_eq!(e.get_ref().unwrap().to_string(), "test");
994 fn test_background_event_handling() {
995 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
996 let channel_value = 100000;
997 let data_dir = nodes[0].persister.get_data_dir();
998 let persister = Arc::new(Persister::new(data_dir.clone()));
1000 // Set up a background event handler for FundingGenerationReady events.
1001 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1002 let event_handler = move |event: Event| match event {
1003 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1004 Event::ChannelReady { .. } => {},
1005 _ => panic!("Unexpected event: {:?}", event),
1008 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1010 // Open a channel and check that the FundingGenerationReady event was handled.
1011 begin_open_channel!(nodes[0], nodes[1], channel_value);
1012 let (temporary_channel_id, funding_tx) = receiver
1013 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1014 .expect("FundingGenerationReady not handled within deadline");
1015 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1017 // Confirm the funding transaction.
1018 confirm_transaction(&mut nodes[0], &funding_tx);
1019 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1020 confirm_transaction(&mut nodes[1], &funding_tx);
1021 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1022 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1023 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1024 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1025 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1027 assert!(bg_processor.stop().is_ok());
1029 // Set up a background event handler for SpendableOutputs events.
1030 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1031 let event_handler = move |event: Event| match event {
1032 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1033 Event::ChannelReady { .. } => {},
1034 Event::ChannelClosed { .. } => {},
1035 _ => panic!("Unexpected event: {:?}", event),
1037 let persister = Arc::new(Persister::new(data_dir));
1038 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1040 // Force close the channel and check that the SpendableOutputs event was handled.
1041 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1042 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1043 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1045 let event = receiver
1046 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1047 .expect("Events not handled within deadline");
1049 Event::SpendableOutputs { .. } => {},
1050 _ => panic!("Unexpected event: {:?}", event),
1053 assert!(bg_processor.stop().is_ok());
1057 fn test_scorer_persistence() {
1058 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1059 let data_dir = nodes[0].persister.get_data_dir();
1060 let persister = Arc::new(Persister::new(data_dir));
1061 let event_handler = |_: _| {};
1062 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1065 let log_entries = nodes[0].logger.lines.lock().unwrap();
1066 let expected_log = "Persisting scorer".to_string();
1067 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1072 assert!(bg_processor.stop().is_ok());
1076 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1077 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1078 let data_dir = nodes[0].persister.get_data_dir();
1079 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1080 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1081 let network_graph = nodes[0].network_graph.clone();
1082 let features = ChannelFeatures::empty();
1083 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1084 .expect("Failed to update channel from partial announcement");
1085 let original_graph_description = network_graph.to_string();
1086 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1087 assert_eq!(network_graph.read_only().channels().len(), 1);
1089 let event_handler = |_: _| {};
1090 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1093 let log_entries = nodes[0].logger.lines.lock().unwrap();
1094 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1095 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1098 // Wait until the loop has gone around at least twice.
1103 let initialization_input = vec![
1104 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1105 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1106 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1107 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1108 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1109 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1110 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1111 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1112 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1113 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1114 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1115 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1116 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1118 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1120 // this should have added two channels
1121 assert_eq!(network_graph.read_only().channels().len(), 3);
1124 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1125 .expect("Network graph not pruned within deadline");
1127 background_processor.stop().unwrap();
1129 // all channels should now be pruned
1130 assert_eq!(network_graph.read_only().channels().len(), 0);
1134 fn test_invoice_payer() {
1135 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1136 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1137 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1139 // Initiate the background processors to watch each node.
1140 let data_dir = nodes[0].persister.get_data_dir();
1141 let persister = Arc::new(Persister::new(data_dir));
1142 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1143 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1144 let event_handler = Arc::clone(&invoice_payer);
1145 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1146 assert!(bg_processor.stop().is_ok());