484439b3907b364dddc3dc2c13bb6c078be2ad0c
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(broken_intra_doc_links)]
6 #![deny(missing_docs)]
7 #![deny(unsafe_code)]
8
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
10
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
13
14 use lightning::chain;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
27 use std::sync::Arc;
28 use std::sync::atomic::{AtomicBool, Ordering};
29 use std::thread;
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
32 use std::ops::Deref;
33
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 ///   writing it to disk/backups by invoking the callback given to it at startup.
40 ///   [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 ///   at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 ///   is provided to [`BackgroundProcessor::start`]).
45 ///
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
48 ///
49 /// # Note
50 ///
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
55 ///
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shutdown flag shared with the background thread; set via `stop`/`stop_and_join_thread`
	// and polled once per loop iteration in the thread.
	stop_thread: Arc<AtomicBool>,
	// Handle used to join the background thread and retrieve any persistence error it
	// returned. `None` once the thread has been joined (the handle is `take`n).
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
63
// Interval (in seconds) between calls to `ChannelManager::timer_tick_occurred` from the
// background loop. Tests use 1 second so timer-driven behavior can be exercised quickly.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Interval (in seconds) between calls to `PeerManager::timer_tick_occurred` (the ping timer).
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Interval (in seconds) between scorer persistence attempts via `Persister::persist_scorer`.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay (in seconds) before the *first* network-graph prune after startup. Deliberately much
// shorter than NETWORK_PRUNE_TIMER so that short-lived clients still prune at least once
// (see the pruning comment in the main loop of `BackgroundProcessor::start`).
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
91
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Passed to [`BackgroundProcessor::start`] to identify which (if any) gossip source backs
/// the [`NetworkGraph`]; the variant chosen determines whether and when the graph may be
/// pruned (see `prunable_network_graph`).
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
108
109 impl<
110         P: Deref<Target = P2PGossipSync<G, A, L>>,
111         R: Deref<Target = RapidGossipSync<G, L>>,
112         G: Deref<Target = NetworkGraph<L>>,
113         A: Deref,
114         L: Deref,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117         fn network_graph(&self) -> Option<&G> {
118                 match self {
119                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121                         GossipSync::None => None,
122                 }
123         }
124
125         fn prunable_network_graph(&self) -> Option<&G> {
126                 match self {
127                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::Rapid(gossip_sync) => {
129                                 if gossip_sync.is_initial_sync_complete() {
130                                         Some(gossip_sync.network_graph())
131                                 } else {
132                                         None
133                                 }
134                         },
135                         GossipSync::None => None,
136                 }
137         }
138 }
139
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	///
	/// The unused `R` type parameter is pinned to a concrete reference type here so callers
	/// need not name it; no [`RapidGossipSync`] value is involved in this variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
152
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	///
	/// The unused `P` and `A` type parameters are pinned to concrete reference/trait-object
	/// types here so callers need not name them; no [`P2PGossipSync`] is involved.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
170
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	///
	/// All type parameters are pinned to concrete reference/trait-object types here so
	/// callers need not name any of them; no gossip source is involved.
	pub fn none() -> Self {
		GossipSync::None
	}
}
188
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	// The user-provided handler; invoked after the internal handling below.
	event_handler: E,
	// Gossip source whose network graph (if any) also handles each event before the user does.
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}
203
204 impl<
205         'a,
206         E: EventHandler,
207         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
208         RGS: Deref<Target = RapidGossipSync<G, L>>,
209         G: Deref<Target = NetworkGraph<L>>,
210         A: Deref,
211         L: Deref,
212 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
213 where A::Target: chain::Access, L::Target: Logger {
214         fn handle_event(&self, event: &Event) {
215                 if let Some(network_graph) = self.gossip_sync.network_graph() {
216                         network_graph.handle_event(event);
217                 }
218                 self.event_handler.handle_event(event);
219         }
220 }
221
222 impl BackgroundProcessor {
223         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
224         /// documentation].
225         ///
226         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
227         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
228         /// either [`join`] or [`stop`].
229         ///
230         /// # Data Persistence
231         ///
232         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
233         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
234         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
235         /// provided implementation.
236         ///
237         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
238         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
239         /// See the `lightning-persister` crate for LDK's provided implementation.
240         ///
241         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
242         /// error or call [`join`] and handle any error that may arise. For the latter case,
243         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
244         ///
245         /// # Event Handling
246         ///
247         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
248         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
249         /// functionality implemented by other handlers.
250         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
251         ///
252         /// # Rapid Gossip Sync
253         ///
254         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
255         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
256         /// until the [`RapidGossipSync`] instance completes its first sync.
257         ///
258         /// [top-level documentation]: BackgroundProcessor
259         /// [`join`]: Self::join
260         /// [`stop`]: Self::stop
261         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
262         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
263         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
264         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
265         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
266         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
267         pub fn start<
268                 'a,
269                 Signer: 'static + Sign,
270                 CA: 'static + Deref + Send + Sync,
271                 CF: 'static + Deref + Send + Sync,
272                 CW: 'static + Deref + Send + Sync,
273                 T: 'static + Deref + Send + Sync,
274                 K: 'static + Deref + Send + Sync,
275                 F: 'static + Deref + Send + Sync,
276                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
277                 L: 'static + Deref + Send + Sync,
278                 P: 'static + Deref + Send + Sync,
279                 Descriptor: 'static + SocketDescriptor + Send + Sync,
280                 CMH: 'static + Deref + Send + Sync,
281                 RMH: 'static + Deref + Send + Sync,
282                 EH: 'static + EventHandler + Send,
283                 PS: 'static + Deref + Send,
284                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
285                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
286                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288                 UMH: 'static + Deref + Send + Sync,
289                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
290                 S: 'static + Deref<Target = SC> + Send + Sync,
291                 SC: WriteableScore<'a>,
292         >(
293                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
294                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
295         ) -> Self
296         where
297                 CA::Target: 'static + chain::Access,
298                 CF::Target: 'static + chain::Filter,
299                 CW::Target: 'static + chain::Watch<Signer>,
300                 T::Target: 'static + BroadcasterInterface,
301                 K::Target: 'static + KeysInterface<Signer = Signer>,
302                 F::Target: 'static + FeeEstimator,
303                 L::Target: 'static + Logger,
304                 P::Target: 'static + Persist<Signer>,
305                 CMH::Target: 'static + ChannelMessageHandler,
306                 RMH::Target: 'static + RoutingMessageHandler,
307                 UMH::Target: 'static + CustomMessageHandler,
308                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
309         {
310                 let stop_thread = Arc::new(AtomicBool::new(false));
311                 let stop_thread_clone = stop_thread.clone();
312                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
313                         let event_handler = DecoratingEventHandler {
314                                 event_handler,
315                                 gossip_sync: &gossip_sync,
316                         };
317
318                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
319                         channel_manager.timer_tick_occurred();
320
321                         let mut last_freshness_call = Instant::now();
322                         let mut last_ping_call = Instant::now();
323                         let mut last_prune_call = Instant::now();
324                         let mut last_scorer_persist_call = Instant::now();
325                         let mut have_pruned = false;
326
327                         loop {
328                                 channel_manager.process_pending_events(&event_handler);
329                                 chain_monitor.process_pending_events(&event_handler);
330
331                                 // Note that the PeerManager::process_events may block on ChannelManager's locks,
332                                 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
333                                 // we want to ensure we get into `persist_manager` as quickly as we can, especially
334                                 // without running the normal event processing above and handing events to users.
335                                 //
336                                 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
337                                 // processing a message effectively at any point during this loop. In order to
338                                 // minimize the time between such processing completing and persisting the updated
339                                 // ChannelManager, we want to minimize methods blocking on a ChannelManager
340                                 // generally, and as a fallback place such blocking only immediately before
341                                 // persistence.
342                                 peer_manager.process_events();
343
344                                 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
345                                 // see `await_start`'s use below.
346                                 let await_start = Instant::now();
347                                 let updates_available =
348                                         channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
349                                 let await_time = await_start.elapsed();
350
351                                 if updates_available {
352                                         log_trace!(logger, "Persisting ChannelManager...");
353                                         persister.persist_manager(&*channel_manager)?;
354                                         log_trace!(logger, "Done persisting ChannelManager.");
355                                 }
356                                 // Exit the loop if the background processor was requested to stop.
357                                 if stop_thread.load(Ordering::Acquire) == true {
358                                         log_trace!(logger, "Terminating background processor.");
359                                         break;
360                                 }
361                                 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
362                                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
363                                         channel_manager.timer_tick_occurred();
364                                         last_freshness_call = Instant::now();
365                                 }
366                                 if await_time > Duration::from_secs(1) {
367                                         // On various platforms, we may be starved of CPU cycles for several reasons.
368                                         // E.g. on iOS, if we've been in the background, we will be entirely paused.
369                                         // Similarly, if we're on a desktop platform and the device has been asleep, we
370                                         // may not get any cycles.
371                                         // We detect this by checking if our max-100ms-sleep, above, ran longer than a
372                                         // full second, at which point we assume sockets may have been killed (they
373                                         // appear to be at least on some platforms, even if it has only been a second).
374                                         // Note that we have to take care to not get here just because user event
375                                         // processing was slow at the top of the loop. For example, the sample client
376                                         // may call Bitcoin Core RPCs during event handling, which very often takes
377                                         // more than a handful of seconds to complete, and shouldn't disconnect all our
378                                         // peers.
379                                         log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
380                                         peer_manager.disconnect_all_peers();
381                                         last_ping_call = Instant::now();
382                                 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
383                                         log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
384                                         peer_manager.timer_tick_occurred();
385                                         last_ping_call = Instant::now();
386                                 }
387
388                                 // Note that we want to run a graph prune once not long after startup before
389                                 // falling back to our usual hourly prunes. This avoids short-lived clients never
390                                 // pruning their network graph. We run once 60 seconds after startup before
391                                 // continuing our normal cadence.
392                                 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
393                                         // The network graph must not be pruned while rapid sync completion is pending
394                                         log_trace!(logger, "Assessing prunability of network graph");
395                                         if let Some(network_graph) = gossip_sync.prunable_network_graph() {
396                                                 network_graph.remove_stale_channels();
397
398                                                 if let Err(e) = persister.persist_graph(network_graph) {
399                                                         log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
400                                                 }
401
402                                                 last_prune_call = Instant::now();
403                                                 have_pruned = true;
404                                         } else {
405                                                 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
406                                         }
407                                 }
408
409                                 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
410                                         if let Some(ref scorer) = scorer {
411                                                 log_trace!(logger, "Persisting scorer");
412                                                 if let Err(e) = persister.persist_scorer(&scorer) {
413                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
414                                                 }
415                                         }
416                                         last_scorer_persist_call = Instant::now();
417                                 }
418                         }
419
420                         // After we exit, ensure we persist the ChannelManager one final time - this avoids
421                         // some races where users quit while channel updates were in-flight, with
422                         // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
423                         persister.persist_manager(&*channel_manager)?;
424
425                         // Persist Scorer on exit
426                         if let Some(ref scorer) = scorer {
427                                 persister.persist_scorer(&scorer)?;
428                         }
429
430                         // Persist NetworkGraph on exit
431                         if let Some(network_graph) = gossip_sync.network_graph() {
432                                 persister.persist_graph(network_graph)?;
433                         }
434
435                         Ok(())
436                 });
437                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
438         }
439
440         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
441         /// [`ChannelManager`].
442         ///
443         /// # Panics
444         ///
445         /// This function panics if the background thread has panicked such as while persisting or
446         /// handling events.
447         ///
448         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
449         pub fn join(mut self) -> Result<(), std::io::Error> {
450                 assert!(self.thread_handle.is_some());
451                 self.join_thread()
452         }
453
454         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
455         /// [`ChannelManager`].
456         ///
457         /// # Panics
458         ///
459         /// This function panics if the background thread has panicked such as while persisting or
460         /// handling events.
461         ///
462         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
463         pub fn stop(mut self) -> Result<(), std::io::Error> {
464                 assert!(self.thread_handle.is_some());
465                 self.stop_and_join_thread()
466         }
467
468         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
469                 self.stop_thread.store(true, Ordering::Release);
470                 self.join_thread()
471         }
472
473         fn join_thread(&mut self) -> Result<(), std::io::Error> {
474                 match self.thread_handle.take() {
475                         Some(handle) => handle.join().unwrap(),
476                         None => Ok(()),
477                 }
478         }
479 }
480
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Dropping stops the thread and `unwrap`s its result, so a persistence error that
		// occurred in the background thread will panic here. Call `stop`/`join` explicitly
		// to handle such errors gracefully instead.
		self.stop_and_join_thread().unwrap();
	}
}
486
487 #[cfg(test)]
488 mod tests {
489         use bitcoin::blockdata::block::BlockHeader;
490         use bitcoin::blockdata::constants::genesis_block;
491         use bitcoin::blockdata::transaction::{Transaction, TxOut};
492         use bitcoin::network::constants::Network;
493         use lightning::chain::{BestBlock, Confirm, chainmonitor};
494         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
495         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
496         use lightning::chain::transaction::OutPoint;
497         use lightning::get_event_msg;
498         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
499         use lightning::ln::features::{ChannelFeatures, InitFeatures};
500         use lightning::ln::msgs::{ChannelMessageHandler, Init};
501         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
502         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
503         use lightning::util::config::UserConfig;
504         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
505         use lightning::util::ser::Writeable;
506         use lightning::util::test_utils;
507         use lightning::util::persist::KVStorePersister;
508         use lightning_invoice::payment::{InvoicePayer, Retry};
509         use lightning_invoice::utils::DefaultRouter;
510         use lightning_persister::FilesystemPersister;
511         use std::fs;
512         use std::path::PathBuf;
513         use std::sync::{Arc, Mutex};
514         use std::sync::mpsc::SyncSender;
515         use std::time::Duration;
516         use lightning::routing::scoring::{FixedPenaltyScorer};
517         use lightning_rapid_gossip_sync::RapidGossipSync;
518         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
519
520         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
521
522         #[derive(Clone, Eq, Hash, PartialEq)]
523         struct TestDescriptor{}
524         impl SocketDescriptor for TestDescriptor {
525                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
526                         0
527                 }
528
529                 fn disconnect_socket(&mut self) {}
530         }
531
532         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
533
534         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
535         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
536
537         struct Node {
538                 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
539                 p2p_gossip_sync: PGS,
540                 rapid_gossip_sync: RGS,
541                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
542                 chain_monitor: Arc<ChainMonitor>,
543                 persister: Arc<FilesystemPersister>,
544                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
545                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
546                 logger: Arc<test_utils::TestLogger>,
547                 best_block: BestBlock,
548                 scorer: Arc<Mutex<FixedPenaltyScorer>>,
549         }
550
551         impl Node {
552                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
553                         GossipSync::P2P(self.p2p_gossip_sync.clone())
554                 }
555
556                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
557                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
558                 }
559
560                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
561                         GossipSync::None
562                 }
563         }
564
565         impl Drop for Node {
566                 fn drop(&mut self) {
567                         let data_dir = self.persister.get_data_dir();
568                         match fs::remove_dir_all(data_dir.clone()) {
569                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
570                                 _ => {}
571                         }
572                 }
573         }
574
	/// Test persister wrapping a `FilesystemPersister`, configurable to fail (or signal on)
	/// persistence of specific keys so BackgroundProcessor error paths can be exercised.
	struct Persister {
		/// When set, persisting the "network_graph" key returns this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		/// When set, a `()` is sent on this channel each time the "network_graph" key is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		/// When set, persisting the "manager" key returns this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		/// When set, persisting the "scorer" key returns this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		/// Backing persister that handles every key not intercepted above.
		filesystem_persister: FilesystemPersister,
	}
582
583         impl Persister {
584                 fn new(data_dir: String) -> Self {
585                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
586                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
587                 }
588
589                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
590                         Self { graph_error: Some((error, message)), ..self }
591                 }
592
593                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
594                         Self { graph_persistence_notifier: Some(sender), ..self }
595                 }
596
597                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
598                         Self { manager_error: Some((error, message)), ..self }
599                 }
600
601                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
602                         Self { scorer_error: Some((error, message)), ..self }
603                 }
604         }
605
606         impl KVStorePersister for Persister {
607                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
608                         if key == "manager" {
609                                 if let Some((error, message)) = self.manager_error {
610                                         return Err(std::io::Error::new(error, message))
611                                 }
612                         }
613
614                         if key == "network_graph" {
615                                 if let Some(sender) = &self.graph_persistence_notifier {
616                                         sender.send(()).unwrap();
617                                 };
618
619                                 if let Some((error, message)) = self.graph_error {
620                                         return Err(std::io::Error::new(error, message))
621                                 }
622                         }
623
624                         if key == "scorer" {
625                                 if let Some((error, message)) = self.scorer_error {
626                                         return Err(std::io::Error::new(error, message))
627                                 }
628                         }
629
630                         self.filesystem_persister.persist(key, object)
631                 }
632         }
633
634         fn get_full_filepath(filepath: String, filename: String) -> String {
635                 let mut path = PathBuf::from(filepath);
636                 path.push(filename);
637                 path.to_str().unwrap().to_string()
638         }
639
	/// Builds `num_nodes` fully-wired test `Node`s, each with its own persister directory
	/// under `persist_dir`, and connects every pair of nodes as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			// Each node persists into its own "<persist_dir>_persister_<i>" directory,
			// removed again by Node's Drop impl.
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node seed so test runs are reproducible.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Connect every distinct pair of nodes in both directions so channels can be opened
		// between any two of them.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}
676
	/// Opens and funds a channel from `$node_a` to `$node_b` worth `$channel_value` sats,
	/// driving the full open/accept/funding message exchange. Evaluates to the funding
	/// transaction.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			// Exactly one FundingGenerationReady event is expected at this point.
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
687
	/// Performs the first half of the channel-open handshake: `$node_a` creates the channel
	/// and the `open_channel` / `accept_channel` messages are exchanged, leaving `$node_a`
	/// with a pending FundingGenerationReady event.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			// `42` is the user_channel_id, asserted again in handle_funding_generation_ready!.
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
695
	/// Matches a FundingGenerationReady event of value `$channel_value`, builds the
	/// corresponding funding transaction, and evaluates to
	/// `(temporary_channel_id, funding_tx)`. Panics on any other event.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					// 42 is the user_channel_id passed in begin_open_channel!.
					assert_eq!(user_channel_id, 42);

					// Minimal one-output funding transaction paying the requested script.
					let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
712
	/// Completes the channel-open handshake: hands the funding transaction to `$node_a` and
	/// exchanges the `funding_created` / `funding_signed` messages with `$node_b`.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
720
	/// Mines `depth` blocks on top of `node.best_block`, confirming `tx` in the first block
	/// and notifying the ChannelManager and ChainMonitor of the new tip at the last block.
	///
	/// NOTE(review): when `depth == 1` the `1 =>` arm wins the match, so
	/// `best_block_updated` is never called for the only block mined — callers in this file
	/// always pass a depth > 1.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			// `time` is set to the height purely to make each header deterministic and unique.
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					// First mined block: report the transaction itself as confirmed.
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					// Final block: only the tip update matters; the tx was confirmed earlier.
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	/// Confirms `tx` and buries it under `ANTI_REORG_DELAY` blocks so it is treated as
	/// irrevocably confirmed.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
744
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the bytes on disk at $filepath match the current serialization of
		// $node. Serialization can race with the background thread re-persisting, so keep
		// re-reading until both sides agree.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(_) => continue
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Wait until the manager no longer signals that it needs re-persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
814
815         #[test]
816         fn test_timer_tick_called() {
817                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
818                 // `FRESHNESS_TIMER`.
819                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
820                 let data_dir = nodes[0].persister.get_data_dir();
821                 let persister = Arc::new(Persister::new(data_dir));
822                 let event_handler = |_: &_| {};
823                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
824                 loop {
825                         let log_entries = nodes[0].logger.lines.lock().unwrap();
826                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
827                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
828                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
829                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
830                                 break
831                         }
832                 }
833
834                 assert!(bg_processor.stop().is_ok());
835         }
836
837         #[test]
838         fn test_channel_manager_persist_error() {
839                 // Test that if we encounter an error during manager persistence, the thread panics.
840                 let nodes = create_nodes(2, "test_persist_error".to_string());
841                 open_channel!(nodes[0], nodes[1], 100000);
842
843                 let data_dir = nodes[0].persister.get_data_dir();
844                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
845                 let event_handler = |_: &_| {};
846                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
847                 match bg_processor.join() {
848                         Ok(_) => panic!("Expected error persisting manager"),
849                         Err(e) => {
850                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
851                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
852                         },
853                 }
854         }
855
856         #[test]
857         fn test_network_graph_persist_error() {
858                 // Test that if we encounter an error during network graph persistence, an error gets returned.
859                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
860                 let data_dir = nodes[0].persister.get_data_dir();
861                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
862                 let event_handler = |_: &_| {};
863                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
864
865                 match bg_processor.stop() {
866                         Ok(_) => panic!("Expected error persisting network graph"),
867                         Err(e) => {
868                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
869                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
870                         },
871                 }
872         }
873
874         #[test]
875         fn test_scorer_persist_error() {
876                 // Test that if we encounter an error during scorer persistence, an error gets returned.
877                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
878                 let data_dir = nodes[0].persister.get_data_dir();
879                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
880                 let event_handler = |_: &_| {};
881                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
882
883                 match bg_processor.stop() {
884                         Ok(_) => panic!("Expected error persisting scorer"),
885                         Err(e) => {
886                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
887                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
888                         },
889                 }
890         }
891
	#[test]
	fn test_background_event_handling() {
		// End-to-end check that events generated while the background processor is running
		// (FundingGenerationReady on open, SpendableOutputs/ChannelClosed on force-close)
		// reach the user-supplied event handler.
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		// Exchange channel_ready in both directions so the channel becomes usable; the
		// resulting channel_update messages are drained but not otherwise checked.
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		// Either event is acceptable here since both are emitted on a force-close.
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
946
947         #[test]
948         fn test_scorer_persistence() {
949                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
950                 let data_dir = nodes[0].persister.get_data_dir();
951                 let persister = Arc::new(Persister::new(data_dir));
952                 let event_handler = |_: &_| {};
953                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
954
955                 loop {
956                         let log_entries = nodes[0].logger.lines.lock().unwrap();
957                         let expected_log = "Persisting scorer".to_string();
958                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
959                                 break
960                         }
961                 }
962
963                 assert!(bg_processor.stop().is_ok());
964         }
965
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		// While a rapid gossip sync is pending, the background processor must not prune the
		// network graph; once the RGS data is applied and a prune happens, stale channels go.
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// The sender fires whenever the network graph is persisted (see Persister), which we
		// use below as the signal that a prune cycle has completed.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		// Seed a channel (scid 42) that would be prunable if pruning were allowed to run.
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Wait (via the logger) until the processor has assessed, and declined, pruning.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		// A canned rapid-gossip-sync snapshot containing two channel announcements.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// Now that the RGS completed, pruning may proceed; wait for the graph persistence
		// notification that follows a prune.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1022
	#[test]
	fn test_invoice_payer() {
		// Sanity check that an `InvoicePayer` can be plugged in as the event handler given
		// to the background processor, and that start/stop works with it in place.
		let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
		let random_seed_bytes = keys_manager.get_secure_random_bytes();
		let nodes = create_nodes(2, "test_invoice_payer".to_string());

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
		let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
		let event_handler = Arc::clone(&invoice_payer);
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		assert!(bg_processor.stop().is_ok());
	}
1038 }