1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 #[cfg(feature = "futures")]
35 use futures::{select, future::FutureExt};
37 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
38 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
39 /// responsibilities are:
40 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
41 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
42 /// writing it to disk/backups by invoking the callback given to it at startup.
43 /// [`ChannelManager`] persistence should be done in the background.
44 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
45 /// at the appropriate intervals.
46 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
47 /// is provided to [`BackgroundProcessor::start`]).
49 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
50 /// upon as doing so may result in high latency.
54 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
55 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
56 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
57 /// unilateral chain closure fees are at risk.
59 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
60 /// [`Event`]: lightning::util::events::Event
// BackgroundProcessor owns the background thread and the shared flag used to ask it to stop.
// NOTE(review): the struct's closing brace is elided from this chunk of the file.
61 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
62 pub struct BackgroundProcessor {
// Set to true (Release ordering) by stop()/drop; polled (Acquire) by the run loop to exit.
63 stop_thread: Arc<AtomicBool>,
// Handle to the processing thread; Option so join()/stop() can take() it exactly once.
// The thread returns any std::io::Error produced while persisting.
64 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Timer intervals (in seconds) driving the background processing loop. Each timer has a
// production value and a much shorter `#[cfg(test)]` value so the test suite runs quickly.
// NOTE(review): the cfg attributes were missing from this dump, leaving each constant defined
// multiple times (a compile error); they are restored here per the upstream source.

/// Interval between calls to [`ChannelManager::timer_tick_occurred`].
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Interval between persisting the scorer to disk.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Delay before the first network-graph prune after startup, so short-lived clients still prune.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
// GossipSync: the caller's choice of gossip source handed to the background processor.
// NOTE(review): the `pub enum GossipSync<...>` header line, the variant declarations, match
// headers and several closing braces are elided from this dump — only the generic bounds,
// variant doc comments and accessor match arms are visible.
95 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
97 P: Deref<Target = P2PGossipSync<G, A, L>>,
98 R: Deref<Target = RapidGossipSync<G, L>>,
99 G: Deref<Target = NetworkGraph<L>>,
103 where A::Target: chain::Access, L::Target: Logger {
104 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
106 /// Rapid gossip sync from a trusted server.
113 P: Deref<Target = P2PGossipSync<G, A, L>>,
114 R: Deref<Target = RapidGossipSync<G, L>>,
115 G: Deref<Target = NetworkGraph<L>>,
118 > GossipSync<P, R, G, A, L>
119 where A::Target: chain::Access, L::Target: Logger {
// Returns the network graph backing either sync variant, or None when gossip sync is disabled.
120 fn network_graph(&self) -> Option<&G> {
122 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
123 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
124 GossipSync::None => None,
// Like network_graph(), but for Rapid only yields the graph once the initial rapid sync has
// completed — pruning a partially-synced graph would discard data the sync is about to need.
128 fn prunable_network_graph(&self) -> Option<&G> {
130 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
131 GossipSync::Rapid(gossip_sync) => {
132 if gossip_sync.is_initial_sync_complete() {
133 Some(gossip_sync.network_graph())
138 GossipSync::None => None,
// Convenience constructors for each GossipSync variant. Each impl pins the *unused* variant's
// type parameters to concrete reference types so callers need not name them.
// NOTE(review): the `impl ... GossipSync<...>` header lines for the rapid/none impls and all
// closing braces are elided from this dump.
143 /// (C-not exported) as the bindings concretize everything and have constructors for us
144 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
145 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
147 A::Target: chain::Access,
150 /// Initializes a new [`GossipSync::P2P`] variant.
151 pub fn p2p(gossip_sync: P) -> Self {
152 GossipSync::P2P(gossip_sync)
156 /// (C-not exported) as the bindings concretize everything and have constructors for us
157 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
159 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
162 &'a (dyn chain::Access + Send + Sync),
168 /// Initializes a new [`GossipSync::Rapid`] variant.
169 pub fn rapid(gossip_sync: R) -> Self {
170 GossipSync::Rapid(gossip_sync)
174 /// (C-not exported) as the bindings concretize everything and have constructors for us
177 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
178 &RapidGossipSync<&'a NetworkGraph<L>, L>,
180 &'a (dyn chain::Access + Send + Sync),
186 /// Initializes a new [`GossipSync::None`] variant.
187 pub fn none() -> Self {
// DecoratingEventHandler wraps the user's EventHandler, letting the network graph see each
// event (for gossip-relevant updates such as payment failures) before the user handler runs.
// NOTE(review): struct field `event_handler`, some generic parameter lines and closing braces
// are elided from this dump.
192 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
193 struct DecoratingEventHandler<
196 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
197 RGS: Deref<Target = RapidGossipSync<G, L>>,
198 G: Deref<Target = NetworkGraph<L>>,
202 where A::Target: chain::Access, L::Target: Logger {
204 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
210 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
211 RGS: Deref<Target = RapidGossipSync<G, L>>,
212 G: Deref<Target = NetworkGraph<L>>,
215 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
216 where A::Target: chain::Access, L::Target: Logger {
217 fn handle_event(&self, event: &Event) {
// Let the graph update itself from the event first (no-op when GossipSync::None)...
218 if let Some(network_graph) = self.gossip_sync.network_graph() {
219 network_graph.handle_event(event);
// ...then hand the event to the user-supplied handler.
221 self.event_handler.handle_event(event);
// define_run_body!: the shared event/persistence loop, expanded both into the sync thread
// (BackgroundProcessor::start) and the async variant (process_events_async). `$loop_exit_check`
// and `$await` differ per expansion: a stop-flag load + blocking wait for the thread, a
// select!-based future for async.
// NOTE(review): the surrounding `loop { ... }`, several closing braces and some interior lines
// are elided from this dump.
225 macro_rules! define_run_body {
226 ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
227 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
228 $loop_exit_check: expr, $await: expr)
230 let event_handler = DecoratingEventHandler {
231 event_handler: $event_handler,
232 gossip_sync: &$gossip_sync,
// One immediate timer tick on startup before entering the loop.
235 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
236 $channel_manager.timer_tick_occurred();
// Per-timer "last fired" instants; compared against the *_TIMER constants each iteration.
238 let mut last_freshness_call = Instant::now();
239 let mut last_ping_call = Instant::now();
240 let mut last_prune_call = Instant::now();
241 let mut last_scorer_persist_call = Instant::now();
242 let mut have_pruned = false;
// Loop body: drain pending events through the decorating handler first.
245 $channel_manager.process_pending_events(&event_handler);
246 $chain_monitor.process_pending_events(&event_handler);
248 // Note that the PeerManager::process_events may block on ChannelManager's locks,
249 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
250 // we want to ensure we get into `persist_manager` as quickly as we can, especially
251 // without running the normal event processing above and handing events to users.
253 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
254 // processing a message effectively at any point during this loop. In order to
255 // minimize the time between such processing completing and persisting the updated
256 // ChannelManager, we want to minimize methods blocking on a ChannelManager
257 // generally, and as a fallback place such blocking only immediately before
259 $peer_manager.process_events();
261 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
262 // see `await_start`'s use below.
263 let await_start = Instant::now();
264 let updates_available = $await;
265 let await_time = await_start.elapsed();
// Persist the manager whenever it signalled a persistable update.
267 if updates_available {
268 log_trace!($logger, "Persisting ChannelManager...");
269 $persister.persist_manager(&*$channel_manager)?;
270 log_trace!($logger, "Done persisting ChannelManager.");
272 // Exit the loop if the background processor was requested to stop.
273 if $loop_exit_check {
274 log_trace!($logger, "Terminating background processor.");
277 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
278 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
279 $channel_manager.timer_tick_occurred();
280 last_freshness_call = Instant::now();
282 if await_time > Duration::from_secs(1) {
283 // On various platforms, we may be starved of CPU cycles for several reasons.
284 // E.g. on iOS, if we've been in the background, we will be entirely paused.
285 // Similarly, if we're on a desktop platform and the device has been asleep, we
286 // may not get any cycles.
287 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
288 // full second, at which point we assume sockets may have been killed (they
289 // appear to be at least on some platforms, even if it has only been a second).
290 // Note that we have to take care to not get here just because user event
291 // processing was slow at the top of the loop. For example, the sample client
292 // may call Bitcoin Core RPCs during event handling, which very often takes
293 // more than a handful of seconds to complete, and shouldn't disconnect all our
295 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
296 $peer_manager.disconnect_all_peers();
297 last_ping_call = Instant::now();
298 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
299 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
300 $peer_manager.timer_tick_occurred();
301 last_ping_call = Instant::now();
304 // Note that we want to run a graph prune once not long after startup before
305 // falling back to our usual hourly prunes. This avoids short-lived clients never
306 // pruning their network graph. We run once 60 seconds after startup before
307 // continuing our normal cadence.
308 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
309 // The network graph must not be pruned while rapid sync completion is pending
310 log_trace!($logger, "Assessing prunability of network graph");
311 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
312 network_graph.remove_stale_channels();
// Graph/scorer persistence failures are logged but do NOT abort the loop, unlike
// persist_manager above — the manager is the safety-critical state.
314 if let Err(e) = $persister.persist_graph(network_graph) {
315 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
318 last_prune_call = Instant::now();
321 log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
325 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
326 if let Some(ref scorer) = $scorer {
327 log_trace!($logger, "Persisting scorer");
328 if let Err(e) = $persister.persist_scorer(&scorer) {
329 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
332 last_scorer_persist_call = Instant::now();
336 // After we exit, ensure we persist the ChannelManager one final time - this avoids
337 // some races where users quit while channel updates were in-flight, with
338 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
339 $persister.persist_manager(&*$channel_manager)?;
341 // Persist Scorer on exit
342 if let Some(ref scorer) = $scorer {
343 $persister.persist_scorer(&scorer)?;
346 // Persist NetworkGraph on exit
347 if let Some(network_graph) = $gossip_sync.network_graph() {
348 $persister.persist_graph(network_graph)?;
// Async counterpart to BackgroundProcessor::start: expands define_run_body! with a
// select!-based $await racing the manager's persistable-update future against the
// caller-supplied sleeper (100ms), whose false output ends the loop via `should_continue`.
// NOTE(review): the `where` keyword lines, the closing of the select! block and the function's
// closing brace are elided from this dump.
355 /// Processes background events in a future.
357 /// `sleeper` should return a future which completes in the given amount of time and returns a
358 /// boolean indicating whether the background processing should continue. Once `sleeper` returns a
359 /// future which outputs false, the loop will exit and this function's future will complete.
361 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
362 #[cfg(feature = "futures")]
363 pub async fn process_events_async<
365 Signer: 'static + Sign,
366 CA: 'static + Deref + Send + Sync,
367 CF: 'static + Deref + Send + Sync,
368 CW: 'static + Deref + Send + Sync,
369 T: 'static + Deref + Send + Sync,
370 K: 'static + Deref + Send + Sync,
371 F: 'static + Deref + Send + Sync,
372 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
373 L: 'static + Deref + Send + Sync,
374 P: 'static + Deref + Send + Sync,
375 Descriptor: 'static + SocketDescriptor + Send + Sync,
376 CMH: 'static + Deref + Send + Sync,
377 RMH: 'static + Deref + Send + Sync,
378 EH: 'static + EventHandler + Send,
379 PS: 'static + Deref + Send,
380 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
381 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
382 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
383 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
384 UMH: 'static + Deref + Send + Sync,
385 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
386 S: 'static + Deref<Target = SC> + Send + Sync,
387 SC: WriteableScore<'a>,
388 SleepFuture: core::future::Future<Output = bool>,
389 Sleeper: Fn(Duration) -> SleepFuture
391 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
392 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
394 ) -> Result<(), std::io::Error>
396 CA::Target: 'static + chain::Access,
397 CF::Target: 'static + chain::Filter,
398 CW::Target: 'static + chain::Watch<Signer>,
399 T::Target: 'static + BroadcasterInterface,
400 K::Target: 'static + KeysInterface<Signer = Signer>,
401 F::Target: 'static + FeeEstimator,
402 L::Target: 'static + Logger,
403 P::Target: 'static + Persist<Signer>,
404 CMH::Target: 'static + ChannelMessageHandler,
405 RMH::Target: 'static + RoutingMessageHandler,
406 UMH::Target: 'static + CustomMessageHandler,
407 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
// should_continue flips to false when the sleeper future resolves false, exiting the loop.
409 let mut should_continue = true;
410 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
411 gossip_sync, peer_manager, logger, scorer, should_continue, {
// Manager update wins the race => "updates available" => persist_manager this iteration.
413 _ = channel_manager.get_persistable_update_future().fuse() => true,
414 cont = sleeper(Duration::from_millis(100)).fuse() => {
415 should_continue = cont;
// impl BackgroundProcessor: thread-based driver. start() spawns the run loop; join()/stop()
// reap the thread and surface any persistence error; Drop stops the thread.
// NOTE(review): several doc-comment lines, method bodies' closing braces and some interior
// lines (including the None => arm of join_thread) are elided from this dump.
422 impl BackgroundProcessor {
423 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
426 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
427 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
428 /// either [`join`] or [`stop`].
430 /// # Data Persistence
432 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
433 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
434 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
435 /// provided implementation.
437 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
438 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
439 /// See the `lightning-persister` crate for LDK's provided implementation.
441 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
442 /// error or call [`join`] and handle any error that may arise. For the latter case,
443 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
447 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
448 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
449 /// functionality implemented by other handlers.
450 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
452 /// # Rapid Gossip Sync
454 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
455 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
456 /// until the [`RapidGossipSync`] instance completes its first sync.
458 /// [top-level documentation]: BackgroundProcessor
459 /// [`join`]: Self::join
460 /// [`stop`]: Self::stop
461 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
462 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
463 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
464 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
465 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
466 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
469 Signer: 'static + Sign,
470 CA: 'static + Deref + Send + Sync,
471 CF: 'static + Deref + Send + Sync,
472 CW: 'static + Deref + Send + Sync,
473 T: 'static + Deref + Send + Sync,
474 K: 'static + Deref + Send + Sync,
475 F: 'static + Deref + Send + Sync,
476 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
477 L: 'static + Deref + Send + Sync,
478 P: 'static + Deref + Send + Sync,
479 Descriptor: 'static + SocketDescriptor + Send + Sync,
480 CMH: 'static + Deref + Send + Sync,
481 RMH: 'static + Deref + Send + Sync,
482 EH: 'static + EventHandler + Send,
483 PS: 'static + Deref + Send,
484 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
485 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
486 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
487 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
488 UMH: 'static + Deref + Send + Sync,
489 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
490 S: 'static + Deref<Target = SC> + Send + Sync,
491 SC: WriteableScore<'a>,
493 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
494 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
497 CA::Target: 'static + chain::Access,
498 CF::Target: 'static + chain::Filter,
499 CW::Target: 'static + chain::Watch<Signer>,
500 T::Target: 'static + BroadcasterInterface,
501 K::Target: 'static + KeysInterface<Signer = Signer>,
502 F::Target: 'static + FeeEstimator,
503 L::Target: 'static + Logger,
504 P::Target: 'static + Persist<Signer>,
505 CMH::Target: 'static + ChannelMessageHandler,
506 RMH::Target: 'static + RoutingMessageHandler,
507 UMH::Target: 'static + CustomMessageHandler,
508 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
// One AtomicBool is shared between the returned handle (to request stop) and the thread
// (which polls it each loop iteration with Acquire ordering).
510 let stop_thread = Arc::new(AtomicBool::new(false));
511 let stop_thread_clone = stop_thread.clone();
512 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
// $await here blocks up to 100ms waiting for a persistable manager update.
513 define_run_body!(persister, event_handler, chain_monitor, channel_manager,
514 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
515 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
517 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
520 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
521 /// [`ChannelManager`].
525 /// This function panics if the background thread has panicked such as while persisting or
528 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
529 pub fn join(mut self) -> Result<(), std::io::Error> {
530 assert!(self.thread_handle.is_some());
534 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
535 /// [`ChannelManager`].
539 /// This function panics if the background thread has panicked such as while persisting or
542 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
543 pub fn stop(mut self) -> Result<(), std::io::Error> {
544 assert!(self.thread_handle.is_some());
545 self.stop_and_join_thread()
// Internal: signal the stop flag (Release pairs with the loop's Acquire load), then join.
548 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
549 self.stop_thread.store(true, Ordering::Release);
// Internal: reap the thread at most once; propagates the thread's Result, panics if it panicked.
553 fn join_thread(&mut self) -> Result<(), std::io::Error> {
554 match self.thread_handle.take() {
555 Some(handle) => handle.join().unwrap(),
// Drop stops and joins the thread, unwrapping (panicking on) any persistence error.
561 impl Drop for BackgroundProcessor {
563 self.stop_and_join_thread().unwrap();
569 use bitcoin::blockdata::block::BlockHeader;
570 use bitcoin::blockdata::constants::genesis_block;
571 use bitcoin::blockdata::transaction::{Transaction, TxOut};
572 use bitcoin::network::constants::Network;
573 use lightning::chain::{BestBlock, Confirm, chainmonitor};
574 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
575 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
576 use lightning::chain::transaction::OutPoint;
577 use lightning::get_event_msg;
578 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
579 use lightning::ln::features::{ChannelFeatures, InitFeatures};
580 use lightning::ln::msgs::{ChannelMessageHandler, Init};
581 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
582 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
583 use lightning::util::config::UserConfig;
584 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
585 use lightning::util::ser::Writeable;
586 use lightning::util::test_utils;
587 use lightning::util::persist::KVStorePersister;
588 use lightning_invoice::payment::{InvoicePayer, Retry};
589 use lightning_invoice::utils::DefaultRouter;
590 use lightning_persister::FilesystemPersister;
592 use std::path::PathBuf;
593 use std::sync::{Arc, Mutex};
594 use std::sync::mpsc::SyncSender;
595 use std::time::Duration;
596 use lightning::routing::scoring::{FixedPenaltyScorer};
597 use lightning_rapid_gossip_sync::RapidGossipSync;
598 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
// Test fixtures: event-wait deadline, a no-op SocketDescriptor, type aliases and the per-node
// test harness struct. NOTE(review): the enclosing `mod tests` header, the Node struct header
// and several closing braces are elided from this dump.
600 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
602 #[derive(Clone, Eq, Hash, PartialEq)]
603 struct TestDescriptor{}
604 impl SocketDescriptor for TestDescriptor {
// send_data body (returning 0, per upstream) is elided here.
605 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
609 fn disconnect_socket(&mut self) {}
612 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
614 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
615 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
// Node: everything one simulated node needs — manager, both gossip syncs, peer manager, etc.
618 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
619 p2p_gossip_sync: PGS,
620 rapid_gossip_sync: RGS,
621 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
622 chain_monitor: Arc<ChainMonitor>,
623 persister: Arc<FilesystemPersister>,
624 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
625 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
626 logger: Arc<test_utils::TestLogger>,
627 best_block: BestBlock,
628 scorer: Arc<Mutex<FixedPenaltyScorer>>,
// Node helpers wrapping the three GossipSync variants, plus cleanup of the on-disk persister
// directory (presumably in Node's Drop impl — the impl headers are elided from this dump).
632 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
633 GossipSync::P2P(self.p2p_gossip_sync.clone())
636 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
637 GossipSync::Rapid(self.rapid_gossip_sync.clone())
640 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
// Best-effort removal of the test data dir; failure is only logged, never fatal.
647 let data_dir = self.persister.get_data_dir();
648 match fs::remove_dir_all(data_dir.clone()) {
649 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
// Test Persister: wraps FilesystemPersister, optionally injecting errors per key ("manager",
// "network_graph", "scorer") and notifying a channel on graph persistence — used to exercise
// the background processor's error paths. NOTE(review): the struct header, impl headers and
// several closing braces are elided from this dump.
656 graph_error: Option<(std::io::ErrorKind, &'static str)>,
657 graph_persistence_notifier: Option<SyncSender<()>>,
658 manager_error: Option<(std::io::ErrorKind, &'static str)>,
659 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
660 filesystem_persister: FilesystemPersister,
// Builder-style constructors: new() then chained with_* overrides via struct update syntax.
664 fn new(data_dir: String) -> Self {
665 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
666 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
669 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
670 Self { graph_error: Some((error, message)), ..self }
673 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
674 Self { graph_persistence_notifier: Some(sender), ..self }
677 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
678 Self { manager_error: Some((error, message)), ..self }
681 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
682 Self { scorer_error: Some((error, message)), ..self }
686 impl KVStorePersister for Persister {
// Dispatch on key: inject configured error / send notification, else delegate to disk.
687 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
688 if key == "manager" {
689 if let Some((error, message)) = self.manager_error {
690 return Err(std::io::Error::new(error, message))
694 if key == "network_graph" {
695 if let Some(sender) = &self.graph_persistence_notifier {
696 sender.send(()).unwrap();
699 if let Some((error, message)) = self.graph_error {
700 return Err(std::io::Error::new(error, message))
705 if let Some((error, message)) = self.scorer_error {
706 return Err(std::io::Error::new(error, message))
710 self.filesystem_persister.persist(key, object)
// get_full_filepath: joins dir + filename via PathBuf (the `path.push(filename)` line is
// elided from this dump). create_nodes: builds `num_nodes` fully-wired test Nodes on Testnet
// and connects every pair as peers.
714 fn get_full_filepath(filepath: String, filename: String) -> String {
715 let mut path = PathBuf::from(filepath);
717 path.to_str().unwrap().to_string()
720 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
721 let mut nodes = Vec::new();
722 for i in 0..num_nodes {
// Each node gets its own broadcaster/fee estimator/chain source/logger/persister; the
// node index seeds the keys manager and names the persister dir so nodes don't collide.
723 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
724 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
725 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
726 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
727 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
728 let seed = [i as u8; 32];
729 let network = Network::Testnet;
730 let genesis_block = genesis_block(network);
731 let now = Duration::from_secs(genesis_block.header.time as u64);
732 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
733 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
734 let best_block = BestBlock::from_genesis(network);
735 let params = ChainParameters { network, best_block };
736 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
737 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
738 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
739 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
740 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
741 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
742 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
743 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
// Fully mesh the nodes: both directions of peer_connected for every (i, j) pair.
747 for i in 0..num_nodes {
748 for j in (i+1)..num_nodes {
749 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
750 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
// Channel-opening test macros: open_channel! drives the full flow by composing the three
// smaller macros (begin -> funding event -> end). NOTE(review): some closing braces/brackets
// of the macro bodies are elided from this dump.
757 macro_rules! open_channel {
758 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
759 begin_open_channel!($node_a, $node_b, $channel_value);
760 let events = $node_a.node.get_and_clear_pending_events();
761 assert_eq!(events.len(), 1);
762 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
763 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// begin_open_channel!: create_channel then exchange open_channel/accept_channel messages.
768 macro_rules! begin_open_channel {
769 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
770 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
771 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
772 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// handle_funding_generation_ready!: checks the FundingGenerationReady event (user_channel_id
// 42 matches the value passed to create_channel above) and fakes a funding transaction.
776 macro_rules! handle_funding_generation_ready {
777 ($event: expr, $channel_value: expr) => {{
779 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
780 assert_eq!(channel_value_satoshis, $channel_value);
781 assert_eq!(user_channel_id, 42);
783 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
784 value: channel_value_satoshis, script_pubkey: output_script.clone(),
786 (temporary_channel_id, tx)
788 _ => panic!("Unexpected event"),
// end_open_channel!: hand the funding tx to node_a, then exchange funding_created/signed.
793 macro_rules! end_open_channel {
794 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
795 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
796 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
797 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Confirms `tx` for the given node by connecting a new block containing it, notifying
// both the ChannelManager and the ChainMonitor, and advancing the node's best block.
// NOTE(review): only the first connected block is visible here; the additional blocks
// needed to reach `depth` confirmations are presumably mined in elided lines — confirm
// against the full file.
801 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
803 let prev_blockhash = node.best_block.block_hash();
804 let height = node.best_block.height() + 1;
// Dummy header: `time` is set to the height so each block's header is unique.
805 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
806 let txdata = vec![(0, tx)];
807 node.best_block = BestBlock::new(header.block_hash(), height);
// Both the ChannelManager and the ChainMonitor must see the confirmation.
810 node.node.transactions_confirmed(&header, &txdata, height);
811 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
814 node.node.best_block_updated(&header, height);
815 node.chain_monitor.best_block_updated(&header, height);
// Confirms `tx` to ANTI_REORG_DELAY depth, i.e. deeply enough that reorg handling
// considers the confirmation irreversible.
821 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
822 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
826 fn test_background_processor() {
827 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
828 // updates. Also test that when new updates are available, the manager signals that it needs
829 // re-persistence and is successfully re-persisted.
830 let nodes = create_nodes(2, "test_background_processor".to_string());
832 // Go through the channel creation process so that each node has something to persist. Since
833 // open_channel consumes events, it must complete before starting BackgroundProcessor to
834 // avoid a race with processing events.
835 let tx = open_channel!(nodes[0], nodes[1], 100000);
837 // Initiate the background processors to watch each node.
838 let data_dir = nodes[0].persister.get_data_dir();
839 let persister = Arc::new(Persister::new(data_dir));
840 let event_handler = |_: &_| {};
// P2P gossip sync is used here so network-graph persistence is exercised below.
841 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Asserts that the on-disk bytes at $filepath eventually equal the serialization of
// $node. NOTE(review): the retry/timeout loop and the Ok/Err match arms of this macro
// are elided in this view.
843 macro_rules! check_persisted_data {
844 ($node: expr, $filepath: expr) => {
845 let mut expected_bytes = Vec::new();
847 expected_bytes.clear();
848 match $node.write(&mut expected_bytes) {
850 match std::fs::read($filepath) {
852 if bytes == expected_bytes {
861 Err(e) => panic!("Unexpected error: {}", e)
867 // Check that the initial channel manager data is persisted as expected.
868 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
869 check_persisted_data!(nodes[0].node, filepath.clone());
// Wait until the background processor has observed (and cleared) the manager's
// persistence-needed flag.
872 if !nodes[0].node.get_persistence_condvar_value() { break }
875 // Force-close the channel.
876 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
878 // Check that the force-close updates are persisted.
879 check_persisted_data!(nodes[0].node, filepath.clone());
881 if !nodes[0].node.get_persistence_condvar_value() { break }
884 // Check network graph is persisted
885 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
886 check_persisted_data!(nodes[0].network_graph, filepath.clone());
888 // Check scorer is persisted
889 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
890 check_persisted_data!(nodes[0].scorer, filepath.clone());
// Stopping must succeed: the background thread must not have panicked or errored.
892 assert!(bg_processor.stop().is_ok());
896 fn test_timer_tick_called() {
897 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
898 // `FRESHNESS_TIMER`.
899 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
900 let data_dir = nodes[0].persister.get_data_dir();
901 let persister = Arc::new(Persister::new(data_dir));
902 let event_handler = |_: &_| {};
903 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the test logger until both timer-tick log lines have been recorded.
// NOTE(review): the enclosing loop construct is elided in this view.
905 let log_entries = nodes[0].logger.lines.lock().unwrap();
906 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
907 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
908 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
909 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
914 assert!(bg_processor.stop().is_ok());
918 fn test_channel_manager_persist_error() {
919 // Test that if we encounter an error during manager persistence, the thread panics.
920 let nodes = create_nodes(2, "test_persist_error".to_string());
921 open_channel!(nodes[0], nodes[1], 100000);
923 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail every ChannelManager write with an io::Error.
924 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
925 let event_handler = |_: &_| {};
926 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// join() must surface the persistence error rather than succeeding.
927 match bg_processor.join() {
928 Ok(_) => panic!("Expected error persisting manager"),
930 assert_eq!(e.kind(), std::io::ErrorKind::Other);
931 assert_eq!(e.get_ref().unwrap().to_string(), "test");
937 fn test_network_graph_persist_error() {
938 // Test that if we encounter an error during network graph persistence, an error gets returned.
939 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
940 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail network-graph writes; p2p gossip sync is required so
// the background processor actually attempts graph persistence.
941 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
942 let event_handler = |_: &_| {};
943 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Unlike the manager-persistence failure, a graph failure is reported via stop().
945 match bg_processor.stop() {
946 Ok(_) => panic!("Expected error persisting network graph"),
948 assert_eq!(e.kind(), std::io::ErrorKind::Other);
949 assert_eq!(e.get_ref().unwrap().to_string(), "test");
955 fn test_scorer_persist_error() {
956 // Test that if we encounter an error during scorer persistence, an error gets returned.
957 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
958 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured to fail every scorer write with an io::Error.
959 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
960 let event_handler = |_: &_| {};
961 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The scorer-persistence failure is reported via stop(), mirroring the graph test.
963 match bg_processor.stop() {
964 Ok(_) => panic!("Expected error persisting scorer"),
966 assert_eq!(e.kind(), std::io::ErrorKind::Other);
967 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Verifies that events generated while the BackgroundProcessor is running are delivered
// to the user-supplied event handler: first a FundingGenerationReady during channel
// open, then (with a fresh processor) the events produced by a force-close.
973 fn test_background_event_handling() {
974 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
975 let channel_value = 100000;
976 let data_dir = nodes[0].persister.get_data_dir();
977 let persister = Arc::new(Persister::new(data_dir.clone()));
979 // Set up a background event handler for FundingGenerationReady events.
// The handler forwards the (temporary_channel_id, funding_tx) pair over a channel so
// the test thread can complete the funding flow.
980 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
981 let event_handler = move |event: &Event| {
982 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
984 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
986 // Open a channel and check that the FundingGenerationReady event was handled.
987 begin_open_channel!(nodes[0], nodes[1], channel_value);
988 let (temporary_channel_id, funding_tx) = receiver
989 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
990 .expect("FundingGenerationReady not handled within deadline");
991 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
993 // Confirm the funding transaction.
994 confirm_transaction(&mut nodes[0], &funding_tx);
995 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
996 confirm_transaction(&mut nodes[1], &funding_tx);
997 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange channel_ready in both directions; each side then emits a channel_update.
998 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
999 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1000 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1001 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1003 assert!(bg_processor.stop().is_ok());
1005 // Set up a background event handler for SpendableOutputs events.
// Second processor instance: this handler just clones each event back to the test.
1006 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1007 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
1008 let persister = Arc::new(Persister::new(data_dir));
1009 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1011 // Force close the channel and check that the SpendableOutputs event was handled.
1012 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1013 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
// Confirm the commitment tx deeply enough (BREAKDOWN_TIMEOUT) that our to_self output
// becomes spendable and the event fires.
1014 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1015 let event = receiver
1016 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1017 .expect("SpendableOutputs not handled within deadline");
// Either ordering is acceptable: ChannelClosed may arrive before SpendableOutputs.
1019 Event::SpendableOutputs { .. } => {},
1020 Event::ChannelClosed { .. } => {},
1021 _ => panic!("Unexpected event: {:?}", event),
1024 assert!(bg_processor.stop().is_ok());
// Verifies the background processor periodically persists the scorer, observed via the
// test logger rather than on-disk bytes.
1028 fn test_scorer_persistence() {
1029 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1030 let data_dir = nodes[0].persister.get_data_dir();
1031 let persister = Arc::new(Persister::new(data_dir));
1032 let event_handler = |_: &_| {};
1033 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Poll the logger until the "Persisting scorer" line shows up.
// NOTE(review): the enclosing loop construct is elided in this view.
1036 let log_entries = nodes[0].logger.lines.lock().unwrap();
1037 let expected_log = "Persisting scorer".to_string();
1038 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1043 assert!(bg_processor.stop().is_ok());
// Verifies that the background processor does not prune the network graph while a rapid
// gossip sync is outstanding, and prunes it once the sync completes.
1047 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1048 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1049 let data_dir = nodes[0].persister.get_data_dir();
// The persister notifies this channel whenever it persists the graph, which only
// happens after pruning — so a recv below means pruning has occurred.
1050 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1051 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1052 let network_graph = nodes[0].network_graph.clone();
1053 let features = ChannelFeatures::empty();
// Seed the graph with one stale partial announcement (scid 42) that is eligible for
// pruning.
1054 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1055 .expect("Failed to update channel from partial announcement");
1056 let original_graph_description = network_graph.to_string();
1057 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1058 assert_eq!(network_graph.read_only().channels().len(), 1);
1060 let event_handler = |_: &_| {};
1061 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// While the rapid gossip sync has not completed, the processor must log that it is
// skipping the prune. NOTE(review): the enclosing polling loop is elided in this view.
1064 let log_entries = nodes[0].logger.lines.lock().unwrap();
1065 let expected_log_a = "Assessing prunability of network graph".to_string();
1066 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1067 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1068 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Serialized rapid-gossip-sync payload (LDK\x01 magic) announcing two channels; fed to
// update_network_graph below to mark the sync complete.
1073 let initialization_input = vec![
1074 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1075 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1076 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1077 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1078 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1079 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1080 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1081 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1082 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1083 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1084 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1085 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1086 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1088 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1090 // this should have added two channels
1091 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait (bounded) for the post-sync prune to persist the graph.
1094 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1095 .expect("Network graph not pruned within deadline");
1097 background_processor.stop().unwrap();
1099 // all channels should now be pruned
1100 assert_eq!(network_graph.read_only().channels().len(), 0);
// Smoke test: an InvoicePayer can serve as the BackgroundProcessor's event handler and
// the processor starts and stops cleanly with it installed.
1104 fn test_invoice_payer() {
1105 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1106 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1107 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1109 // Initiate the background processors to watch each node.
1110 let data_dir = nodes[0].persister.get_data_dir();
1111 let persister = Arc::new(Persister::new(data_dir));
1112 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
// InvoicePayer with a no-op inner event handler and up to 2 retry attempts.
1113 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
// The Arc<InvoicePayer> itself is passed as the event handler.
1114 let event_handler = Arc::clone(&invoice_payer);
1115 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1116 assert!(bg_processor.stop().is_ok());