2e947a7041afdd92c4c8ac86c053c6781de24f26
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(broken_intra_doc_links)]
6 #![deny(missing_docs)]
7 #![deny(unsafe_code)]
8
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
10
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
13
14 use lightning::chain;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
27 use std::sync::Arc;
28 use std::sync::atomic::{AtomicBool, Ordering};
29 use std::thread;
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
32 use std::ops::Deref;
33
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] and then persisting the pruned graph. The
///   graph comes from the [`RapidGossipSync`] passed to [`BackgroundProcessor::start`] once its
///   initial sync has completed, or otherwise from the [`P2PGossipSync`] if one was provided.
/// * Periodically persisting the scorer, if one is provided to [`BackgroundProcessor::start`].
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
        /// Flag shared with the background thread; storing `true` asks the thread to leave its loop.
        stop_thread: Arc<AtomicBool>,
        /// Handle of the background thread. It is taken (leaving `None`) when the thread is joined,
        /// so a later join attempt (e.g. from `Drop`) is a no-op.
        thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
63
// Threshold, in seconds, that must elapse between calls to
// `ChannelManager::timer_tick_occurred` in the background loop.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Threshold, in seconds, that must elapse between calls to
// `PeerManager::timer_tick_occurred` in the background loop.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Threshold, in seconds, that must elapse between periodic scorer persistence attempts.
// This has to exist in every non-test build, optimised or not: the background loop reads it
// unconditionally, so gating it on `debug_assertions` would break release builds.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Threshold, in seconds, used for the very first network graph prune after startup; later
// prunes use `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
91
92
93 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
94 struct DecoratingEventHandler<
95         E: EventHandler,
96         P: Deref<Target = P2PGossipSync<G, A, L>>,
97         G: Deref<Target = NetworkGraph>,
98         A: Deref,
99         L: Deref,
100 >
101 where A::Target: chain::Access, L::Target: Logger {
102         event_handler: E,
103         p2p_gossip_sync: Option<P>,
104 }
105
106 impl<
107         E: EventHandler,
108         P: Deref<Target = P2PGossipSync<G, A, L>>,
109         G: Deref<Target = NetworkGraph>,
110         A: Deref,
111         L: Deref,
112 > EventHandler for DecoratingEventHandler<E, P, G, A, L>
113 where A::Target: chain::Access, L::Target: Logger {
114         fn handle_event(&self, event: &Event) {
115                 if let Some(event_handler) = &self.p2p_gossip_sync {
116                         event_handler.handle_event(event);
117                 }
118                 self.event_handler.handle_event(event);
119         }
120 }
121
122 impl BackgroundProcessor {
123         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
124         /// documentation].
125         ///
126         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
127         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
128         /// either [`join`] or [`stop`].
129         ///
130         /// # Data Persistence
131         ///
132         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
133         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
134         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
135         /// provided implementation.
136         ///
137         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
138         /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
139         /// for LDK's provided implementation.
140         ///
141         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
142         /// error or call [`join`] and handle any error that may arise. For the latter case,
143         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
144         ///
145         /// # Event Handling
146         ///
147         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
148         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
149         /// functionality implemented by other handlers.
150         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
151         ///
152         /// # Rapid Gossip Sync
153         ///
154         /// If rapid gossip sync is meant to run at startup, pass an optional [`RapidGossipSync`]
155         /// to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`] not to prune the
156         /// [`NetworkGraph`] instance until the [`RapidGossipSync`] instance completes its first sync.
157         ///
158         /// [top-level documentation]: BackgroundProcessor
159         /// [`join`]: Self::join
160         /// [`stop`]: Self::stop
161         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
162         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
163         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
164         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
165         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
166         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
167         pub fn start<
168                 'a,
169                 Signer: 'static + Sign,
170                 CA: 'static + Deref + Send + Sync,
171                 CF: 'static + Deref + Send + Sync,
172                 CW: 'static + Deref + Send + Sync,
173                 T: 'static + Deref + Send + Sync,
174                 K: 'static + Deref + Send + Sync,
175                 F: 'static + Deref + Send + Sync,
176                 G: 'static + Deref<Target = NetworkGraph> + Send + Sync,
177                 L: 'static + Deref + Send + Sync,
178                 P: 'static + Deref + Send + Sync,
179                 Descriptor: 'static + SocketDescriptor + Send + Sync,
180                 CMH: 'static + Deref + Send + Sync,
181                 RMH: 'static + Deref + Send + Sync,
182                 EH: 'static + EventHandler + Send,
183                 PS: 'static + Deref + Send,
184                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
185                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
186                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
187                 UMH: 'static + Deref + Send + Sync,
188                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
189                 S: 'static + Deref<Target = SC> + Send + Sync,
190                 SC: WriteableScore<'a>,
191                 RGS: 'static + Deref<Target = RapidGossipSync<G>> + Send
192         >(
193                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
194                 p2p_gossip_sync: Option<PGS>, peer_manager: PM, logger: L, scorer: Option<S>,
195                 rapid_gossip_sync: Option<RGS>
196         ) -> Self
197         where
198                 CA::Target: 'static + chain::Access,
199                 CF::Target: 'static + chain::Filter,
200                 CW::Target: 'static + chain::Watch<Signer>,
201                 T::Target: 'static + BroadcasterInterface,
202                 K::Target: 'static + KeysInterface<Signer = Signer>,
203                 F::Target: 'static + FeeEstimator,
204                 L::Target: 'static + Logger,
205                 P::Target: 'static + Persist<Signer>,
206                 CMH::Target: 'static + ChannelMessageHandler,
207                 RMH::Target: 'static + RoutingMessageHandler,
208                 UMH::Target: 'static + CustomMessageHandler,
209                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
210         {
211                 let stop_thread = Arc::new(AtomicBool::new(false));
212                 let stop_thread_clone = stop_thread.clone();
213                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
214                         let event_handler = DecoratingEventHandler { event_handler, p2p_gossip_sync: p2p_gossip_sync.as_ref().map(|t| t.deref()) };
215
216                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
217                         channel_manager.timer_tick_occurred();
218
219                         let mut last_freshness_call = Instant::now();
220                         let mut last_ping_call = Instant::now();
221                         let mut last_prune_call = Instant::now();
222                         let mut last_scorer_persist_call = Instant::now();
223                         let mut have_pruned = false;
224
225                         loop {
226                                 channel_manager.process_pending_events(&event_handler);
227                                 chain_monitor.process_pending_events(&event_handler);
228
229                                 // Note that the PeerManager::process_events may block on ChannelManager's locks,
230                                 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
231                                 // we want to ensure we get into `persist_manager` as quickly as we can, especially
232                                 // without running the normal event processing above and handing events to users.
233                                 //
234                                 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
235                                 // processing a message effectively at any point during this loop. In order to
236                                 // minimize the time between such processing completing and persisting the updated
237                                 // ChannelManager, we want to minimize methods blocking on a ChannelManager
238                                 // generally, and as a fallback place such blocking only immediately before
239                                 // persistence.
240                                 peer_manager.process_events();
241
242                                 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
243                                 // see `await_start`'s use below.
244                                 let await_start = Instant::now();
245                                 let updates_available =
246                                         channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
247                                 let await_time = await_start.elapsed();
248
249                                 if updates_available {
250                                         log_trace!(logger, "Persisting ChannelManager...");
251                                         persister.persist_manager(&*channel_manager)?;
252                                         log_trace!(logger, "Done persisting ChannelManager.");
253                                 }
254                                 // Exit the loop if the background processor was requested to stop.
255                                 if stop_thread.load(Ordering::Acquire) == true {
256                                         log_trace!(logger, "Terminating background processor.");
257                                         break;
258                                 }
259                                 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
260                                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
261                                         channel_manager.timer_tick_occurred();
262                                         last_freshness_call = Instant::now();
263                                 }
264                                 if await_time > Duration::from_secs(1) {
265                                         // On various platforms, we may be starved of CPU cycles for several reasons.
266                                         // E.g. on iOS, if we've been in the background, we will be entirely paused.
267                                         // Similarly, if we're on a desktop platform and the device has been asleep, we
268                                         // may not get any cycles.
269                                         // We detect this by checking if our max-100ms-sleep, above, ran longer than a
270                                         // full second, at which point we assume sockets may have been killed (they
271                                         // appear to be at least on some platforms, even if it has only been a second).
272                                         // Note that we have to take care to not get here just because user event
273                                         // processing was slow at the top of the loop. For example, the sample client
274                                         // may call Bitcoin Core RPCs during event handling, which very often takes
275                                         // more than a handful of seconds to complete, and shouldn't disconnect all our
276                                         // peers.
277                                         log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
278                                         peer_manager.disconnect_all_peers();
279                                         last_ping_call = Instant::now();
280                                 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
281                                         log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
282                                         peer_manager.timer_tick_occurred();
283                                         last_ping_call = Instant::now();
284                                 }
285
286                                 // Note that we want to run a graph prune once not long after startup before
287                                 // falling back to our usual hourly prunes. This avoids short-lived clients never
288                                 // pruning their network graph. We run once 60 seconds after startup before
289                                 // continuing our normal cadence.
290                                 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
291                                         // The network graph must not be pruned while rapid sync completion is pending
292                                         log_trace!(logger, "Assessing prunability of network graph");
293                                         let graph_to_prune = match rapid_gossip_sync.as_ref() {
294                                                 Some(rapid_sync) => {
295                                                         if rapid_sync.is_initial_sync_complete() {
296                                                                 Some(rapid_sync.network_graph())
297                                                         } else {
298                                                                 None
299                                                         }
300                                                 },
301                                                 None => p2p_gossip_sync.as_ref().map(|sync| sync.network_graph())
302                                         };
303
304                                         if let Some(network_graph_reference) = graph_to_prune {
305                                                 network_graph_reference.remove_stale_channels();
306
307                                                 if let Err(e) = persister.persist_graph(network_graph_reference) {
308                                                         log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
309                                                 }
310
311                                                 last_prune_call = Instant::now();
312                                                 have_pruned = true;
313                                         } else {
314                                                 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
315                                         }
316                                 }
317
318                                 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
319                                         if let Some(ref scorer) = scorer {
320                                                 log_trace!(logger, "Persisting scorer");
321                                                 if let Err(e) = persister.persist_scorer(&scorer) {
322                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
323                                                 }
324                                         }
325                                         last_scorer_persist_call = Instant::now();
326                                 }
327                         }
328
329                         // After we exit, ensure we persist the ChannelManager one final time - this avoids
330                         // some races where users quit while channel updates were in-flight, with
331                         // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
332                         persister.persist_manager(&*channel_manager)?;
333
334                         // Persist Scorer on exit
335                         if let Some(ref scorer) = scorer {
336                                 persister.persist_scorer(&scorer)?;
337                         }
338
339                         // Persist NetworkGraph on exit
340                         if let Some(ref gossip_sync) = p2p_gossip_sync {
341                                 persister.persist_graph(gossip_sync.network_graph())?;
342                         }
343
344                         Ok(())
345                 });
346                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
347         }
348
349         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
350         /// [`ChannelManager`].
351         ///
352         /// # Panics
353         ///
354         /// This function panics if the background thread has panicked such as while persisting or
355         /// handling events.
356         ///
357         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
358         pub fn join(mut self) -> Result<(), std::io::Error> {
359                 assert!(self.thread_handle.is_some());
360                 self.join_thread()
361         }
362
363         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
364         /// [`ChannelManager`].
365         ///
366         /// # Panics
367         ///
368         /// This function panics if the background thread has panicked such as while persisting or
369         /// handling events.
370         ///
371         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
372         pub fn stop(mut self) -> Result<(), std::io::Error> {
373                 assert!(self.thread_handle.is_some());
374                 self.stop_and_join_thread()
375         }
376
377         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
378                 self.stop_thread.store(true, Ordering::Release);
379                 self.join_thread()
380         }
381
382         fn join_thread(&mut self) -> Result<(), std::io::Error> {
383                 match self.thread_handle.take() {
384                         Some(handle) => handle.join().unwrap(),
385                         None => Ok(()),
386                 }
387         }
388 }
389
390 impl Drop for BackgroundProcessor {
391         fn drop(&mut self) {
392                 self.stop_and_join_thread().unwrap();
393         }
394 }
395
396 #[cfg(test)]
397 mod tests {
398         use bitcoin::blockdata::block::BlockHeader;
399         use bitcoin::blockdata::constants::genesis_block;
400         use bitcoin::blockdata::transaction::{Transaction, TxOut};
401         use bitcoin::network::constants::Network;
402         use lightning::chain::{BestBlock, Confirm, chainmonitor};
403         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
404         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
405         use lightning::chain::transaction::OutPoint;
406         use lightning::get_event_msg;
407         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
408         use lightning::ln::features::{ChannelFeatures, InitFeatures};
409         use lightning::ln::msgs::{ChannelMessageHandler, Init};
410         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
411         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
412         use lightning::util::config::UserConfig;
413         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
414         use lightning::util::ser::Writeable;
415         use lightning::util::test_utils;
416         use lightning::util::persist::KVStorePersister;
417         use lightning_invoice::payment::{InvoicePayer, Retry};
418         use lightning_invoice::utils::DefaultRouter;
419         use lightning_persister::FilesystemPersister;
420         use std::fs;
421         use std::path::PathBuf;
422         use std::sync::{Arc, Mutex};
423         use std::sync::mpsc::SyncSender;
424         use std::time::Duration;
425         use lightning::routing::scoring::{FixedPenaltyScorer};
426         use lightning_rapid_gossip_sync::RapidGossipSync;
427         use super::{BackgroundProcessor, FRESHNESS_TIMER};
428
        // Five times `FRESHNESS_TIMER`.
        // NOTE(review): presumably the longest (in seconds) a test waits for an expected event;
        // the uses are outside this view, so confirm.
        const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
430
431         #[derive(Clone, Eq, Hash, PartialEq)]
432         struct TestDescriptor{}
433         impl SocketDescriptor for TestDescriptor {
434                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
435                         0
436                 }
437
438                 fn disconnect_socket(&mut self) {}
439         }
440
        /// The concrete `ChainMonitor` used by test nodes: in-memory signer, test chain
        /// source/broadcaster/fee estimator/logger, and a filesystem-backed persister.
        type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
442
        /// Everything that makes up one test node, as assembled by `create_nodes`.
        struct Node {
                /// The node's channel manager.
                node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
                /// P2P gossip sync built over `network_graph`, if any.
                p2p_gossip_sync: Option<Arc<P2PGossipSync<Arc<NetworkGraph>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
                /// Peer manager wired to test channel/routing message handlers.
                peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
                /// Chain monitor that the channel manager was constructed with.
                chain_monitor: Arc<ChainMonitor>,
                /// Filesystem persister handed to the chain monitor; its data directory is removed when
                /// the node is dropped.
                persister: Arc<FilesystemPersister>,
                /// Test broadcaster shared with the chain monitor and channel manager.
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
                /// Network graph shared with `p2p_gossip_sync`.
                network_graph: Arc<NetworkGraph>,
                /// Per-node test logger.
                logger: Arc<test_utils::TestLogger>,
                /// Best block the channel manager was initialised with.
                best_block: BestBlock,
                /// Scorer available to pass to the background processor.
                scorer: Arc<Mutex<FixedPenaltyScorer>>,
                /// Rapid gossip sync, if any.
                rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph>>>>
        }
456
457         impl Drop for Node {
458                 fn drop(&mut self) {
459                         let data_dir = self.persister.get_data_dir();
460                         match fs::remove_dir_all(data_dir.clone()) {
461                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
462                                 _ => {}
463                         }
464                 }
465         }
466
        /// Test persister that wraps a `FilesystemPersister` and can be configured to fail, or to send
        /// a notification, for specific keys.
        struct Persister {
                /// Error to return when the `"network_graph"` key is persisted.
                graph_error: Option<(std::io::ErrorKind, &'static str)>,
                /// Channel that receives `()` every time the `"network_graph"` key is persisted.
                graph_persistence_notifier: Option<SyncSender<()>>,
                /// Error to return when the `"manager"` key is persisted.
                manager_error: Option<(std::io::ErrorKind, &'static str)>,
                /// Error to return when the `"scorer"` key is persisted.
                scorer_error: Option<(std::io::ErrorKind, &'static str)>,
                /// Real persister that handles every write which is not configured to fail.
                filesystem_persister: FilesystemPersister,
        }
474
475         impl Persister {
476                 fn new(data_dir: String) -> Self {
477                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
478                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
479                 }
480
481                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
482                         Self { graph_error: Some((error, message)), ..self }
483                 }
484
485                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
486                         Self { graph_persistence_notifier: Some(sender), ..self }
487                 }
488
489                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
490                         Self { manager_error: Some((error, message)), ..self }
491                 }
492
493                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
494                         Self { scorer_error: Some((error, message)), ..self }
495                 }
496         }
497
498         impl KVStorePersister for Persister {
499                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
500                         if key == "manager" {
501                                 if let Some((error, message)) = self.manager_error {
502                                         return Err(std::io::Error::new(error, message))
503                                 }
504                         }
505
506                         if key == "network_graph" {
507                                 if let Some(sender) = &self.graph_persistence_notifier {
508                                         sender.send(()).unwrap();
509                                 };
510
511                                 if let Some((error, message)) = self.graph_error {
512                                         return Err(std::io::Error::new(error, message))
513                                 }
514                         }
515
516                         if key == "scorer" {
517                                 if let Some((error, message)) = self.scorer_error {
518                                         return Err(std::io::Error::new(error, message))
519                                 }
520                         }
521
522                         self.filesystem_persister.persist(key, object)
523                 }
524         }
525
526         fn get_full_filepath(filepath: String, filename: String) -> String {
527                 let mut path = PathBuf::from(filepath);
528                 path.push(filename);
529                 path.to_str().unwrap().to_string()
530         }
531
532         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
533                 let mut nodes = Vec::new();
534                 for i in 0..num_nodes {
535                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
536                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
537                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
538                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
539                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
540                         let seed = [i as u8; 32];
541                         let network = Network::Testnet;
542                         let genesis_block = genesis_block(network);
543                         let now = Duration::from_secs(genesis_block.header.time as u64);
544                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
545                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
546                         let best_block = BestBlock::from_genesis(network);
547                         let params = ChainParameters { network, best_block };
548                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
549                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash()));
550                         let p2p_gossip_sync = Some(Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
551                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
552                         let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
553                         let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
554                         let rapid_gossip_sync = None;
555                         let node = Node { node: manager, p2p_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
556                         nodes.push(node);
557                 }
558
559                 for i in 0..num_nodes {
560                         for j in (i+1)..num_nodes {
561                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
562                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
563                         }
564                 }
565
566                 nodes
567         }
568
/// Runs the whole channel-open handshake from `$node_a` to `$node_b` for `$channel_value` and
/// evaluates to the funding transaction.
///
/// Expects exactly one pending event on `$node_a` after the open/accept exchange and hands it to
/// `handle_funding_generation_ready!`.
macro_rules! open_channel {
    ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
        begin_open_channel!($node_a, $node_b, $channel_value);
        let pending_events = $node_a.node.get_and_clear_pending_events();
        assert_eq!(pending_events.len(), 1);
        let (temp_channel_id, funding_tx) = handle_funding_generation_ready!(&pending_events[0], $channel_value);
        end_open_channel!($node_a, $node_b, temp_channel_id, funding_tx);
        funding_tx
    }}
}
579
/// First half of the channel-open handshake: `$node_a` creates a channel of `$channel_value`
/// (push amount 100, user channel id 42) towards `$node_b`, then the `open_channel` and
/// `accept_channel` messages are delivered by hand.
macro_rules! begin_open_channel {
    ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
        let initiator_id = $node_a.node.get_our_node_id();
        let acceptor_id = $node_b.node.get_our_node_id();
        $node_a.node.create_channel(acceptor_id, $channel_value, 100, 42, None).unwrap();

        let open_msg = get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, acceptor_id);
        $node_b.node.handle_open_channel(&initiator_id, InitFeatures::known(), &open_msg);

        let accept_msg = get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, initiator_id);
        $node_a.node.handle_accept_channel(&acceptor_id, InitFeatures::known(), &accept_msg);
    }}
}
587
/// Turns a `&Event::FundingGenerationReady` into `(temporary_channel_id, funding_tx)`.
///
/// Asserts that the event carries `$channel_value` and user channel id 42, then builds an
/// input-less transaction with a single output paying the channel value to the requested script.
/// Panics with "Unexpected event" on any other event.
macro_rules! handle_funding_generation_ready {
    ($event: expr, $channel_value: expr) => {{
        match $event {
            &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
                assert_eq!(channel_value_satoshis, $channel_value);
                assert_eq!(user_channel_id, 42);

                let funding_output = TxOut {
                    value: channel_value_satoshis,
                    script_pubkey: output_script.clone(),
                };
                let funding_tx = Transaction {
                    version: 1,
                    lock_time: 0,
                    input: Vec::new(),
                    output: vec![funding_output],
                };
                (temporary_channel_id, funding_tx)
            },
            _ => panic!("Unexpected event"),
        }
    }}
}
604
/// Second half of the channel-open handshake: gives `$node_a` the funding transaction `$tx` for
/// `$temporary_channel_id`, then delivers `funding_created` to `$node_b` and `funding_signed`
/// back to `$node_a`.
macro_rules! end_open_channel {
    ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
        $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();

        let funding_created = get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id());
        $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &funding_created);

        let funding_signed = get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id());
        $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &funding_signed);
    }}
}
612
613         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
614                 for i in 1..=depth {
615                         let prev_blockhash = node.best_block.block_hash();
616                         let height = node.best_block.height() + 1;
617                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
618                         let txdata = vec![(0, tx)];
619                         node.best_block = BestBlock::new(header.block_hash(), height);
620                         match i {
621                                 1 => {
622                                         node.node.transactions_confirmed(&header, &txdata, height);
623                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
624                                 },
625                                 x if x == depth => {
626                                         node.node.best_block_updated(&header, height);
627                                         node.chain_monitor.best_block_updated(&header, height);
628                                 },
629                                 _ => {},
630                         }
631                 }
632         }
633         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
634                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
635         }
636
#[test]
fn test_background_processor() {
    // Checks that the background processor writes the ChannelManager to disk after a channel is
    // opened and again after it is force-closed, and that the network graph and scorer end up
    // on disk as well.
    let nodes = create_nodes(2, "test_background_processor".to_string());

    // `open_channel!` drains node 0's pending events itself, so it has to finish before the
    // background processor starts; otherwise the two would race for those events.
    let tx = open_channel!(nodes[0], nodes[1], 100000);

    // Start a background processor watching node 0.
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync.clone(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
        nodes[0].rapid_gossip_sync.clone(),
    );

    // Spins until the file at `$filepath` holds exactly the bytes `$node` currently serializes
    // to. A missing or unreadable file and stale contents are both treated as "not yet".
    macro_rules! check_persisted_data {
        ($node: expr, $filepath: expr) => {
            let mut expected_bytes = Vec::new();
            loop {
                expected_bytes.clear();
                if let Err(e) = $node.write(&mut expected_bytes) {
                    panic!("Unexpected error: {}", e);
                }
                if let Ok(bytes) = std::fs::read($filepath) {
                    if bytes == expected_bytes {
                        break
                    }
                }
            }
        }
    }

    // The freshly opened channel must show up in the persisted manager.
    let manager_path = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
    check_persisted_data!(nodes[0].node, manager_path.clone());

    // Wait for the manager's persistence flag to clear before generating more updates.
    while nodes[0].node.get_persistence_condvar_value() {}

    // Force-close the channel.
    let channel_id = OutPoint { txid: tx.txid(), index: 0 }.to_channel_id();
    let counterparty_id = nodes[1].node.get_our_node_id();
    nodes[0].node.force_close_channel(&channel_id, &counterparty_id).unwrap();

    // The force-close must be persisted too.
    check_persisted_data!(nodes[0].node, manager_path.clone());
    while nodes[0].node.get_persistence_condvar_value() {}

    // Network graph persistence.
    let graph_path = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
    if let Some(ref gossip_sync) = nodes[0].p2p_gossip_sync {
        let network_graph = gossip_sync.network_graph();
        check_persisted_data!(network_graph, graph_path.clone());
    }

    // Scorer persistence.
    let scorer_path = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
    check_persisted_data!(nodes[0].scorer, scorer_path.clone());

    assert!(bg_processor.stop().is_ok());
}
709
#[test]
fn test_timer_tick_called() {
    // Waits until the background processor has logged a `timer_tick_occurred` call for both the
    // ChannelManager and the PeerManager, i.e. until the periodic timer has fired for each.
    let nodes = create_nodes(1, "test_timer_tick_called".to_string());
    let node = &nodes[0];
    let persister = Arc::new(Persister::new(node.persister.get_data_dir()));
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        node.chain_monitor.clone(),
        node.node.clone(),
        node.p2p_gossip_sync.clone(),
        node.peer_manager.clone(),
        node.logger.clone(),
        Some(node.scorer.clone()),
        node.rapid_gossip_sync.clone(),
    );

    // Log entries are keyed by (module, message).
    let manager_tick = ("lightning_background_processor".to_string(), "Calling ChannelManager's timer_tick_occurred".to_string());
    let peer_tick = ("lightning_background_processor".to_string(), "Calling PeerManager's timer_tick_occurred".to_string());
    loop {
        // The lock is re-taken on every pass so the background thread can keep logging.
        let log_entries = node.logger.lines.lock().unwrap();
        let saw_manager_tick = log_entries.get(&manager_tick).is_some();
        if saw_manager_tick && log_entries.get(&peer_tick).is_some() {
            break
        }
    }

    assert!(bg_processor.stop().is_ok());
}
731
#[test]
fn test_channel_manager_persist_error() {
    // A failure while persisting the ChannelManager must surface as the error returned from
    // joining the background processor.
    let nodes = create_nodes(2, "test_persist_error".to_string());
    open_channel!(nodes[0], nodes[1], 100000);

    let failing_persister = Persister::new(nodes[0].persister.get_data_dir())
        .with_manager_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync.clone(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
        nodes[0].rapid_gossip_sync.clone(),
    );

    let err = match bg_processor.join() {
        Err(e) => e,
        Ok(_) => panic!("Expected error persisting manager"),
    };
    assert_eq!(err.kind(), std::io::ErrorKind::Other);
    assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
750
#[test]
fn test_network_graph_persist_error() {
    // A failure while persisting the network graph must be reported by `stop()`.
    let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
    let node = &nodes[0];
    let failing_persister = Persister::new(node.persister.get_data_dir())
        .with_graph_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        node.chain_monitor.clone(),
        node.node.clone(),
        node.p2p_gossip_sync.clone(),
        node.peer_manager.clone(),
        node.logger.clone(),
        Some(node.scorer.clone()),
        node.rapid_gossip_sync.clone(),
    );

    let err = match bg_processor.stop() {
        Err(e) => e,
        Ok(_) => panic!("Expected error persisting network graph"),
    };
    assert_eq!(err.kind(), std::io::ErrorKind::Other);
    assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
768
#[test]
fn test_scorer_persist_error() {
    // A failure while persisting the scorer must be reported by `stop()`.
    let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
    let node = &nodes[0];
    let failing_persister = Persister::new(node.persister.get_data_dir())
        .with_scorer_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        node.chain_monitor.clone(),
        node.node.clone(),
        node.p2p_gossip_sync.clone(),
        node.peer_manager.clone(),
        node.logger.clone(),
        Some(node.scorer.clone()),
        node.rapid_gossip_sync.clone(),
    );

    let err = match bg_processor.stop() {
        Err(e) => e,
        Ok(_) => panic!("Expected error persisting scorer"),
    };
    assert_eq!(err.kind(), std::io::ErrorKind::Other);
    assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
786
#[test]
fn test_background_event_handling() {
    // Checks that events generated while the background processor is running reach the
    // user-supplied handler: first a FundingGenerationReady during channel open, then the event
    // produced after a force-close has been confirmed.
    let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
    let channel_value = 100000;
    let data_dir = nodes[0].persister.get_data_dir();

    // First processor: the handler forwards the parsed FundingGenerationReady over a channel.
    let (funding_sender, funding_receiver) = std::sync::mpsc::sync_channel(1);
    let funding_handler = move |event: &Event| {
        let ready = handle_funding_generation_ready!(event, channel_value);
        funding_sender.send(ready).unwrap();
    };
    let persister = Arc::new(Persister::new(data_dir.clone()));
    let bg_processor = BackgroundProcessor::start(
        persister,
        funding_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync.clone(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
        nodes[0].rapid_gossip_sync.clone(),
    );

    // Open a channel; the funding details must arrive through the handler in time.
    begin_open_channel!(nodes[0], nodes[1], channel_value);
    let deadline = Duration::from_secs(EVENT_DEADLINE);
    let (temporary_channel_id, funding_tx) = funding_receiver
        .recv_timeout(deadline)
        .expect("FundingGenerationReady not handled within deadline");
    end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

    // Confirm the funding transaction on both sides and exchange channel_ready.
    confirm_transaction(&mut nodes[0], &funding_tx);
    let ready_from_0 = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
    confirm_transaction(&mut nodes[1], &funding_tx);
    let ready_from_1 = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
    nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &ready_from_1);
    let _update_from_0 = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
    nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &ready_from_0);
    let _update_from_1 = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

    assert!(bg_processor.stop().is_ok());

    // Second processor: the handler forwards a clone of every event it sees.
    let (event_sender, event_receiver) = std::sync::mpsc::sync_channel(1);
    let forwarding_handler = move |event: &Event| {
        event_sender.send(event.clone()).unwrap()
    };
    let persister = Arc::new(Persister::new(data_dir));
    let bg_processor = BackgroundProcessor::start(
        persister,
        forwarding_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync.clone(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
        nodes[0].rapid_gossip_sync.clone(),
    );

    // Force-close, confirm the broadcast commitment transaction, and wait for the first event.
    let channel_id = nodes[0].node.list_channels()[0].channel_id;
    let counterparty_id = nodes[1].node.get_our_node_id();
    nodes[0].node.force_close_channel(&channel_id, &counterparty_id).unwrap();
    let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
    confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
    let event = event_receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("SpendableOutputs not handled within deadline");
    // Either of these two events may be the first one delivered.
    match event {
        Event::SpendableOutputs { .. } | Event::ChannelClosed { .. } => {},
        _ => panic!("Unexpected event: {:?}", event),
    }

    assert!(bg_processor.stop().is_ok());
}
841
#[test]
fn test_scorer_persistence() {
    // Waits until the background processor has logged "Persisting scorer", then shuts it down.
    let nodes = create_nodes(2, "test_scorer_persistence".to_string());
    let node = &nodes[0];
    let persister = Arc::new(Persister::new(node.persister.get_data_dir()));
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        node.chain_monitor.clone(),
        node.node.clone(),
        node.p2p_gossip_sync.clone(),
        node.peer_manager.clone(),
        node.logger.clone(),
        Some(node.scorer.clone()),
        node.rapid_gossip_sync.clone(),
    );

    // Log entries are keyed by (module, message).
    let persisted_log = ("lightning_background_processor".to_string(), "Persisting scorer".to_string());
    loop {
        // The lock is re-taken on every pass so the background thread can keep logging.
        let log_entries = node.logger.lines.lock().unwrap();
        if log_entries.get(&persisted_log).is_some() {
            break
        }
    }

    assert!(bg_processor.stop().is_ok());
}
860
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
    // With a rapid gossip sync attached, the background processor must log that it is not
    // pruning the graph before any sync data has been applied; once a snapshot has been applied
    // the graph is persisted and, after shutdown, holds no channels.
    let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
    let (graph_persisted_tx, graph_persisted_rx) = std::sync::mpsc::sync_channel(1);
    let persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir()).with_graph_persistence_notifier(graph_persisted_tx)
    );
    let network_graph = nodes[0].network_graph.clone();
    let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));

    // Seed the graph with one channel that came from a partial announcement.
    network_graph.add_channel_from_partial_announcement(42, 53, ChannelFeatures::empty(), nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
        .expect("Failed to update channel from partial announcement");
    let original_graph_description = network_graph.to_string();
    assert!(original_graph_description.contains("42: features: 0000, node_one:"));
    assert_eq!(network_graph.read_only().channels().len(), 1);

    let event_handler = |_: &_| {};
    let background_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync.clone(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
        Some(rapid_sync.clone()),
    );

    // Wait until the processor has both assessed prunability and declined to prune.
    let assessing_log = ("lightning_background_processor".to_string(), "Assessing prunability of network graph".to_string());
    let not_pruning_log = ("lightning_background_processor".to_string(), "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string());
    loop {
        let log_entries = nodes[0].logger.lines.lock().unwrap();
        let assessed = log_entries.get(&assessing_log).is_some();
        if assessed && log_entries.get(&not_pruning_log).is_some() {
            break
        }
    }

    // Serialized rapid gossip sync snapshot applied below.
    let initialization_input: &[u8] = &[
        76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
        79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
        0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
        187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
        157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
        88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
        204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
        181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
        110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
        76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
        226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
        0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
        0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
    ];
    rapid_sync.update_network_graph(initialization_input).unwrap();

    // The snapshot adds two channels on top of the one seeded above.
    assert_eq!(network_graph.read_only().channels().len(), 3);

    // The persister notifies over the channel once the network graph has been persisted.
    let prune_deadline = Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5);
    let _ = graph_persisted_rx
        .recv_timeout(prune_deadline)
        .expect("Network graph not pruned within deadline");

    background_processor.stop().unwrap();

    // Every channel should have been pruned by now.
    assert_eq!(network_graph.read_only().channels().len(), 0);
}
918
#[test]
fn test_invoice_payer() {
    // Checks that an `Arc<InvoicePayer>` can be used as the background processor's event
    // handler: the processor is started with it and stopped again right away.
    let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
    let random_seed_bytes = keys_manager.get_secure_random_bytes();
    let nodes = create_nodes(2, "test_invoice_payer".to_string());
    let node = &nodes[0];

    let persister = Arc::new(Persister::new(node.persister.get_data_dir()));
    let router = DefaultRouter::new(node.network_graph.clone(), node.logger.clone(), random_seed_bytes);
    let invoice_payer = Arc::new(InvoicePayer::new(
        node.node.clone(),
        router,
        node.scorer.clone(),
        node.logger.clone(),
        |_: &_| {},
        Retry::Attempts(2),
    ));
    let event_handler = Arc::clone(&invoice_payer);
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        node.chain_monitor.clone(),
        node.node.clone(),
        node.p2p_gossip_sync.clone(),
        node.peer_manager.clone(),
        node.logger.clone(),
        Some(node.scorer.clone()),
        node.rapid_gossip_sync.clone(),
    );
    assert!(bg_processor.stop().is_ok());
}
934 }