1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{KeysInterface, SignerProvider};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::router::Router;
26 use lightning::routing::scoring::WriteableScore;
27 use lightning::util::events::{Event, EventHandler, EventsProvider};
28 use lightning::util::logger::Logger;
29 use lightning::util::persist::Persister;
30 use lightning_rapid_gossip_sync::RapidGossipSync;
32 use std::sync::atomic::{AtomicBool, Ordering};
34 use std::thread::JoinHandle;
35 use std::time::{Duration, Instant};
38 #[cfg(feature = "futures")]
39 use futures_util::{select_biased, future::FutureExt};
41 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
42 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
43 /// responsibilities are:
44 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
45 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
46 /// writing it to disk/backups by invoking the callback given to it at startup.
47 /// [`ChannelManager`] persistence should be done in the background.
48 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
49 /// at the appropriate intervals.
50 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
51 /// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
53 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
54 /// upon as doing so may result in high latency.
58 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
59 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
60 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
61 /// unilateral chain closure fees are at risk.
63 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
64 /// [`Event`]: lightning::util::events::Event
65 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
66 pub struct BackgroundProcessor {
67 stop_thread: Arc<AtomicBool>,
68 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
72 const FRESHNESS_TIMER: u64 = 60;
74 const FRESHNESS_TIMER: u64 = 1;
76 #[cfg(all(not(test), not(debug_assertions)))]
77 const PING_TIMER: u64 = 10;
78 /// Signature operations take a lot longer without compiler optimisations.
79 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
80 /// timeout is reached.
81 #[cfg(all(not(test), debug_assertions))]
82 const PING_TIMER: u64 = 30;
84 const PING_TIMER: u64 = 1;
/// Number of seconds between routine prunes of stale network graph entries (one hour).
const NETWORK_PRUNE_TIMER: u64 = 3_600;
90 const SCORER_PERSIST_TIMER: u64 = 30;
92 const SCORER_PERSIST_TIMER: u64 = 1;
95 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
97 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
99 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
101 P: Deref<Target = P2PGossipSync<G, A, L>>,
102 R: Deref<Target = RapidGossipSync<G, L>>,
103 G: Deref<Target = NetworkGraph<L>>,
107 where A::Target: chain::Access, L::Target: Logger {
108 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
110 /// Rapid gossip sync from a trusted server.
117 P: Deref<Target = P2PGossipSync<G, A, L>>,
118 R: Deref<Target = RapidGossipSync<G, L>>,
119 G: Deref<Target = NetworkGraph<L>>,
122 > GossipSync<P, R, G, A, L>
123 where A::Target: chain::Access, L::Target: Logger {
124 fn network_graph(&self) -> Option<&G> {
126 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
127 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
128 GossipSync::None => None,
132 fn prunable_network_graph(&self) -> Option<&G> {
134 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
135 GossipSync::Rapid(gossip_sync) => {
136 if gossip_sync.is_initial_sync_complete() {
137 Some(gossip_sync.network_graph())
142 GossipSync::None => None,
147 /// (C-not exported) as the bindings concretize everything and have constructors for us
148 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
149 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
151 A::Target: chain::Access,
154 /// Initializes a new [`GossipSync::P2P`] variant.
155 pub fn p2p(gossip_sync: P) -> Self {
156 GossipSync::P2P(gossip_sync)
160 /// (C-not exported) as the bindings concretize everything and have constructors for us
161 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
163 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
166 &'a (dyn chain::Access + Send + Sync),
172 /// Initializes a new [`GossipSync::Rapid`] variant.
173 pub fn rapid(gossip_sync: R) -> Self {
174 GossipSync::Rapid(gossip_sync)
178 /// (C-not exported) as the bindings concretize everything and have constructors for us
181 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
182 &RapidGossipSync<&'a NetworkGraph<L>, L>,
184 &'a (dyn chain::Access + Send + Sync),
190 /// Initializes a new [`GossipSync::None`] variant.
191 pub fn none() -> Self {
196 fn handle_network_graph_update<L: Deref>(
197 network_graph: &NetworkGraph<L>, event: &Event
198 ) where L::Target: Logger {
199 if let Event::PaymentPathFailed { ref network_update, .. } = event {
200 if let Some(network_update) = network_update {
201 network_graph.handle_network_update(&network_update);
206 macro_rules! define_run_body {
207 ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
208 $channel_manager: ident, $process_channel_manager_events: expr,
209 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
210 $loop_exit_check: expr, $await: expr)
212 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
213 $channel_manager.timer_tick_occurred();
215 let mut last_freshness_call = Instant::now();
216 let mut last_ping_call = Instant::now();
217 let mut last_prune_call = Instant::now();
218 let mut last_scorer_persist_call = Instant::now();
219 let mut have_pruned = false;
222 $process_channel_manager_events;
223 $process_chain_monitor_events;
225 // Note that the PeerManager::process_events may block on ChannelManager's locks,
226 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
227 // we want to ensure we get into `persist_manager` as quickly as we can, especially
228 // without running the normal event processing above and handing events to users.
230 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
231 // processing a message effectively at any point during this loop. In order to
232 // minimize the time between such processing completing and persisting the updated
233 // ChannelManager, we want to minimize methods blocking on a ChannelManager
234 // generally, and as a fallback place such blocking only immediately before
236 $peer_manager.process_events();
238 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
239 // see `await_start`'s use below.
240 let await_start = Instant::now();
241 let updates_available = $await;
242 let await_time = await_start.elapsed();
244 if updates_available {
245 log_trace!($logger, "Persisting ChannelManager...");
246 $persister.persist_manager(&*$channel_manager)?;
247 log_trace!($logger, "Done persisting ChannelManager.");
249 // Exit the loop if the background processor was requested to stop.
250 if $loop_exit_check {
251 log_trace!($logger, "Terminating background processor.");
254 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
255 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
256 $channel_manager.timer_tick_occurred();
257 last_freshness_call = Instant::now();
259 if await_time > Duration::from_secs(1) {
260 // On various platforms, we may be starved of CPU cycles for several reasons.
261 // E.g. on iOS, if we've been in the background, we will be entirely paused.
262 // Similarly, if we're on a desktop platform and the device has been asleep, we
263 // may not get any cycles.
264 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
265 // full second, at which point we assume sockets may have been killed (they
266 // appear to be at least on some platforms, even if it has only been a second).
267 // Note that we have to take care to not get here just because user event
268 // processing was slow at the top of the loop. For example, the sample client
269 // may call Bitcoin Core RPCs during event handling, which very often takes
270 // more than a handful of seconds to complete, and shouldn't disconnect all our
272 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
273 $peer_manager.disconnect_all_peers();
274 last_ping_call = Instant::now();
275 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
276 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
277 $peer_manager.timer_tick_occurred();
278 last_ping_call = Instant::now();
281 // Note that we want to run a graph prune once not long after startup before
282 // falling back to our usual hourly prunes. This avoids short-lived clients never
283 // pruning their network graph. We run once 60 seconds after startup before
284 // continuing our normal cadence.
285 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
286 // The network graph must not be pruned while rapid sync completion is pending
287 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
288 log_trace!($logger, "Pruning and persisting network graph.");
289 network_graph.remove_stale_channels_and_tracking();
291 if let Err(e) = $persister.persist_graph(network_graph) {
292 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
295 last_prune_call = Instant::now();
300 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
301 if let Some(ref scorer) = $scorer {
302 log_trace!($logger, "Persisting scorer");
303 if let Err(e) = $persister.persist_scorer(&scorer) {
304 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
307 last_scorer_persist_call = Instant::now();
311 // After we exit, ensure we persist the ChannelManager one final time - this avoids
312 // some races where users quit while channel updates were in-flight, with
313 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
314 $persister.persist_manager(&*$channel_manager)?;
316 // Persist Scorer on exit
317 if let Some(ref scorer) = $scorer {
318 $persister.persist_scorer(&scorer)?;
321 // Persist NetworkGraph on exit
322 if let Some(network_graph) = $gossip_sync.network_graph() {
323 $persister.persist_graph(network_graph)?;
330 /// Processes background events in a future.
332 /// `sleeper` should return a future which completes in the given amount of time and returns a
333 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
334 /// future which outputs true, the loop will exit and this function's future will complete.
336 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
337 #[cfg(feature = "futures")]
338 pub async fn process_events_async<
340 CA: 'static + Deref + Send + Sync,
341 CF: 'static + Deref + Send + Sync,
342 CW: 'static + Deref + Send + Sync,
343 T: 'static + Deref + Send + Sync,
344 K: 'static + Deref + Send + Sync,
345 F: 'static + Deref + Send + Sync,
346 R: 'static + Deref + Send + Sync,
347 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
348 L: 'static + Deref + Send + Sync,
349 P: 'static + Deref + Send + Sync,
350 Descriptor: 'static + SocketDescriptor + Send + Sync,
351 CMH: 'static + Deref + Send + Sync,
352 RMH: 'static + Deref + Send + Sync,
353 OMH: 'static + Deref + Send + Sync,
354 EventHandlerFuture: core::future::Future<Output = ()>,
355 EventHandler: Fn(Event) -> EventHandlerFuture,
356 PS: 'static + Deref + Send,
357 M: 'static + Deref<Target = ChainMonitor<<K::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
358 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, R, L>> + Send + Sync,
359 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
360 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
361 UMH: 'static + Deref + Send + Sync,
362 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
363 S: 'static + Deref<Target = SC> + Send + Sync,
364 SC: WriteableScore<'a>,
365 SleepFuture: core::future::Future<Output = bool>,
366 Sleeper: Fn(Duration) -> SleepFuture
368 persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
369 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
371 ) -> Result<(), std::io::Error>
373 CA::Target: 'static + chain::Access,
374 CF::Target: 'static + chain::Filter,
375 CW::Target: 'static + chain::Watch<<K::Target as SignerProvider>::Signer>,
376 T::Target: 'static + BroadcasterInterface,
377 K::Target: 'static + KeysInterface,
378 F::Target: 'static + FeeEstimator,
379 R::Target: 'static + Router,
380 L::Target: 'static + Logger,
381 P::Target: 'static + Persist<<K::Target as SignerProvider>::Signer>,
382 CMH::Target: 'static + ChannelMessageHandler,
383 OMH::Target: 'static + OnionMessageHandler,
384 RMH::Target: 'static + RoutingMessageHandler,
385 UMH::Target: 'static + CustomMessageHandler,
386 PS::Target: 'static + Persister<'a, CW, T, K, F, R, L, SC>,
388 let mut should_break = true;
389 let async_event_handler = |event| {
390 let network_graph = gossip_sync.network_graph();
391 let event_handler = &event_handler;
393 if let Some(network_graph) = network_graph {
394 handle_network_graph_update(network_graph, &event)
396 event_handler(event).await;
399 define_run_body!(persister,
400 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
401 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
402 gossip_sync, peer_manager, logger, scorer, should_break, {
404 _ = channel_manager.get_persistable_update_future().fuse() => true,
405 exit = sleeper(Duration::from_millis(100)).fuse() => {
413 impl BackgroundProcessor {
414 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
417 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
418 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
419 /// either [`join`] or [`stop`].
421 /// # Data Persistence
423 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
424 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
425 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
426 /// provided implementation.
428 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
429 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
430 /// See the `lightning-persister` crate for LDK's provided implementation.
432 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
433 /// error or call [`join`] and handle any error that may arise. For the latter case,
434 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
438 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
439 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
440 /// functionality implemented by other handlers.
441 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
443 /// # Rapid Gossip Sync
445 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
446 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
447 /// until the [`RapidGossipSync`] instance completes its first sync.
449 /// [top-level documentation]: BackgroundProcessor
450 /// [`join`]: Self::join
451 /// [`stop`]: Self::stop
452 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
453 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
454 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
455 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
456 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
457 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
460 CA: 'static + Deref + Send + Sync,
461 CF: 'static + Deref + Send + Sync,
462 CW: 'static + Deref + Send + Sync,
463 T: 'static + Deref + Send + Sync,
464 K: 'static + Deref + Send + Sync,
465 F: 'static + Deref + Send + Sync,
466 R: 'static + Deref + Send + Sync,
467 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
468 L: 'static + Deref + Send + Sync,
469 P: 'static + Deref + Send + Sync,
470 Descriptor: 'static + SocketDescriptor + Send + Sync,
471 CMH: 'static + Deref + Send + Sync,
472 OMH: 'static + Deref + Send + Sync,
473 RMH: 'static + Deref + Send + Sync,
474 EH: 'static + EventHandler + Send,
475 PS: 'static + Deref + Send,
476 M: 'static + Deref<Target = ChainMonitor<<K::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
477 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, R, L>> + Send + Sync,
478 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
479 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
480 UMH: 'static + Deref + Send + Sync,
481 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
482 S: 'static + Deref<Target = SC> + Send + Sync,
483 SC: WriteableScore<'a>,
485 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
486 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
489 CA::Target: 'static + chain::Access,
490 CF::Target: 'static + chain::Filter,
491 CW::Target: 'static + chain::Watch<<K::Target as SignerProvider>::Signer>,
492 T::Target: 'static + BroadcasterInterface,
493 K::Target: 'static + KeysInterface,
494 F::Target: 'static + FeeEstimator,
495 R::Target: 'static + Router,
496 L::Target: 'static + Logger,
497 P::Target: 'static + Persist<<K::Target as SignerProvider>::Signer>,
498 CMH::Target: 'static + ChannelMessageHandler,
499 OMH::Target: 'static + OnionMessageHandler,
500 RMH::Target: 'static + RoutingMessageHandler,
501 UMH::Target: 'static + CustomMessageHandler,
502 PS::Target: 'static + Persister<'a, CW, T, K, F, R, L, SC>,
504 let stop_thread = Arc::new(AtomicBool::new(false));
505 let stop_thread_clone = stop_thread.clone();
506 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
507 let event_handler = |event| {
508 let network_graph = gossip_sync.network_graph();
509 if let Some(network_graph) = network_graph {
510 handle_network_graph_update(network_graph, &event)
512 event_handler.handle_event(event);
514 define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
515 channel_manager, channel_manager.process_pending_events(&event_handler),
516 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
517 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
519 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
522 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
523 /// [`ChannelManager`].
527 /// This function panics if the background thread has panicked such as while persisting or
530 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
531 pub fn join(mut self) -> Result<(), std::io::Error> {
532 assert!(self.thread_handle.is_some());
536 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
537 /// [`ChannelManager`].
541 /// This function panics if the background thread has panicked such as while persisting or
544 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
545 pub fn stop(mut self) -> Result<(), std::io::Error> {
546 assert!(self.thread_handle.is_some());
547 self.stop_and_join_thread()
550 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
551 self.stop_thread.store(true, Ordering::Release);
555 fn join_thread(&mut self) -> Result<(), std::io::Error> {
556 match self.thread_handle.take() {
557 Some(handle) => handle.join().unwrap(),
563 impl Drop for BackgroundProcessor {
565 self.stop_and_join_thread().unwrap();
571 use bitcoin::blockdata::block::BlockHeader;
572 use bitcoin::blockdata::constants::genesis_block;
573 use bitcoin::blockdata::locktime::PackedLockTime;
574 use bitcoin::blockdata::transaction::{Transaction, TxOut};
575 use bitcoin::network::constants::Network;
576 use lightning::chain::{BestBlock, Confirm, chainmonitor};
577 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
578 use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysInterface, KeysManager, NodeSigner};
579 use lightning::chain::transaction::OutPoint;
580 use lightning::get_event_msg;
581 use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
582 use lightning::ln::features::ChannelFeatures;
583 use lightning::ln::msgs::{ChannelMessageHandler, Init};
584 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
585 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
586 use lightning::routing::router::DefaultRouter;
587 use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
588 use lightning::util::config::UserConfig;
589 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
590 use lightning::util::ser::Writeable;
591 use lightning::util::test_utils;
592 use lightning::util::persist::KVStorePersister;
593 use lightning_invoice::payment::{InvoicePayer, Retry};
594 use lightning_persister::FilesystemPersister;
596 use std::path::PathBuf;
597 use std::sync::{Arc, Mutex};
598 use std::sync::mpsc::SyncSender;
599 use std::time::Duration;
600 use bitcoin::hashes::Hash;
601 use bitcoin::TxMerkleNode;
602 use lightning_rapid_gossip_sync::RapidGossipSync;
603 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
605 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
/// Field-less socket descriptor used as the `SocketDescriptor` implementation in tests.
#[derive(Clone, PartialEq, Eq, Hash)]
struct TestDescriptor {}
609 impl SocketDescriptor for TestDescriptor {
610 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
614 fn disconnect_socket(&mut self) {}
617 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
619 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
620 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
623 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
624 p2p_gossip_sync: PGS,
625 rapid_gossip_sync: RGS,
626 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
627 chain_monitor: Arc<ChainMonitor>,
628 persister: Arc<FilesystemPersister>,
629 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
630 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
631 logger: Arc<test_utils::TestLogger>,
632 best_block: BestBlock,
633 scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
637 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
638 GossipSync::P2P(self.p2p_gossip_sync.clone())
641 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
642 GossipSync::Rapid(self.rapid_gossip_sync.clone())
645 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
652 let data_dir = self.persister.get_data_dir();
653 match fs::remove_dir_all(data_dir.clone()) {
654 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
661 graph_error: Option<(std::io::ErrorKind, &'static str)>,
662 graph_persistence_notifier: Option<SyncSender<()>>,
663 manager_error: Option<(std::io::ErrorKind, &'static str)>,
664 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
665 filesystem_persister: FilesystemPersister,
669 fn new(data_dir: String) -> Self {
670 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
671 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
674 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
675 Self { graph_error: Some((error, message)), ..self }
678 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
679 Self { graph_persistence_notifier: Some(sender), ..self }
682 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
683 Self { manager_error: Some((error, message)), ..self }
686 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
687 Self { scorer_error: Some((error, message)), ..self }
691 impl KVStorePersister for Persister {
692 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
693 if key == "manager" {
694 if let Some((error, message)) = self.manager_error {
695 return Err(std::io::Error::new(error, message))
699 if key == "network_graph" {
700 if let Some(sender) = &self.graph_persistence_notifier {
701 sender.send(()).unwrap();
704 if let Some((error, message)) = self.graph_error {
705 return Err(std::io::Error::new(error, message))
710 if let Some((error, message)) = self.scorer_error {
711 return Err(std::io::Error::new(error, message))
715 self.filesystem_persister.persist(key, object)
719 fn get_full_filepath(filepath: String, filename: String) -> String {
720 let mut path = PathBuf::from(filepath);
722 path.to_str().unwrap().to_string()
725 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
726 let mut nodes = Vec::new();
727 for i in 0..num_nodes {
728 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
729 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
730 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
731 let network = Network::Testnet;
732 let genesis_block = genesis_block(network);
733 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
734 let params = ProbabilisticScoringParameters::default();
735 let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
736 let seed = [i as u8; 32];
737 let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
738 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
739 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
740 let now = Duration::from_secs(genesis_block.header.time as u64);
741 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
742 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
743 let best_block = BestBlock::from_genesis(network);
744 let params = ChainParameters { network, best_block };
745 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
746 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
747 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
748 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
749 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
750 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
754 for i in 0..num_nodes {
755 for j in (i+1)..num_nodes {
756 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
757 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
764 macro_rules! open_channel {
765 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
766 begin_open_channel!($node_a, $node_b, $channel_value);
767 let events = $node_a.node.get_and_clear_pending_events();
768 assert_eq!(events.len(), 1);
769 let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
770 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
// Starts channel negotiation: `$node_a` calls `create_channel` towards `$node_b`, the
// resulting `SendOpenChannel` message is handed to `$node_b`, and `$node_b`'s
// `SendAcceptChannel` reply is handed back to `$node_a`.
// The literal 42 passed to `create_channel` is presumably the `user_channel_id` that
// `handle_funding_generation_ready!` later asserts on -- confirm against the
// `create_channel` signature.
775 macro_rules! begin_open_channel {
776 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
777 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
778 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
779 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
// Handles an event that is expected to be `Event::FundingGenerationReady`: checks that
// its channel value equals `$channel_value` and its `user_channel_id` equals 42, then
// builds a funding `Transaction` with no inputs whose output pays
// `channel_value_satoshis` to the event's `output_script`.
// Evaluates to `(temporary_channel_id, tx)`; any other event panics with
// "Unexpected event".
783 macro_rules! handle_funding_generation_ready {
784 ($event: expr, $channel_value: expr) => {{
786 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
787 assert_eq!(channel_value_satoshis, $channel_value);
788 assert_eq!(user_channel_id, 42);
// Input-less transaction: only the funding output matters to these tests.
790 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
791 value: channel_value_satoshis, script_pubkey: output_script.clone(),
793 (temporary_channel_id, tx)
795 _ => panic!("Unexpected event"),
// Completes the funding handshake: gives a clone of `$tx` to `$node_a` through
// `funding_transaction_generated`, delivers `$node_a`'s `SendFundingCreated` message to
// `$node_b`, and delivers `$node_b`'s `SendFundingSigned` reply back to `$node_a`.
800 macro_rules! end_open_channel {
801 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
802 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
803 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
804 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Simulates `tx` being confirmed on `node`'s chain: builds a header on top of
// `node.best_block`, advances `node.best_block` to it, and notifies both `node.node`
// and `node.chain_monitor` through `transactions_confirmed` and `best_block_updated`.
// NOTE(review): how `depth` controls the number of blocks connected, and which of the
// two notifications is sent for which block, is not visible in this excerpt -- confirm
// against the full function body.
808 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
// The new block extends the node's current tip by one.
810 let prev_blockhash = node.best_block.block_hash();
811 let height = node.best_block.height() + 1;
812 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
// `tx` is reported at index 0 of the block's transaction data.
813 let txdata = vec![(0, tx)];
814 node.best_block = BestBlock::new(header.block_hash(), height);
817 node.node.transactions_confirmed(&header, &txdata, height);
818 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
821 node.node.best_block_updated(&header, height);
822 node.chain_monitor.best_block_updated(&header, height);
// Convenience wrapper: confirms `tx` on `node` via `confirm_transaction_depth` using
// `ANTI_REORG_DELAY` as the depth.
828 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
829 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
833 fn test_background_processor() {
834 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
835 // updates. Also test that when new updates are available, the manager signals that it needs
836 // re-persistence and is successfully re-persisted.
837 let nodes = create_nodes(2, "test_background_processor".to_string());
839 // Go through the channel creation process so that each node has something to persist. Since
840 // open_channel consumes events, it must complete before starting BackgroundProcessor to
841 // avoid a race with processing events.
842 let tx = open_channel!(nodes[0], nodes[1], 100000);
844 // Start a background processor for nodes[0] only, persisting into that node's data dir.
845 let data_dir = nodes[0].persister.get_data_dir();
846 let persister = Arc::new(Persister::new(data_dir));
847 let event_handler = |_: _| {};
848 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Serializes `$node` with `write` into a buffer and compares it with the bytes read
// from `$filepath`.
// NOTE(review): the handling of a match/mismatch and of non-fatal read errors is
// not visible in this excerpt.
850 macro_rules! check_persisted_data {
851 ($node: expr, $filepath: expr) => {
852 let mut expected_bytes = Vec::new();
854 expected_bytes.clear();
855 match $node.write(&mut expected_bytes) {
857 match std::fs::read($filepath) {
859 if bytes == expected_bytes {
868 Err(e) => panic!("Unexpected error: {}", e)
874 // Check that the initial channel manager data is persisted as expected.
875 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
876 check_persisted_data!(nodes[0].node, filepath.clone());
// Leave the enclosing wait loop once the manager's persistence flag reads false.
879 if !nodes[0].node.get_persistence_condvar_value() { break }
882 // Force-close the channel.
883 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
885 // Check that the force-close updates are persisted.
886 check_persisted_data!(nodes[0].node, filepath.clone());
// Same wait as above, now for the force-close update.
888 if !nodes[0].node.get_persistence_condvar_value() { break }
891 // Check network graph is persisted
892 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
893 check_persisted_data!(nodes[0].network_graph, filepath.clone());
895 // Check scorer is persisted
896 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
897 check_persisted_data!(nodes[0].scorer, filepath.clone());
899 assert!(bg_processor.stop().is_ok());
903 fn test_timer_tick_called() {
904 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
905 // `FRESHNESS_TIMER`.
906 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
907 let data_dir = nodes[0].persister.get_data_dir();
908 let persister = Arc::new(Persister::new(data_dir));
909 let event_handler = |_: _| {};
910 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The check is done through the test logger: both timer-tick messages must have been
// recorded under the `lightning_background_processor` module.
912 let log_entries = nodes[0].logger.lines.lock().unwrap();
913 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
914 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
915 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
916 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
921 assert!(bg_processor.stop().is_ok());
925 fn test_channel_manager_persist_error() {
926 // Test that if we encounter an error during manager persistence, `join` returns that error.
927 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel first so that nodes[0]'s manager has state to persist.
928 open_channel!(nodes[0], nodes[1], 100000);
930 let data_dir = nodes[0].persister.get_data_dir();
// Persister built with `with_manager_error`, carrying the kind/message asserted below.
931 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
932 let event_handler = |_: _| {};
933 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join` (rather than `stop`) is used here; it must yield the injected error.
934 match bg_processor.join() {
935 Ok(_) => panic!("Expected error persisting manager"),
937 assert_eq!(e.kind(), std::io::ErrorKind::Other);
938 assert_eq!(e.get_ref().unwrap().to_string(), "test");
944 fn test_network_graph_persist_error() {
945 // Test that if we encounter an error during network graph persistence, an error gets returned.
946 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
947 let data_dir = nodes[0].persister.get_data_dir();
// Persister built with `with_graph_error`, carrying the kind/message asserted below.
948 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
949 let event_handler = |_: _| {};
950 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error.
952 match bg_processor.stop() {
953 Ok(_) => panic!("Expected error persisting network graph"),
955 assert_eq!(e.kind(), std::io::ErrorKind::Other);
956 assert_eq!(e.get_ref().unwrap().to_string(), "test");
962 fn test_scorer_persist_error() {
963 // Test that if we encounter an error during scorer persistence, an error gets returned.
964 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
965 let data_dir = nodes[0].persister.get_data_dir();
// Persister built with `with_scorer_error`, carrying the kind/message asserted below.
966 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
967 let event_handler = |_: _| {};
968 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Stopping the processor must surface the injected error.
970 match bg_processor.stop() {
971 Ok(_) => panic!("Expected error persisting scorer"),
973 assert_eq!(e.kind(), std::io::ErrorKind::Other);
974 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Test that events reach the user-provided handler while a BackgroundProcessor is
// running: first `FundingGenerationReady` during a channel open, then (with a second
// processor and handler) `SpendableOutputs` after a force close. Handlers forward what
// they receive over a bounded `sync_channel` so the test thread can wait on it.
980 fn test_background_event_handling() {
981 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
982 let channel_value = 100000;
983 let data_dir = nodes[0].persister.get_data_dir();
984 let persister = Arc::new(Persister::new(data_dir.clone()));
986 // Set up a background event handler for FundingGenerationReady events.
987 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
988 let event_handler = move |event: Event| match event {
989 Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
990 Event::ChannelReady { .. } => {},
991 _ => panic!("Unexpected event: {:?}", event),
994 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
996 // Open a channel and check that the FundingGenerationReady event was handled.
997 begin_open_channel!(nodes[0], nodes[1], channel_value);
// Block until the handler has produced the funding transaction, or fail after
// `EVENT_DEADLINE` seconds.
998 let (temporary_channel_id, funding_tx) = receiver
999 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1000 .expect("FundingGenerationReady not handled within deadline");
1001 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1003 // Confirm the funding transaction.
1004 confirm_transaction(&mut nodes[0], &funding_tx);
1005 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1006 confirm_transaction(&mut nodes[1], &funding_tx);
1007 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
// Exchange the `channel_ready` messages; the resulting channel updates are drained
// and discarded.
1008 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1009 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1010 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1011 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before installing a handler for the second phase.
1013 assert!(bg_processor.stop().is_ok());
1015 // Set up a background event handler for SpendableOutputs events.
1016 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1017 let event_handler = move |event: Event| match event {
1018 Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1019 Event::ChannelReady { .. } => {},
1020 Event::ChannelClosed { .. } => {},
1021 _ => panic!("Unexpected event: {:?}", event),
1023 let persister = Arc::new(Persister::new(data_dir));
1024 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1026 // Force close the channel and check that the SpendableOutputs event was handled.
1027 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the most recently broadcast transaction and confirm it to `BREAKDOWN_TIMEOUT`
// depth.
1028 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1029 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1031 let event = receiver
1032 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1033 .expect("Events not handled within deadline");
1035 Event::SpendableOutputs { .. } => {},
1036 _ => panic!("Unexpected event: {:?}", event),
1039 assert!(bg_processor.stop().is_ok());
// Test that a BackgroundProcessor started with a scorer logs "Persisting scorer" under
// the `lightning_background_processor` module.
// NOTE(review): the loop around the log check is not visible in this excerpt.
1043 fn test_scorer_persistence() {
1044 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1045 let data_dir = nodes[0].persister.get_data_dir();
1046 let persister = Arc::new(Persister::new(data_dir));
1047 let event_handler = |_: _| {};
1048 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Inspect the test logger's recorded lines for the expected message.
1051 let log_entries = nodes[0].logger.lines.lock().unwrap();
1052 let expected_log = "Persisting scorer".to_string();
1053 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1058 assert!(bg_processor.stop().is_ok());
// Test, per its name, that the network graph is not pruned until graph sync has
// completed. The graph is seeded with one channel from a partial announcement, the
// processor is started in rapid-gossip-sync mode, a rapid gossip sync snapshot adding
// two more channels is applied, and after waiting (with a timeout based on
// `FIRST_NETWORK_PRUNE_TIMER`) all channels are expected to be gone.
1062 fn test_not_pruning_network_graph_until_graph_sync_completion() {
1063 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1064 let data_dir = nodes[0].persister.get_data_dir();
// The sending half is handed to the persister via `with_graph_persistence_notifier`.
1065 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1066 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1067 let network_graph = nodes[0].network_graph.clone();
1068 let features = ChannelFeatures::empty();
// Seed the graph with a single channel (short channel id 42) between the two nodes.
1069 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1070 .expect("Failed to update channel from partial announcement");
1071 let original_graph_description = network_graph.to_string();
1072 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1073 assert_eq!(network_graph.read_only().channels().len(), 1);
1075 let event_handler = |_: _| {};
1076 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The ChannelManager timer-tick log entry is used as a counter of processor loop
// iterations.
1079 let log_entries = nodes[0].logger.lines.lock().unwrap();
1080 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1081 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1084 // Wait until the loop has gone around at least twice.
// Serialized rapid gossip sync snapshot fed to `update_network_graph` below.
1089 let initialization_input = vec![
1090 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1091 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1092 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1093 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1094 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1095 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1096 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1097 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1098 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1099 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1100 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1101 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1102 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1104 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1106 // this should have added two channels
1107 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait, with a deadline of five times `FIRST_NETWORK_PRUNE_TIMER` seconds.
// NOTE(review): the receiver expression this call chains onto is not visible here.
1110 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1111 .expect("Network graph not pruned within deadline");
1113 background_processor.stop().unwrap();
1115 // all channels should now be pruned
1116 assert_eq!(network_graph.read_only().channels().len(), 0);
// Test that an `Arc<InvoicePayer>` can be passed to `BackgroundProcessor::start` as the
// event handler, and that the processor then stops cleanly.
1120 fn test_invoice_payer() {
1121 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1122 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1123 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1125 // Start a background processor for nodes[0], using the `InvoicePayer` as its event handler.
1126 let data_dir = nodes[0].persister.get_data_dir();
1127 let persister = Arc::new(Persister::new(data_dir));
1128 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1129 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1130 let event_handler = Arc::clone(&invoice_payer);
1131 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1132 assert!(bg_processor.stop().is_ok());