Merge pull request #1787 from wpaulino/async-event-handler
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
39
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 ///   writing it to disk/backups by invoking the callback given to it at startup.
46 ///   [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 ///   at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
51 ///
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
54 ///
55 /// # Note
56 ///
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
61 ///
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
        // Shutdown flag shared with the background thread. It is set to `true` when stopping (or
        // dropping) the processor and is polled by the thread's loop as its exit condition.
        stop_thread: Arc<AtomicBool>,
        // Handle of the spawned background thread, whose result is the outcome of the run loop.
        // Kept in an `Option` so it can be taken and joined once; it is `None` afterwards.
        thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
69
// All timers below are expressed in whole seconds. The run loop compares the elapsed whole
// seconds against them with `>`, so an action fires once strictly more than the configured
// number of seconds has passed since it last ran.

/// Interval, in seconds, between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
/// Shortened `ChannelManager::timer_tick_occurred` interval used when built for tests.
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Interval, in seconds, between calls to `PeerManager::timer_tick_occurred` in optimised
/// (non-debug, non-test) builds.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
/// Shortened `PeerManager::timer_tick_occurred` interval used when built for tests.
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Interval, in seconds, between periodic scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
/// Shortened scorer persistence interval used when built for tests.
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Delay, in seconds, before the first network graph prune after startup; subsequent prunes
/// use `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
/// Shortened first-prune delay used when built for tests.
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// The type parameters are pointer-like types: `P` dereferences to a [`P2PGossipSync`], `R` to
/// a [`RapidGossipSync`] and `G` to the [`NetworkGraph`] both of them operate on, while `A` and
/// `L` dereference to the chain access and [`Logger`] implementations respectively.
pub enum GossipSync<
        P: Deref<Target = P2PGossipSync<G, A, L>>,
        R: Deref<Target = RapidGossipSync<G, L>>,
        G: Deref<Target = NetworkGraph<L>>,
        A: Deref,
        L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
        /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
        P2P(P),
        /// Rapid gossip sync from a trusted server.
        Rapid(R),
        /// No gossip sync.
        None,
}
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
149 where
150         A::Target: chain::Access,
151         L::Target: Logger,
152 {
153         /// Initializes a new [`GossipSync::P2P`] variant.
154         pub fn p2p(gossip_sync: P) -> Self {
155                 GossipSync::P2P(gossip_sync)
156         }
157 }
158
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
161         GossipSync<
162                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
163                 R,
164                 G,
165                 &'a (dyn chain::Access + Send + Sync),
166                 L,
167         >
168 where
169         L::Target: Logger,
170 {
171         /// Initializes a new [`GossipSync::Rapid`] variant.
172         pub fn rapid(gossip_sync: R) -> Self {
173                 GossipSync::Rapid(gossip_sync)
174         }
175 }
176
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
178 impl<'a, L: Deref>
179         GossipSync<
180                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
182                 &'a NetworkGraph<L>,
183                 &'a (dyn chain::Access + Send + Sync),
184                 L,
185         >
186 where
187         L::Target: Logger,
188 {
189         /// Initializes a new [`GossipSync::None`] variant.
190         pub fn none() -> Self {
191                 GossipSync::None
192         }
193 }
194
195 fn handle_network_graph_update<L: Deref>(
196         network_graph: &NetworkGraph<L>, event: &Event
197 ) where L::Target: Logger {
198         if let Event::PaymentPathFailed { ref network_update, .. } = event {
199                 if let Some(network_update) = network_update {
200                         network_graph.handle_network_update(&network_update);
201                 }
202         }
203 }
204
// Expands to the body of the background-processing loop shared by `process_events_async` and
// `BackgroundProcessor::start`. The expansion evaluates to `Ok(())` once the loop has exited
// and the final persistence calls have succeeded.
//
// Arguments:
// * `$persister`: used for `persist_manager`, `persist_graph` and `persist_scorer`.
// * `$chain_monitor`: accepted but not referenced by the expansion itself; chain monitor
//   events are only reached through `$process_chain_monitor_events`.
// * `$process_chain_monitor_events` / `$process_channel_manager_events`: expressions evaluated
//   at the top of every iteration (channel manager first) to process pending events.
// * `$channel_manager` / `$peer_manager`: receive the periodic `timer_tick_occurred` calls;
//   the channel manager is also what gets persisted.
// * `$gossip_sync`: queried for the network graph to prune and persist.
// * `$logger`: target of the log macros.
// * `$scorer`: an `Option`; when it is `Some`, the scorer is persisted periodically and on exit.
// * `$loop_exit_check`: boolean expression evaluated once per iteration, after any manager
//   persistence triggered by `$await`; the loop ends when it is `true`.
// * `$await`: expression yielding `true` when the `ChannelManager` should be persisted.
//
// Error handling: `?` is applied to every `persist_manager` call and to the scorer and graph
// persistence performed after the loop, so the expansion must sit in a function returning a
// compatible `Result`. Graph and scorer persistence failures inside the loop are only logged.
macro_rules! define_run_body {
        ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
         $channel_manager: ident, $process_channel_manager_events: expr,
         $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
         $loop_exit_check: expr, $await: expr)
        => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();

                // Timestamps of the last time each periodic action ran.
                let mut last_freshness_call = Instant::now();
                let mut last_ping_call = Instant::now();
                let mut last_prune_call = Instant::now();
                let mut last_scorer_persist_call = Instant::now();
                // Selects between the short first-prune delay and the regular prune interval.
                let mut have_pruned = false;

                loop {
                        $process_channel_manager_events;
                        $process_chain_monitor_events;

                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
                        // we want to ensure we get into `persist_manager` as quickly as we can, especially
                        // without running the normal event processing above and handing events to users.
                        //
                        // Specifically, on an *extremely* slow machine, we may see ChannelManager start
                        // processing a message effectively at any point during this loop. In order to
                        // minimize the time between such processing completing and persisting the updated
                        // ChannelManager, we want to minimize methods blocking on a ChannelManager
                        // generally, and as a fallback place such blocking only immediately before
                        // persistence.
                        $peer_manager.process_events();

                        // We wait up to 100ms, but track how long it takes to detect being put to sleep,
                        // see `await_start`'s use below.
                        let await_start = Instant::now();
                        let updates_available = $await;
                        let await_time = await_start.elapsed();

                        if updates_available {
                                log_trace!($logger, "Persisting ChannelManager...");
                                $persister.persist_manager(&*$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
                        }
                        // Exit the loop if the background processor was requested to stop.
                        if $loop_exit_check {
                                log_trace!($logger, "Terminating background processor.");
                                break;
                        }
                        if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
                                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
                                $channel_manager.timer_tick_occurred();
                                last_freshness_call = Instant::now();
                        }
                        if await_time > Duration::from_secs(1) {
                                // On various platforms, we may be starved of CPU cycles for several reasons.
                                // E.g. on iOS, if we've been in the background, we will be entirely paused.
                                // Similarly, if we're on a desktop platform and the device has been asleep, we
                                // may not get any cycles.
                                // We detect this by checking if our max-100ms-sleep, above, ran longer than a
                                // full second, at which point we assume sockets may have been killed (they
                                // appear to be at least on some platforms, even if it has only been a second).
                                // Note that we have to take care to not get here just because user event
                                // processing was slow at the top of the loop. For example, the sample client
                                // may call Bitcoin Core RPCs during event handling, which very often takes
                                // more than a handful of seconds to complete, and shouldn't disconnect all our
                                // peers.
                                log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
                                $peer_manager.disconnect_all_peers();
                                last_ping_call = Instant::now();
                        } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
                                log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
                                $peer_manager.timer_tick_occurred();
                                last_ping_call = Instant::now();
                        }

                        // Note that we want to run a graph prune once not long after startup before
                        // falling back to our usual hourly prunes. This avoids short-lived clients never
                        // pruning their network graph. We run once 60 seconds after startup before
                        // continuing our normal cadence.
                        //
                        // NOTE(review): `last_prune_call` is only reset when a prune actually happens, so
                        // once the timer has elapsed this branch (and its trace logging) is re-entered on
                        // every loop iteration for as long as no prunable graph is available.
                        if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
                                // The network graph must not be pruned while rapid sync completion is pending
                                log_trace!($logger, "Assessing prunability of network graph");
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
                                        network_graph.remove_stale_channels_and_tracking();

                                        // A failed graph persist is logged but does not stop the loop.
                                        if let Err(e) = $persister.persist_graph(network_graph) {
                                                log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                                        }

                                        last_prune_call = Instant::now();
                                        have_pruned = true;
                                } else {
                                        log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
                                }
                        }

                        if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
                                if let Some(ref scorer) = $scorer {
                                        log_trace!($logger, "Persisting scorer");
                                        // A failed scorer persist is logged but does not stop the loop.
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
                                }
                                // The timer is reset whether or not a scorer is present.
                                last_scorer_persist_call = Instant::now();
                        }
                }

                // After we exit, ensure we persist the ChannelManager one final time - this avoids
                // some races where users quit while channel updates were in-flight, with
                // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
                $persister.persist_manager(&*$channel_manager)?;

                // Persist Scorer on exit
                if let Some(ref scorer) = $scorer {
                        $persister.persist_scorer(&scorer)?;
                }

                // Persist NetworkGraph on exit
                if let Some(network_graph) = $gossip_sync.network_graph() {
                        $persister.persist_graph(network_graph)?;
                }

                Ok(())
        } }
}
330
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// On every iteration the loop waits for whichever happens first: the [`ChannelManager`]
/// signalling a persistable update, or the 100ms `sleeper` future completing. Only a completed
/// `sleeper` future can request an exit; a persistable update merely triggers persistence.
///
/// `event_handler` is invoked with each pending [`Event`] and the future it returns is awaited.
/// Before that, if `gossip_sync` holds a [`NetworkGraph`], the graph is updated from the event.
///
/// # Errors
///
/// Returns the error of the first failed [`ChannelManager`] persistence attempt, or of the
/// scorer or [`NetworkGraph`] persistence performed once the loop has exited.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
        'a,
        Signer: 'static + Sign,
        CA: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        CW: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
        K: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        Descriptor: 'static + SocketDescriptor + Send + Sync,
        CMH: 'static + Deref + Send + Sync,
        RMH: 'static + Deref + Send + Sync,
        OMH: 'static + Deref + Send + Sync,
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        UMH: 'static + Deref + Send + Sync,
        PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: WriteableScore<'a>,
        SleepFuture: core::future::Future<Output = bool>,
        Sleeper: Fn(Duration) -> SleepFuture
>(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
        CA::Target: 'static + chain::Access,
        CF::Target: 'static + chain::Filter,
        CW::Target: 'static + chain::Watch<Signer>,
        T::Target: 'static + BroadcasterInterface,
        K::Target: 'static + KeysInterface<Signer = Signer>,
        F::Target: 'static + FeeEstimator,
        L::Target: 'static + Logger,
        P::Target: 'static + Persist<Signer>,
        CMH::Target: 'static + ChannelMessageHandler,
        OMH::Target: 'static + OnionMessageHandler,
        RMH::Target: 'static + RoutingMessageHandler,
        UMH::Target: 'static + CustomMessageHandler,
        PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
        // Exit flag read by the loop's exit check; it is only ever written from the output of a
        // completed `sleeper` future. It must start out `false`: if the persistable-update branch
        // of the `select_biased!` below wins before any sleeper has completed, the flag is left
        // untouched, and an initial `true` would end the loop after its very first iteration even
        // though no exit was requested.
        let mut should_break = false;
        // Wraps the user's handler so that the network graph (if any) sees each event first.
        let async_event_handler = |event| {
                let network_graph = gossip_sync.network_graph();
                let event_handler = &event_handler;
                async move {
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
                        event_handler(event).await;
                }
        };
        define_run_body!(persister,
                chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
                gossip_sync, peer_manager, logger, scorer, should_break, {
                        // Biased towards the persistable-update future so that a pending ChannelManager
                        // update is persisted without first waiting out the sleep.
                        select_biased! {
                                _ = channel_manager.get_persistable_update_future().fuse() => true,
                                exit = sleeper(Duration::from_millis(100)).fuse() => {
                                        should_break = exit;
                                        false
                                }
                        }
                })
}
412
413 impl BackgroundProcessor {
414         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
415         /// documentation].
416         ///
417         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
418         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
419         /// either [`join`] or [`stop`].
420         ///
421         /// # Data Persistence
422         ///
423         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
424         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
425         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
426         /// provided implementation.
427         ///
428         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
429         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
430         /// See the `lightning-persister` crate for LDK's provided implementation.
431         ///
432         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
433         /// error or call [`join`] and handle any error that may arise. For the latter case,
434         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
435         ///
436         /// # Event Handling
437         ///
438         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
439         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
440         /// functionality implemented by other handlers.
441         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
442         ///
443         /// # Rapid Gossip Sync
444         ///
445         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
446         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
447         /// until the [`RapidGossipSync`] instance completes its first sync.
448         ///
449         /// [top-level documentation]: BackgroundProcessor
450         /// [`join`]: Self::join
451         /// [`stop`]: Self::stop
452         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
453         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
454         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
455         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
456         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
457         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
458         pub fn start<
459                 'a,
460                 Signer: 'static + Sign,
461                 CA: 'static + Deref + Send + Sync,
462                 CF: 'static + Deref + Send + Sync,
463                 CW: 'static + Deref + Send + Sync,
464                 T: 'static + Deref + Send + Sync,
465                 K: 'static + Deref + Send + Sync,
466                 F: 'static + Deref + Send + Sync,
467                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
468                 L: 'static + Deref + Send + Sync,
469                 P: 'static + Deref + Send + Sync,
470                 Descriptor: 'static + SocketDescriptor + Send + Sync,
471                 CMH: 'static + Deref + Send + Sync,
472                 OMH: 'static + Deref + Send + Sync,
473                 RMH: 'static + Deref + Send + Sync,
474                 EH: 'static + EventHandler + Send,
475                 PS: 'static + Deref + Send,
476                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
477                 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
478                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
479                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
480                 UMH: 'static + Deref + Send + Sync,
481                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
482                 S: 'static + Deref<Target = SC> + Send + Sync,
483                 SC: WriteableScore<'a>,
484         >(
485                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
486                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
487         ) -> Self
488         where
489                 CA::Target: 'static + chain::Access,
490                 CF::Target: 'static + chain::Filter,
491                 CW::Target: 'static + chain::Watch<Signer>,
492                 T::Target: 'static + BroadcasterInterface,
493                 K::Target: 'static + KeysInterface<Signer = Signer>,
494                 F::Target: 'static + FeeEstimator,
495                 L::Target: 'static + Logger,
496                 P::Target: 'static + Persist<Signer>,
497                 CMH::Target: 'static + ChannelMessageHandler,
498                 OMH::Target: 'static + OnionMessageHandler,
499                 RMH::Target: 'static + RoutingMessageHandler,
500                 UMH::Target: 'static + CustomMessageHandler,
501                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
502         {
503                 let stop_thread = Arc::new(AtomicBool::new(false));
504                 let stop_thread_clone = stop_thread.clone();
505                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
506                         let event_handler = |event| {
507                                 let network_graph = gossip_sync.network_graph();
508                                 if let Some(network_graph) = network_graph {
509                                         handle_network_graph_update(network_graph, &event)
510                                 }
511                                 event_handler.handle_event(event);
512                         };
513                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
514                                 channel_manager, channel_manager.process_pending_events(&event_handler),
515                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
516                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
517                 });
518                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
519         }
520
521         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
522         /// [`ChannelManager`].
523         ///
524         /// # Panics
525         ///
526         /// This function panics if the background thread has panicked such as while persisting or
527         /// handling events.
528         ///
529         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
530         pub fn join(mut self) -> Result<(), std::io::Error> {
531                 assert!(self.thread_handle.is_some());
532                 self.join_thread()
533         }
534
535         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
536         /// [`ChannelManager`].
537         ///
538         /// # Panics
539         ///
540         /// This function panics if the background thread has panicked such as while persisting or
541         /// handling events.
542         ///
543         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
544         pub fn stop(mut self) -> Result<(), std::io::Error> {
545                 assert!(self.thread_handle.is_some());
546                 self.stop_and_join_thread()
547         }
548
549         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
550                 self.stop_thread.store(true, Ordering::Release);
551                 self.join_thread()
552         }
553
554         fn join_thread(&mut self) -> Result<(), std::io::Error> {
555                 match self.thread_handle.take() {
556                         Some(handle) => handle.join().unwrap(),
557                         None => Ok(()),
558                 }
559         }
560 }
561
// Dropping the processor signals the background thread to stop and blocks until it has exited.
impl Drop for BackgroundProcessor {
        fn drop(&mut self) {
                // If the thread was already joined (the handle has been taken), this is a no-op
                // returning `Ok(())`. Otherwise the thread's result is unwrapped, so dropping panics
                // if the background thread returned an error or itself panicked.
                self.stop_and_join_thread().unwrap();
        }
}
567
568 #[cfg(test)]
569 mod tests {
570         use bitcoin::blockdata::block::BlockHeader;
571         use bitcoin::blockdata::constants::genesis_block;
572         use bitcoin::blockdata::locktime::PackedLockTime;
573         use bitcoin::blockdata::transaction::{Transaction, TxOut};
574         use bitcoin::network::constants::Network;
575         use lightning::chain::{BestBlock, Confirm, chainmonitor};
576         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
577         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
578         use lightning::chain::transaction::OutPoint;
579         use lightning::get_event_msg;
580         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
581         use lightning::ln::features::ChannelFeatures;
582         use lightning::ln::msgs::{ChannelMessageHandler, Init};
583         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
584         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
585         use lightning::util::config::UserConfig;
586         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
587         use lightning::util::ser::Writeable;
588         use lightning::util::test_utils;
589         use lightning::util::persist::KVStorePersister;
590         use lightning_invoice::payment::{InvoicePayer, Retry};
591         use lightning_invoice::utils::DefaultRouter;
592         use lightning_persister::FilesystemPersister;
593         use std::fs;
594         use std::path::PathBuf;
595         use std::sync::{Arc, Mutex};
596         use std::sync::mpsc::SyncSender;
597         use std::time::Duration;
598         use bitcoin::hashes::Hash;
599         use bitcoin::TxMerkleNode;
600         use lightning::routing::scoring::{FixedPenaltyScorer};
601         use lightning_rapid_gossip_sync::RapidGossipSync;
602         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
603
            /// Longest time, in seconds, that a test waits for the background thread to handle an
            /// event (the value is passed to `Duration::from_secs`).
604         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
605
            /// Field-less socket descriptor used to instantiate the test `PeerManager`.
606         #[derive(Clone, Hash, PartialEq, Eq)]
607         struct TestDescriptor{}
608         impl SocketDescriptor for TestDescriptor {
                    // Ignores the data it is given and always returns 0.
609                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
610                         0
611                 }
612
                    // Nothing to tear down, so this is a no-op.
613                 fn disconnect_socket(&mut self) {}
614         }
615
            /// Concrete `ChainMonitor` instantiation shared by the test nodes, built from the
            /// `test_utils` doubles and a `FilesystemPersister`.
616         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
617
            /// Shared handle to the P2P gossip sync type used by the test nodes.
618         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
            /// Shared handle to the rapid gossip sync type used by the test nodes.
619         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
620
            /// Everything a test needs for one node: the channel manager and the components that are
            /// handed to `BackgroundProcessor::start`, plus bookkeeping for the fake chain.
621         struct Node {
                    // The node's channel manager.
622                 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
                    // Both gossip sync flavours are kept so a test can pick either one (see `impl Node`).
623                 p2p_gossip_sync: PGS,
624                 rapid_gossip_sync: RGS,
625                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
626                 chain_monitor: Arc<ChainMonitor>,
                    // Its data directory is removed again when the node is dropped.
627                 persister: Arc<FilesystemPersister>,
628                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
629                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
630                 logger: Arc<test_utils::TestLogger>,
                    // Tip of the fake chain; advanced by `confirm_transaction_depth`.
631                 best_block: BestBlock,
632                 scorer: Arc<Mutex<FixedPenaltyScorer>>,
633         }
634
635         impl Node {
636                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
637                         GossipSync::P2P(self.p2p_gossip_sync.clone())
638                 }
639
640                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
641                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
642                 }
643
644                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
645                         GossipSync::None
646                 }
647         }
648
649         impl Drop for Node {
650                 fn drop(&mut self) {
651                         let data_dir = self.persister.get_data_dir();
652                         match fs::remove_dir_all(data_dir.clone()) {
653                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
654                                 _ => {}
655                         }
656                 }
657         }
658
            /// Test persister that forwards to a `FilesystemPersister` but can be configured to fail
            /// for a given key, or to signal when the network graph is about to be persisted.
659         struct Persister {
                    // Error returned instead of persisting when the key is "network_graph".
660                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
                    // Receives a `()` each time "network_graph" is persisted, before any error is returned.
661                 graph_persistence_notifier: Option<SyncSender<()>>,
                    // Error returned instead of persisting when the key is "manager".
662                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
                    // Error returned instead of persisting when the key is "scorer".
663                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
                    // Backend that does the actual write when no error is injected.
664                 filesystem_persister: FilesystemPersister,
665         }
666
667         impl Persister {
668                 fn new(data_dir: String) -> Self {
669                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
670                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
671                 }
672
673                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
674                         Self { graph_error: Some((error, message)), ..self }
675                 }
676
677                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
678                         Self { graph_persistence_notifier: Some(sender), ..self }
679                 }
680
681                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
682                         Self { manager_error: Some((error, message)), ..self }
683                 }
684
685                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
686                         Self { scorer_error: Some((error, message)), ..self }
687                 }
688         }
689
690         impl KVStorePersister for Persister {
691                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
692                         if key == "manager" {
693                                 if let Some((error, message)) = self.manager_error {
694                                         return Err(std::io::Error::new(error, message))
695                                 }
696                         }
697
698                         if key == "network_graph" {
699                                 if let Some(sender) = &self.graph_persistence_notifier {
700                                         sender.send(()).unwrap();
701                                 };
702
703                                 if let Some((error, message)) = self.graph_error {
704                                         return Err(std::io::Error::new(error, message))
705                                 }
706                         }
707
708                         if key == "scorer" {
709                                 if let Some((error, message)) = self.scorer_error {
710                                         return Err(std::io::Error::new(error, message))
711                                 }
712                         }
713
714                         self.filesystem_persister.persist(key, object)
715                 }
716         }
717
718         fn get_full_filepath(filepath: String, filename: String) -> String {
719                 let mut path = PathBuf::from(filepath);
720                 path.push(filename);
721                 path.to_str().unwrap().to_string()
722         }
723
            /// Builds `num_nodes` test nodes and marks every pair of them as connected peers.
            ///
            /// Node `i` persists under the directory `{persist_dir}_persister_{i}` and derives its keys
            /// from a seed made of the byte `i` repeated 32 times.
724         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
725                 let mut nodes = Vec::new();
726                 for i in 0..num_nodes {
                            // Test doubles for the chain-facing interfaces.
727                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
728                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
729                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
730                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
731                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
732                         let seed = [i as u8; 32];
733                         let network = Network::Testnet;
                            // Note: this local shadows the imported `genesis_block` function from here on.
734                         let genesis_block = genesis_block(network);
                            // The keys manager is seeded with the genesis block time rather than the wall clock.
735                         let now = Duration::from_secs(genesis_block.header.time as u64);
736                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
737                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
738                         let best_block = BestBlock::from_genesis(network);
739                         let params = ChainParameters { network, best_block };
740                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
                            // Both gossip sync flavours share the same network graph.
741                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
742                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
743                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
744                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
745                         let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
746                         let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
747                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
748                         nodes.push(node);
749                 }
750
                    // Tell each channel manager, in both directions, that every other node is connected.
751                 for i in 0..num_nodes {
752                         for j in (i+1)..num_nodes {
753                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
754                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
755                         }
756                 }
757
758                 nodes
759         }
760
            // Runs the complete channel-open exchange between `$node_a` and `$node_b` for a channel of
            // `$channel_value` and evaluates to the funding transaction.
            //
            // The macro drains `$node_a`'s pending events itself and expects exactly one of them, so
            // nothing else may be consuming that node's events while it runs.
761         macro_rules! open_channel {
762                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
763                         begin_open_channel!($node_a, $node_b, $channel_value);
764                         let events = $node_a.node.get_and_clear_pending_events();
765                         assert_eq!(events.len(), 1);
766                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
767                         end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
768                         tx
769                 }}
770         }
771
            // First half of a channel open: `$node_a` creates the channel (with user channel id 42),
            // `$node_b` handles the resulting `SendOpenChannel` message, and `$node_a` handles
            // `$node_b`'s `SendAcceptChannel` reply.
772         macro_rules! begin_open_channel {
773                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
774                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
775                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
776                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
777                 }}
778         }
779
            // Checks that `$event` is a `FundingGenerationReady` for `$channel_value` with user channel
            // id 42, and evaluates to `(temporary_channel_id, tx)`, where `tx` is a funding transaction
            // with no inputs and a single output paying the channel value to the requested script.
            // Any other event panics.
780         macro_rules! handle_funding_generation_ready {
781                 ($event: expr, $channel_value: expr) => {{
782                         match $event {
783                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
784                                         assert_eq!(channel_value_satoshis, $channel_value);
                                            // 42 is the user channel id passed to `create_channel` in `begin_open_channel!`.
785                                         assert_eq!(user_channel_id, 42);
786
787                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
788                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
789                                         }]};
790                                         (temporary_channel_id, tx)
791                                 },
792                                 _ => panic!("Unexpected event"),
793                         }
794                 }}
795         }
796
            // Second half of a channel open: hands the funding transaction `$tx` to `$node_a`, then
            // relays `SendFundingCreated` to `$node_b` and `SendFundingSigned` back to `$node_a`.
797         macro_rules! end_open_channel {
798                 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
799                         $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
800                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
801                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
802                 }}
803         }
804
805         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
806                 for i in 1..=depth {
807                         let prev_blockhash = node.best_block.block_hash();
808                         let height = node.best_block.height() + 1;
809                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
810                         let txdata = vec![(0, tx)];
811                         node.best_block = BestBlock::new(header.block_hash(), height);
812                         match i {
813                                 1 => {
814                                         node.node.transactions_confirmed(&header, &txdata, height);
815                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
816                                 },
817                                 x if x == depth => {
818                                         node.node.best_block_updated(&header, height);
819                                         node.chain_monitor.best_block_updated(&header, height);
820                                 },
821                                 _ => {},
822                         }
823                 }
824         }
            /// Confirms `tx` on `node` to a depth of `ANTI_REORG_DELAY` blocks.
825         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
826                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
827         }
828
829         #[test]
830         fn test_background_processor() {
831                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
832                 // updates. Also test that when new updates are available, the manager signals that it needs
833                 // re-persistence and is successfully re-persisted.
834                 let nodes = create_nodes(2, "test_background_processor".to_string());
835
836                 // Go through the channel creation process so that each node has something to persist. Since
837                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
838                 // avoid a race with processing events.
839                 let tx = open_channel!(nodes[0], nodes[1], 100000);
840
841                 // Initiate the background processors to watch each node.
842                 let data_dir = nodes[0].persister.get_data_dir();
843                 let persister = Arc::new(Persister::new(data_dir));
844                 let event_handler = |_: _| {};
845                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
846
                    // Spins until the file at `$filepath` holds exactly the bytes that `$node` currently
                    // serializes to. A missing/unreadable file or a mismatch just retries; there is no
                    // timeout, so a persistence bug shows up as a hang rather than a failed assertion.
847                 macro_rules! check_persisted_data {
848                         ($node: expr, $filepath: expr) => {
849                                 let mut expected_bytes = Vec::new();
850                                 loop {
851                                         expected_bytes.clear();
852                                         match $node.write(&mut expected_bytes) {
853                                                 Ok(()) => {
854                                                         match std::fs::read($filepath) {
855                                                                 Ok(bytes) => {
856                                                                         if bytes == expected_bytes {
857                                                                                 break
858                                                                         } else {
859                                                                                 continue
860                                                                         }
861                                                                 },
862                                                                 Err(_) => continue
863                                                         }
864                                                 },
865                                                 Err(e) => panic!("Unexpected error: {}", e)
866                                         }
867                                 }
868                         }
869                 }
870
871                 // Check that the initial channel manager data is persisted as expected.
872                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
873                 check_persisted_data!(nodes[0].node, filepath.clone());
874
                    // Busy-wait until the manager no longer reports a pending persistence request.
875                 loop {
876                         if !nodes[0].node.get_persistence_condvar_value() { break }
877                 }
878
879                 // Force-close the channel.
880                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
881
882                 // Check that the force-close updates are persisted.
883                 check_persisted_data!(nodes[0].node, filepath.clone());
884                 loop {
885                         if !nodes[0].node.get_persistence_condvar_value() { break }
886                 }
887
888                 // Check network graph is persisted
889                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
890                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
891
892                 // Check scorer is persisted
893                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
894                 check_persisted_data!(nodes[0].scorer, filepath.clone());
895
896                 assert!(bg_processor.stop().is_ok());
897         }
898
899         #[test]
900         fn test_timer_tick_called() {
901                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
902                 // `FRESHNESS_TIMER`.
903                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
904                 let data_dir = nodes[0].persister.get_data_dir();
905                 let persister = Arc::new(Persister::new(data_dir));
906                 let event_handler = |_: _| {};
907                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
908                 loop {
909                         let log_entries = nodes[0].logger.lines.lock().unwrap();
910                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
911                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
912                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
913                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
914                                 break
915                         }
916                 }
917
918                 assert!(bg_processor.stop().is_ok());
919         }
920
921         #[test]
922         fn test_channel_manager_persist_error() {
923                 // Test that if we encounter an error during manager persistence, the thread panics.
924                 let nodes = create_nodes(2, "test_persist_error".to_string());
925                 open_channel!(nodes[0], nodes[1], 100000);
926
927                 let data_dir = nodes[0].persister.get_data_dir();
928                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
929                 let event_handler = |_: _| {};
930                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
931                 match bg_processor.join() {
932                         Ok(_) => panic!("Expected error persisting manager"),
933                         Err(e) => {
934                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
935                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
936                         },
937                 }
938         }
939
940         #[test]
941         fn test_network_graph_persist_error() {
942                 // Test that if we encounter an error during network graph persistence, an error gets returned.
943                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
944                 let data_dir = nodes[0].persister.get_data_dir();
945                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
946                 let event_handler = |_: _| {};
947                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
948
949                 match bg_processor.stop() {
950                         Ok(_) => panic!("Expected error persisting network graph"),
951                         Err(e) => {
952                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
953                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
954                         },
955                 }
956         }
957
958         #[test]
959         fn test_scorer_persist_error() {
960                 // Test that if we encounter an error during scorer persistence, an error gets returned.
961                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
962                 let data_dir = nodes[0].persister.get_data_dir();
963                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
964                 let event_handler = |_: _| {};
965                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
966
967                 match bg_processor.stop() {
968                         Ok(_) => panic!("Expected error persisting scorer"),
969                         Err(e) => {
970                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
971                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
972                         },
973                 }
974         }
975
976         #[test]
977         fn test_background_event_handling() {
                    // Test that events are delivered to the handler given to `BackgroundProcessor::start`:
                    // first `FundingGenerationReady` while a channel is opened, then, with a second
                    // processor, `SpendableOutputs` after that channel is force-closed.
978                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
979                 let channel_value = 100000;
980                 let data_dir = nodes[0].persister.get_data_dir();
981                 let persister = Arc::new(Persister::new(data_dir.clone()));
982
983                 // Set up a background event handler for FundingGenerationReady events.
984                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
985                 let event_handler = move |event: Event| match event {
986                         Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
987                         Event::ChannelReady { .. } => {},
988                         _ => panic!("Unexpected event: {:?}", event),
989                 };
990
991                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
992
993                 // Open a channel and check that the FundingGenerationReady event was handled.
994                 begin_open_channel!(nodes[0], nodes[1], channel_value);
                    // The handler forwards the (temporary channel id, funding tx) pair over the channel.
995                 let (temporary_channel_id, funding_tx) = receiver
996                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
997                         .expect("FundingGenerationReady not handled within deadline");
998                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
999
1000                 // Confirm the funding transaction.
1001                 confirm_transaction(&mut nodes[0], &funding_tx);
1002                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1003                 confirm_transaction(&mut nodes[1], &funding_tx);
1004                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
                     // Exchange the channel_ready messages and collect the channel updates they trigger.
1005                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1006                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1007                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1008                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1009
                     // Stop the first processor before starting a second one with a different handler.
1010                 assert!(bg_processor.stop().is_ok());
1011
1012                 // Set up a background event handler for SpendableOutputs events.
1013                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1014                 let event_handler = move |event: Event| match event {
1015                         Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1016                         Event::ChannelReady { .. } => {},
1017                         Event::ChannelClosed { .. } => {},
1018                         _ => panic!("Unexpected event: {:?}", event),
1019                 };
1020                 let persister = Arc::new(Persister::new(data_dir));
1021                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1022
1023                 // Force close the channel and check that the SpendableOutputs event was handled.
1024                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
                     // Take the most recently broadcast transaction and confirm it `BREAKDOWN_TIMEOUT` blocks deep.
1025                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1026                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1027
1028                 let event = receiver
1029                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1030                         .expect("Events not handled within deadline");
1031                 match event {
1032                         Event::SpendableOutputs { .. } => {},
1033                         _ => panic!("Unexpected event: {:?}", event),
1034                 }
1035
1036                 assert!(bg_processor.stop().is_ok());
1037         }
1038
1039         #[test]
1040         fn test_scorer_persistence() {
1041                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1042                 let data_dir = nodes[0].persister.get_data_dir();
1043                 let persister = Arc::new(Persister::new(data_dir));
1044                 let event_handler = |_: _| {};
1045                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1046
1047                 loop {
1048                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1049                         let expected_log = "Persisting scorer".to_string();
1050                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1051                                 break
1052                         }
1053                 }
1054
1055                 assert!(bg_processor.stop().is_ok());
1056         }
1057
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
        // Checks that the background processor declines to prune the network graph while it is
        // driven by a rapid gossip sync that has not been applied yet, and that the graph is
        // pruned once `update_network_graph` has been called on that sync.
        let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
        let data_dir = nodes[0].persister.get_data_dir();
        // NOTE(review): the notifier presumably fires on `sender` when the graph is persisted;
        // the test relies on it below as its "graph was pruned" signal.
        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
        let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

        // Seed the graph with a single channel (short channel id 42) so there is something to
        // prune later.
        let network_graph = nodes[0].network_graph.clone();
        let features = ChannelFeatures::empty();
        network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
                .expect("Failed to update channel from partial announcement");
        let original_graph_description = network_graph.to_string();
        assert!(original_graph_description.contains("42: features: 0000, node_one:"));
        assert_eq!(network_graph.read_only().channels().len(), 1);

        let event_handler = |_: _| {};
        let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

        // Wait until the processor has logged both that it considered pruning and that it
        // declined to. The lookup keys are built once up front. Each poll releases the logger
        // lock before sleeping so the background thread is never starved of it, and the wait is
        // bounded so a regression fails the test instead of hanging it forever.
        let log_module = "lightning_background_processor".to_string();
        let expected_log_a = (log_module.clone(), "Assessing prunability of network graph".to_string());
        let expected_log_b = (log_module, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string());
        // Same bound the test already uses below when waiting for the prune itself.
        let log_deadline = std::time::Instant::now() + Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5);
        loop {
                {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        if log_entries.get(&expected_log_a).is_some() && log_entries.get(&expected_log_b).is_some() {
                                break
                        }
                }
                assert!(std::time::Instant::now() < log_deadline, "Background processor did not log a skipped prune within deadline");
                std::thread::sleep(Duration::from_millis(10));
        }

        // Serialized rapid gossip sync data fed to the node's `rapid_gossip_sync`.
        let initialization_input = vec![
                76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
                79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
                0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
                187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
                157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
                88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
                204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
                181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
                110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
                76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
                226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
                0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
                0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
        ];
        nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

        // this should have added two channels
        assert_eq!(network_graph.read_only().channels().len(), 3);

        // Block until the persister's notifier fires, or fail if it does not within the deadline.
        let _ = receiver
                .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
                .expect("Network graph not pruned within deadline");

        background_processor.stop().unwrap();

        // all channels should now be pruned
        assert_eq!(network_graph.read_only().channels().len(), 0);
}
1114
#[test]
fn test_invoice_payer() {
        // Smoke test: a background processor can be started with an `InvoicePayer` as its event
        // handler and then stopped without reporting an error.
        let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
        let random_seed_bytes = keys_manager.get_secure_random_bytes();
        let nodes = create_nodes(2, "test_invoice_payer".to_string());
        let watched = &nodes[0];

        // Build a fresh persister over the first node's data directory.
        let persister = Arc::new(Persister::new(watched.persister.get_data_dir()));

        // The invoice payer wraps a no-op inner event handler and retries payments up to twice.
        let router = DefaultRouter::new(
                Arc::clone(&watched.network_graph),
                Arc::clone(&watched.logger),
                random_seed_bytes,
                Arc::clone(&watched.scorer),
        );
        let invoice_payer = Arc::new(InvoicePayer::new(
                Arc::clone(&watched.node),
                router,
                Arc::clone(&watched.logger),
                |_: _| {},
                Retry::Attempts(2),
        ));

        // Start a background processor for the first node, handing it a shared handle to the
        // invoice payer as its event handler, then shut it down straight away.
        let processor = BackgroundProcessor::start(
                persister,
                Arc::clone(&invoice_payer),
                watched.chain_monitor.clone(),
                Arc::clone(&watched.node),
                watched.no_gossip_sync(),
                watched.peer_manager.clone(),
                Arc::clone(&watched.logger),
                Some(Arc::clone(&watched.scorer)),
        );
        assert!(processor.stop().is_ok());
}
1130 }