Parameterize Simple*ChannelManager with DefaultRouter and ProbScorer
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{KeysInterface, SignerProvider};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::router::Router;
26 use lightning::routing::scoring::WriteableScore;
27 use lightning::util::events::{Event, EventHandler, EventsProvider};
28 use lightning::util::logger::Logger;
29 use lightning::util::persist::Persister;
30 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::Arc;
32 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread;
34 use std::thread::JoinHandle;
35 use std::time::{Duration, Instant};
36 use std::ops::Deref;
37
38 #[cfg(feature = "futures")]
39 use futures_util::{select_biased, future::FutureExt};
40
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag polled by the background thread; setting it to `true` (see
	// `stop_and_join_thread`) asks the processing loop to exit.
	stop_thread: Arc<AtomicBool>,
	// Handle for the spawned processing thread. `Option` so `join_thread` can `take()` it,
	// ensuring the thread is joined at most once (subsequent calls are no-ops).
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
70
// Interval (seconds) between `ChannelManager::timer_tick_occurred` calls. Shortened to 1s in
// tests so timer-driven behavior can be exercised quickly.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Interval (seconds) between `PeerManager::timer_tick_occurred` (ping) calls.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Interval (seconds) between scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay (seconds) before the *first* network-graph prune after startup, so that short-lived
// clients still prune at least once; afterwards `NETWORK_PRUNE_TIMER` applies.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Wraps whichever gossip-sync mechanism (if any) the user runs so the background processor can
/// uniformly access the backing [`NetworkGraph`] and decide when pruning is safe.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
115
116 impl<
117         P: Deref<Target = P2PGossipSync<G, A, L>>,
118         R: Deref<Target = RapidGossipSync<G, L>>,
119         G: Deref<Target = NetworkGraph<L>>,
120         A: Deref,
121         L: Deref,
122 > GossipSync<P, R, G, A, L>
123 where A::Target: chain::Access, L::Target: Logger {
124         fn network_graph(&self) -> Option<&G> {
125                 match self {
126                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::None => None,
129                 }
130         }
131
132         fn prunable_network_graph(&self) -> Option<&G> {
133                 match self {
134                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
135                         GossipSync::Rapid(gossip_sync) => {
136                                 if gossip_sync.is_initial_sync_complete() {
137                                         Some(gossip_sync.network_graph())
138                                 } else {
139                                         None
140                                 }
141                         },
142                         GossipSync::None => None,
143                 }
144         }
145 }
146
/// (C-not exported) as the bindings concretize everything and have constructors for us
// This impl pins the unused `R` parameter to a concrete reference type so callers of `p2p` need
// not spell out a RapidGossipSync type they never use.
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
159
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Mirror of the `p2p` constructor impl: the unused `P` and `A` parameters are pinned to concrete
// (trait-object) types so callers of `rapid` need not name them.
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
177
/// (C-not exported) as the bindings concretize everything and have constructors for us
// With no gossip sync at all, every generic parameter except the logger is pinned to a concrete
// type so `GossipSync::none()` can be called without any turbofish.
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}
195
196 fn handle_network_graph_update<L: Deref>(
197         network_graph: &NetworkGraph<L>, event: &Event
198 ) where L::Target: Logger {
199         if let Event::PaymentPathFailed { ref network_update, .. } = event {
200                 if let Some(network_update) = network_update {
201                         network_graph.handle_network_update(&network_update);
202                 }
203         }
204 }
205
// Shared main-loop body for both the threaded (`BackgroundProcessor::start`) and async
// (`process_events_async`) drivers. The macro (rather than a function) lets the two callers
// splice in their own event-processing expressions, exit check, and sleep/await primitive while
// sharing all the timer bookkeeping and persistence logic.
//
// Expansion evaluates to a `Result<(), std::io::Error>` expression; `?` inside propagates
// persister errors to the surrounding function.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Per-task timestamps of the last run; compared against the *_TIMER constants below.
		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let await_start = Instant::now();
			let updates_available = $await;
			let await_time = await_start.elapsed();

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = Instant::now();
			}
			if await_time > Duration::from_secs(1) {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = Instant::now();
			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = Instant::now();
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					log_trace!($logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking();

					// Graph-persistence failures are logged but non-fatal, unlike manager
					// persistence: a stale on-disk graph costs routing quality, not funds.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = Instant::now();
					have_pruned = true;
				}
			}

			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = Instant::now();
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
329
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<K::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<K::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<K::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, K, F, R, L, SC>,
{
	// Must start `false`: it is only set from the `sleeper` branch of the select below. If it
	// started `true`, the loop would terminate on the first iteration whenever the
	// persistable-update future won the initial select, rather than running until `sleeper`
	// asks us to exit.
	let mut should_break = false;
	// Wrap the user's handler so network-graph updates attached to events (e.g. failed payment
	// paths) are applied before the user sees the event.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// `select_biased!` prefers the persistable-update future, so a pending manager
			// persist is never starved by the sleep timer.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		})
}
412
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<K::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<K::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<K::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, K, F, R, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the user's handler so network-graph updates attached to events (e.g. failed
			// payment paths) are applied before the user sees the event.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				event_handler.handle_event(event);
			};
			// Blocking variant of the shared run loop: waits (up to 100ms) for a persistable
			// ChannelManager update and exits when `stop_thread` is set by `stop`/`drop`.
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Signals the processing loop to exit, then joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Joins the background thread at most once; `take()` leaves `None` so repeated calls
	// (e.g. `stop` followed by `Drop`) are no-ops. Propagates a background-thread panic.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
562
impl Drop for BackgroundProcessor {
	// Ensure the background thread is stopped and joined even if the user never calls
	// `stop`/`join`. Note this `unwrap()`s, so a persistence error surfacing at drop time
	// becomes a panic; call `stop()` explicitly to handle the error instead.
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}
568
569 #[cfg(test)]
570 mod tests {
571         use bitcoin::blockdata::block::BlockHeader;
572         use bitcoin::blockdata::constants::genesis_block;
573         use bitcoin::blockdata::locktime::PackedLockTime;
574         use bitcoin::blockdata::transaction::{Transaction, TxOut};
575         use bitcoin::network::constants::Network;
576         use lightning::chain::{BestBlock, Confirm, chainmonitor};
577         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
578         use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysInterface, KeysManager, NodeSigner};
579         use lightning::chain::transaction::OutPoint;
580         use lightning::get_event_msg;
581         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
582         use lightning::ln::features::ChannelFeatures;
583         use lightning::ln::msgs::{ChannelMessageHandler, Init};
584         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
585         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
586         use lightning::routing::router::DefaultRouter;
587         use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
588         use lightning::util::config::UserConfig;
589         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
590         use lightning::util::ser::Writeable;
591         use lightning::util::test_utils;
592         use lightning::util::persist::KVStorePersister;
593         use lightning_invoice::payment::{InvoicePayer, Retry};
594         use lightning_persister::FilesystemPersister;
595         use std::fs;
596         use std::path::PathBuf;
597         use std::sync::{Arc, Mutex};
598         use std::sync::mpsc::SyncSender;
599         use std::time::Duration;
600         use bitcoin::hashes::Hash;
601         use bitcoin::TxMerkleNode;
602         use lightning_rapid_gossip_sync::RapidGossipSync;
603         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
604
605         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
606
	// Minimal no-op socket descriptor used only to satisfy `PeerManager`'s type
	// parameters in these tests; no data is ever actually sent over it.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			// Claim zero bytes were accepted; the tests never exercise real I/O.
			0
		}

		fn disconnect_socket(&mut self) {}
	}
616
	// Concrete `ChainMonitor` instantiation shared by every test node.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Shorthand for the two gossip-sync flavors exercised by the tests below.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
621
	// One test node: the channel manager under test plus every collaborator the
	// `BackgroundProcessor` needs to drive and persist it.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Tracked manually by `confirm_transaction_depth` as blocks are "mined".
		best_block: BestBlock,
		scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
	}
635
636         impl Node {
637                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
638                         GossipSync::P2P(self.p2p_gossip_sync.clone())
639                 }
640
641                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
642                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
643                 }
644
645                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
646                         GossipSync::None
647                 }
648         }
649
650         impl Drop for Node {
651                 fn drop(&mut self) {
652                         let data_dir = self.persister.get_data_dir();
653                         match fs::remove_dir_all(data_dir.clone()) {
654                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
655                                 _ => {}
656                         }
657                 }
658         }
659
	// Test persister wrapping a `FilesystemPersister`, configurable to fail (or
	// signal on) persistence of specific keys.
	struct Persister {
		// If set, persisting the network graph fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, `()` is sent on this channel each time the graph is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// If set, persisting the channel manager fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, persisting the scorer fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
667
668         impl Persister {
669                 fn new(data_dir: String) -> Self {
670                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
671                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
672                 }
673
674                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
675                         Self { graph_error: Some((error, message)), ..self }
676                 }
677
678                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
679                         Self { graph_persistence_notifier: Some(sender), ..self }
680                 }
681
682                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
683                         Self { manager_error: Some((error, message)), ..self }
684                 }
685
686                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
687                         Self { scorer_error: Some((error, message)), ..self }
688                 }
689         }
690
691         impl KVStorePersister for Persister {
692                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
693                         if key == "manager" {
694                                 if let Some((error, message)) = self.manager_error {
695                                         return Err(std::io::Error::new(error, message))
696                                 }
697                         }
698
699                         if key == "network_graph" {
700                                 if let Some(sender) = &self.graph_persistence_notifier {
701                                         sender.send(()).unwrap();
702                                 };
703
704                                 if let Some((error, message)) = self.graph_error {
705                                         return Err(std::io::Error::new(error, message))
706                                 }
707                         }
708
709                         if key == "scorer" {
710                                 if let Some((error, message)) = self.scorer_error {
711                                         return Err(std::io::Error::new(error, message))
712                                 }
713                         }
714
715                         self.filesystem_persister.persist(key, object)
716                 }
717         }
718
719         fn get_full_filepath(filepath: String, filename: String) -> String {
720                 let mut path = PathBuf::from(filepath);
721                 path.push(filename);
722                 path.to_str().unwrap().to_string()
723         }
724
	// Builds `num_nodes` fully-wired nodes, each persisting under its own
	// directory derived from `persist_dir`, then connects every pair as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let params = ProbabilisticScoringParameters::default();
			let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
			// Per-node deterministic seed, reused for both the router's entropy
			// and the keys manager below.
			let seed = [i as u8; 32];
			let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Register every node as a peer of every other node; done after the loop
		// above so all managers (and thus node ids) already exist.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
763
	// Opens and fully funds a channel from `$node_a` to `$node_b` with value
	// `$channel_value`, returning the funding transaction. Consumes the
	// `FundingGenerationReady` event in the process.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
774
	// Starts the channel-open handshake: `create_channel` on `$node_a` followed by
	// the `open_channel`/`accept_channel` message exchange.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
782
	// Destructures a `FundingGenerationReady` event, sanity-checks its value and
	// user id, and builds a matching funding transaction. Evaluates to
	// `(temporary_channel_id, funding_tx)`.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					// 42 is the user_channel_id passed in begin_open_channel!.
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
799
	// Completes the channel open: hands the funding transaction to `$node_a` and
	// performs the `funding_created`/`funding_signed` message exchange.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
807
808         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
809                 for i in 1..=depth {
810                         let prev_blockhash = node.best_block.block_hash();
811                         let height = node.best_block.height() + 1;
812                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
813                         let txdata = vec![(0, tx)];
814                         node.best_block = BestBlock::new(header.block_hash(), height);
815                         match i {
816                                 1 => {
817                                         node.node.transactions_confirmed(&header, &txdata, height);
818                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
819                                 },
820                                 x if x == depth => {
821                                         node.node.best_block_updated(&header, height);
822                                         node.chain_monitor.best_block_updated(&header, height);
823                                 },
824                                 _ => {},
825                         }
826                 }
827         }
	// Confirms `tx` to `ANTI_REORG_DELAY` depth, i.e. deep enough that the chain
	// monitor treats it as irreversible.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
831
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the bytes the background processor wrote to `$filepath`
		// match the object's current serialization.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										// On-disk data is stale; the background
										// thread hasn't re-persisted yet.
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Wait for the background processor to pick up (and clear) the pending
		// persistence notification.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
901
	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// Spin until both timer-tick log lines have been recorded by the
		// background thread's processing loop.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
923
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// The injected manager-persistence failure should terminate the background
		// thread, and `join` should surface exactly that error.
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
942
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Unlike manager-persistence failures, a graph-persistence failure is
		// surfaced as the `Result` of `stop` rather than killing the thread early.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
960
	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// As with the network graph, the scorer-persistence failure is returned
		// from `stop` rather than panicking the background thread.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
978
	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		// Exchange channel_ready messages so the channel becomes fully usable on
		// both ends, draining the resulting channel_update messages.
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		// Confirm the commitment transaction past the to_self delay so the output
		// becomes spendable.
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1041
	#[test]
	fn test_scorer_persistence() {
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Spin until the background thread logs that it persisted the scorer.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
1060
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		// Verifies that the background processor does not prune the network graph
		// before a rapid gossip sync completes: a channel known only via a partial
		// announcement survives until the RGS data is applied, after which pruning
		// removes all (now-stale) channels.
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// The persister signals on this channel each time it persists the graph,
		// letting the test block until a post-sync prune has happened.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		// Seed the graph with a partially-announced channel (SCID 42); such channels
		// would normally be prune candidates once they go stale.
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: _| {};
		// Start with the rapid gossip sync variant so pruning is deferred until sync completion.
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
			if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
				.unwrap_or(&0) > 1
			{
				// Wait until the loop has gone around at least twice.
				break
			}
		}

		// Serialized rapid-gossip-sync payload announcing two additional channels.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// With sync complete, the processor may now prune; wait (with a generous
		// multiple of the first prune timer) for the persistence notification.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1118
1119         #[test]
1120         fn test_invoice_payer() {
1121                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1122                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1123                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1124
1125                 // Initiate the background processors to watch each node.
1126                 let data_dir = nodes[0].persister.get_data_dir();
1127                 let persister = Arc::new(Persister::new(data_dir));
1128                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1129                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1130                 let event_handler = Arc::clone(&invoice_payer);
1131                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1132                 assert!(bg_processor.stop().is_ok());
1133         }
1134 }