//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background. See docs for
//! [`BackgroundProcessor`] for more details on the nitty-gritty.

#![deny(broken_intra_doc_links)]
#![deny(missing_docs)]
#![deny(unsafe_code)]

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`NetGraphMsgHandler`] is provided to
///   [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations. Increasing the ping
/// timer allows for this, but slower devices will be disconnected if the timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Run the first network graph prune shortly after startup so short-lived clients still prune.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	E: EventHandler,
	N: Deref<Target = NetGraphMsgHandler<G, A, L>>,
	G: Deref<Target = NetworkGraph>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	event_handler: E,
	net_graph_msg_handler: Option<N>,
}

impl<
	E: EventHandler,
	N: Deref<Target = NetGraphMsgHandler<G, A, L>>,
	G: Deref<Target = NetworkGraph>,
	A: Deref,
	L: Deref,
> EventHandler for DecoratingEventHandler<E, N, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn handle_event(&self, event: &Event) {
		if let Some(event_handler) = &self.net_graph_msg_handler {
			event_handler.handle_event(event);
		}
		self.event_handler.handle_event(event);
	}
}

impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error can be
	/// retrieved by calling either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to
	/// disk and/or uploading it to one or more backup services. See [`ChannelManager::write`] for
	/// writing out a [`ChannelManager`], and the `lightning-persister` crate for LDK's provided
	/// implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk.
	/// See [`NetworkGraph::write`] for writing out a [`NetworkGraph`], and the
	/// `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. In the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
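	///
	/// For instance, a minimal sketch of a persister built on the `FilesystemPersister` from the
	/// `lightning-persister` crate (the data directory is hypothetical):
	///
	/// ```ignore
	/// use lightning_persister::FilesystemPersister;
	/// use std::sync::Arc;
	///
	/// // FilesystemPersister implements KVStorePersister, which in turn provides Persister.
	/// let persister = Arc::new(FilesystemPersister::new("/path/to/ldk-data".to_string()));
	/// ```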
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`NetGraphMsgHandler`] if given will update the [`NetworkGraph`] based on payment failures.
	///
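	/// As a sketch, a minimal no-op handler (the shape used in this crate's tests) is simply:
	///
	/// ```ignore
	/// use lightning::util::events::Event;
	///
	/// // Ignores every event; real nodes should at least act on payment and funding events.
	/// let event_handler = |_: &Event| {};
	/// ```
	///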
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass a [`RapidGossipSync`] via
	/// `rapid_gossip_sync`; [`BackgroundProcessor`] will then not prune the [`NetworkGraph`]
	/// instance until the [`RapidGossipSync`] instance completes its first sync.
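	///
	/// A sketch of wiring this up (mirroring this crate's tests):
	///
	/// ```ignore
	/// use lightning_rapid_gossip_sync::RapidGossipSync;
	/// use std::sync::Arc;
	///
	/// // `network_graph` is the same Arc<NetworkGraph> shared with the rest of the node.
	/// let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
	/// // Pass `Some(rapid_sync.clone())` as the `rapid_gossip_sync` argument to `start`.
	/// ```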
	///
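	/// # Example
	///
	/// A minimal sketch of starting and stopping the processor. All handles (`persister`,
	/// `event_handler`, `chain_monitor`, `channel_manager`, `peer_manager`, `logger`, `scorer`)
	/// are assumed to be constructed elsewhere with compatible types:
	///
	/// ```ignore
	/// let bg_processor = BackgroundProcessor::start(
	/// 	persister, event_handler, chain_monitor, channel_manager,
	/// 	Some(net_graph_msg_handler), peer_manager, logger, Some(scorer),
	/// 	None::<Arc<RapidGossipSync<Arc<NetworkGraph>>>>, // no rapid gossip sync in this sketch
	/// );
	/// // ... run the node ...
	/// // `stop` signals the background thread and joins it, surfacing any persistence error.
	/// bg_processor.stop().expect("failed to persist on shutdown");
	/// ```
	///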
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
		NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
		RGS: 'static + Deref<Target = RapidGossipSync<G>> + Send
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>,
		rapid_gossip_sync: Option<RGS>
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			let event_handler = DecoratingEventHandler { event_handler, net_graph_msg_handler: net_graph_msg_handler.as_ref().map(|t| t.deref()) };

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.timer_tick_occurred();

			let mut last_freshness_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut have_pruned = false;

			loop {
				channel_manager.process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);

				// Note that PeerManager::process_events may block on ChannelManager's locks, hence
				// it comes last here. When the ChannelManager finishes whatever it's doing, we
				// want to ensure we get into `persist_manager` as quickly as we can, especially
				// without running the normal event processing above and handing events to users.
				//
				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
				// processing a message effectively at any point during this loop. In order to
				// minimize the time between such processing completing and persisting the updated
				// ChannelManager, we want to minimize methods blocking on a ChannelManager
				// generally, and as a fallback place such blocking only immediately before
				// persistence.
				peer_manager.process_events();

				// We wait up to 100ms, but track how long it takes to detect being put to sleep;
				// see `await_start`'s use below.
				let await_start = Instant::now();
				let updates_available =
					channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
				let await_time = await_start.elapsed();

				if updates_available {
					log_trace!(logger, "Persisting ChannelManager...");
					persister.persist_manager(&*channel_manager)?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}
				// Exit the loop if the background processor was requested to stop.
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				if await_time > Duration::from_secs(1) {
					// On various platforms, we may be starved of CPU cycles for several reasons.
					// E.g. on iOS, if we've been in the background, we will be entirely paused.
					// Similarly, if we're on a desktop platform and the device has been asleep, we
					// may not get any cycles.
					// We detect this by checking if our max-100ms-sleep, above, ran longer than a
					// full second, at which point we assume sockets may have been killed (they
					// appear to be, at least on some platforms, even if the pause only lasted a
					// second).
					// Note that we have to take care not to get here just because user event
					// processing was slow at the top of the loop. For example, the sample client
					// may call Bitcoin Core RPCs during event handling, which very often takes
					// more than a handful of seconds to complete, and shouldn't disconnect all our
					// peers.
					log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
					peer_manager.disconnect_all_peers();
					last_ping_call = Instant::now();
				} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.timer_tick_occurred();
					last_ping_call = Instant::now();
				}

				// Note that we want to run a graph prune once not long after startup before
				// falling back to our usual hourly cadence: we run the first prune 60 seconds
				// after startup. This avoids short-lived clients never pruning their network
				// graph.
				if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
					// The network graph must not be pruned while rapid sync completion is pending.
					log_trace!(logger, "Assessing prunability of network graph");
					let graph_to_prune = match rapid_gossip_sync.as_ref() {
						Some(rapid_sync) => {
							if rapid_sync.is_initial_sync_complete() {
								Some(rapid_sync.network_graph())
							} else {
								None
							}
						},
						None => net_graph_msg_handler.as_ref().map(|handler| handler.network_graph())
					};

					if let Some(network_graph_reference) = graph_to_prune {
						network_graph_reference.remove_stale_channels();

						if let Err(e) = persister.persist_graph(network_graph_reference) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions: {}", e)
						}

						last_prune_call = Instant::now();
						have_pruned = true;
					} else {
						log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
					}
					if let Some(ref scorer) = scorer {
						log_trace!(logger, "Persisting scorer");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions: {}", e)
						}
					}
				}
			}

			// After we exit, ensure we persist the ChannelManager one final time - this avoids
			// some races where users quit while channel updates were in-flight, with
			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
			persister.persist_manager(&*channel_manager)?;

			// Persist Scorer on exit
			if let Some(ref scorer) = scorer {
				persister.persist_scorer(&scorer)?;
			}

			// Persist NetworkGraph on exit
			if let Some(ref handler) = net_graph_msg_handler {
				persister.persist_graph(handler.network_graph())?;
			}

			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}

impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}

#[cfg(test)]
mod tests {
	use bitcoin::blockdata::block::BlockHeader;
	use bitcoin::blockdata::constants::genesis_block;
	use bitcoin::blockdata::transaction::{Transaction, TxOut};
	use bitcoin::network::constants::Network;
	use lightning::chain::{BestBlock, Confirm, chainmonitor};
	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
	use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
	use lightning::chain::transaction::OutPoint;
	use lightning::get_event_msg;
	use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
	use lightning::ln::features::{ChannelFeatures, InitFeatures};
	use lightning::ln::msgs::{ChannelMessageHandler, Init};
	use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
	use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
	use lightning::routing::scoring::FixedPenaltyScorer;
	use lightning::util::config::UserConfig;
	use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
	use lightning::util::ser::Writeable;
	use lightning::util::test_utils;
	use lightning::util::persist::KVStorePersister;
	use lightning_invoice::payment::{InvoicePayer, Retry};
	use lightning_invoice::utils::DefaultRouter;
	use lightning_persister::FilesystemPersister;
	use lightning_rapid_gossip_sync::RapidGossipSync;
	use std::fs;
	use std::path::PathBuf;
	use std::sync::{Arc, Mutex};
	use std::sync::mpsc::SyncSender;
	use std::time::Duration;
	use super::{BackgroundProcessor, FRESHNESS_TIMER};

	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		net_graph_msg_handler: Option<Arc<NetGraphMsgHandler<Arc<NetworkGraph>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
		rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>>
	}

	impl Drop for Node {
		fn drop(&mut self) {
			let data_dir = self.persister.get_data_dir();
			if let Err(e) = fs::remove_dir_all(data_dir.clone()) {
				println!("Failed to remove test persister directory {}: {}", data_dir, e);
			}
		}
	}

	struct Persister {
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}

	impl Persister {
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
		}

		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }
		}

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }
		}

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }
		}

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
		}
	}

	impl KVStorePersister for Persister {
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();
				};

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "scorer" {
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			self.filesystem_persister.persist(key, object)
		}
	}

	fn get_full_filepath(filepath: String, filename: String) -> String {
		let mut path = PathBuf::from(filepath);
		path.push(filename);
		path.to_str().unwrap().to_string()
	}

	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash()));
			let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()) };
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let rapid_gossip_sync = None;
			let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
			nodes.push(node);
		}

		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}

	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}

	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}

	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}

	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}

	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}

	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}

	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		if let Some(ref handler) = nodes[0].net_graph_msg_handler {
			let network_graph = handler.network_graph();
			check_persisted_data!(network_graph, filepath.clone());
		}

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_scorer_persistence() {
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
		let features = ChannelFeatures::empty();
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		rapid_sync.update_network_graph(&initialization_input[..]).unwrap();

		// This should have added two channels.
		assert_eq!(network_graph.read_only().channels().len(), 3);

		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// All channels should now be pruned.
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}

	#[test]
	fn test_invoice_payer() {
		let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
		let random_seed_bytes = keys_manager.get_secure_random_bytes();
		let nodes = create_nodes(2, "test_invoice_payer".to_string());

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
		let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
		let event_handler = Arc::clone(&invoice_payer);
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
		assert!(bg_processor.stop().is_ok());
	}
}