60a63d8533e56bdc8d31071a000b84aa13e2e843
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[macro_use] extern crate lightning;
20 extern crate lightning_rapid_gossip_sync;
21
22 use lightning::chain;
23 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
24 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
25 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
26 use lightning::ln::channelmanager::ChannelManager;
27 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
28 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
29 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
30 use lightning::routing::router::Router;
31 use lightning::routing::scoring::WriteableScore;
32 use lightning::util::events::{Event, EventHandler, EventsProvider};
33 use lightning::util::logger::Logger;
34 use lightning::util::persist::Persister;
35 use lightning_rapid_gossip_sync::RapidGossipSync;
36 use lightning::io;
37
38 use core::ops::Deref;
39 use core::time::Duration;
40
41 #[cfg(feature = "std")]
42 use std::sync::Arc;
43 #[cfg(feature = "std")]
44 use core::sync::atomic::{AtomicBool, Ordering};
45 #[cfg(feature = "std")]
46 use std::thread::{self, JoinHandle};
47 #[cfg(feature = "std")]
48 use std::time::Instant;
49
50 #[cfg(feature = "futures")]
51 use futures_util::{select_biased, future::FutureExt, task};
52
53 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
54 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
55 /// responsibilities are:
56 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
57 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
58 ///   writing it to disk/backups by invoking the callback given to it at startup.
59 ///   [`ChannelManager`] persistence should be done in the background.
60 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
61 ///   at the appropriate intervals.
62 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
63 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
64 ///
65 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
66 /// upon as doing so may result in high latency.
67 ///
68 /// # Note
69 ///
70 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
71 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
72 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
73 /// unilateral chain closure fees are at risk.
74 ///
75 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
76 /// [`Event`]: lightning::util::events::Event
77 #[cfg(feature = "std")]
78 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
79 pub struct BackgroundProcessor {
80         stop_thread: Arc<AtomicBool>,
81         thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
82 }
83
84 #[cfg(not(test))]
85 const FRESHNESS_TIMER: u64 = 60;
86 #[cfg(test)]
87 const FRESHNESS_TIMER: u64 = 1;
88
89 #[cfg(all(not(test), not(debug_assertions)))]
90 const PING_TIMER: u64 = 10;
91 /// Signature operations take a lot longer without compiler optimisations.
92 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
93 /// timeout is reached.
94 #[cfg(all(not(test), debug_assertions))]
95 const PING_TIMER: u64 = 30;
96 #[cfg(test)]
97 const PING_TIMER: u64 = 1;
98
99 /// Prune the network graph of stale entries hourly.
100 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
101
102 #[cfg(not(test))]
103 const SCORER_PERSIST_TIMER: u64 = 30;
104 #[cfg(test)]
105 const SCORER_PERSIST_TIMER: u64 = 1;
106
107 #[cfg(not(test))]
108 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
109 #[cfg(test)]
110 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
111
112 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
113 pub enum GossipSync<
114         P: Deref<Target = P2PGossipSync<G, A, L>>,
115         R: Deref<Target = RapidGossipSync<G, L>>,
116         G: Deref<Target = NetworkGraph<L>>,
117         A: Deref,
118         L: Deref,
119 >
120 where A::Target: chain::Access, L::Target: Logger {
121         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
122         P2P(P),
123         /// Rapid gossip sync from a trusted server.
124         Rapid(R),
125         /// No gossip sync.
126         None,
127 }
128
129 impl<
130         P: Deref<Target = P2PGossipSync<G, A, L>>,
131         R: Deref<Target = RapidGossipSync<G, L>>,
132         G: Deref<Target = NetworkGraph<L>>,
133         A: Deref,
134         L: Deref,
135 > GossipSync<P, R, G, A, L>
136 where A::Target: chain::Access, L::Target: Logger {
137         fn network_graph(&self) -> Option<&G> {
138                 match self {
139                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
140                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
141                         GossipSync::None => None,
142                 }
143         }
144
145         fn prunable_network_graph(&self) -> Option<&G> {
146                 match self {
147                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
148                         GossipSync::Rapid(gossip_sync) => {
149                                 if gossip_sync.is_initial_sync_complete() {
150                                         Some(gossip_sync.network_graph())
151                                 } else {
152                                         None
153                                 }
154                         },
155                         GossipSync::None => None,
156                 }
157         }
158 }
159
160 /// (C-not exported) as the bindings concretize everything and have constructors for us
161 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
162         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
163 where
164         A::Target: chain::Access,
165         L::Target: Logger,
166 {
167         /// Initializes a new [`GossipSync::P2P`] variant.
168         pub fn p2p(gossip_sync: P) -> Self {
169                 GossipSync::P2P(gossip_sync)
170         }
171 }
172
173 /// (C-not exported) as the bindings concretize everything and have constructors for us
174 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
175         GossipSync<
176                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
177                 R,
178                 G,
179                 &'a (dyn chain::Access + Send + Sync),
180                 L,
181         >
182 where
183         L::Target: Logger,
184 {
185         /// Initializes a new [`GossipSync::Rapid`] variant.
186         pub fn rapid(gossip_sync: R) -> Self {
187                 GossipSync::Rapid(gossip_sync)
188         }
189 }
190
191 /// (C-not exported) as the bindings concretize everything and have constructors for us
192 impl<'a, L: Deref>
193         GossipSync<
194                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
195                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
196                 &'a NetworkGraph<L>,
197                 &'a (dyn chain::Access + Send + Sync),
198                 L,
199         >
200 where
201         L::Target: Logger,
202 {
203         /// Initializes a new [`GossipSync::None`] variant.
204         pub fn none() -> Self {
205                 GossipSync::None
206         }
207 }
208
209 fn handle_network_graph_update<L: Deref>(
210         network_graph: &NetworkGraph<L>, event: &Event
211 ) where L::Target: Logger {
212         if let Event::PaymentPathFailed { ref network_update, .. } = event {
213                 if let Some(network_update) = network_update {
214                         network_graph.handle_network_update(&network_update);
215                 }
216         }
217 }
218
219 macro_rules! define_run_body {
220         ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
221          $channel_manager: ident, $process_channel_manager_events: expr,
222          $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
223          $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
224         => { {
225                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
226                 $channel_manager.timer_tick_occurred();
227
228                 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
229                 let mut last_ping_call = $get_timer(PING_TIMER);
230                 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
231                 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
232                 let mut have_pruned = false;
233
234                 loop {
235                         $process_channel_manager_events;
236                         $process_chain_monitor_events;
237
238                         // Note that the PeerManager::process_events may block on ChannelManager's locks,
239                         // hence it comes last here. When the ChannelManager finishes whatever it's doing,
240                         // we want to ensure we get into `persist_manager` as quickly as we can, especially
241                         // without running the normal event processing above and handing events to users.
242                         //
243                         // Specifically, on an *extremely* slow machine, we may see ChannelManager start
244                         // processing a message effectively at any point during this loop. In order to
245                         // minimize the time between such processing completing and persisting the updated
246                         // ChannelManager, we want to minimize methods blocking on a ChannelManager
247                         // generally, and as a fallback place such blocking only immediately before
248                         // persistence.
249                         $peer_manager.process_events();
250
251                         // We wait up to 100ms, but track how long it takes to detect being put to sleep,
252                         // see `await_start`'s use below.
253                         let mut await_start = $get_timer(1);
254                         let updates_available = $await;
255                         let await_slow = $timer_elapsed(&mut await_start, 1);
256
257                         if updates_available {
258                                 log_trace!($logger, "Persisting ChannelManager...");
259                                 $persister.persist_manager(&*$channel_manager)?;
260                                 log_trace!($logger, "Done persisting ChannelManager.");
261                         }
262                         // Exit the loop if the background processor was requested to stop.
263                         if $loop_exit_check {
264                                 log_trace!($logger, "Terminating background processor.");
265                                 break;
266                         }
267                         if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
268                                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
269                                 $channel_manager.timer_tick_occurred();
270                                 last_freshness_call = $get_timer(FRESHNESS_TIMER);
271                         }
272                         if await_slow {
273                                 // On various platforms, we may be starved of CPU cycles for several reasons.
274                                 // E.g. on iOS, if we've been in the background, we will be entirely paused.
275                                 // Similarly, if we're on a desktop platform and the device has been asleep, we
276                                 // may not get any cycles.
277                                 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
278                                 // full second, at which point we assume sockets may have been killed (they
279                                 // appear to be at least on some platforms, even if it has only been a second).
280                                 // Note that we have to take care to not get here just because user event
281                                 // processing was slow at the top of the loop. For example, the sample client
282                                 // may call Bitcoin Core RPCs during event handling, which very often takes
283                                 // more than a handful of seconds to complete, and shouldn't disconnect all our
284                                 // peers.
285                                 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
286                                 $peer_manager.disconnect_all_peers();
287                                 last_ping_call = $get_timer(PING_TIMER);
288                         } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
289                                 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
290                                 $peer_manager.timer_tick_occurred();
291                                 last_ping_call = $get_timer(PING_TIMER);
292                         }
293
294                         // Note that we want to run a graph prune once not long after startup before
295                         // falling back to our usual hourly prunes. This avoids short-lived clients never
296                         // pruning their network graph. We run once 60 seconds after startup before
297                         // continuing our normal cadence.
298                         if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
299                                 // The network graph must not be pruned while rapid sync completion is pending
300                                 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
301                                         #[cfg(feature = "std")] {
302                                                 log_trace!($logger, "Pruning and persisting network graph.");
303                                                 network_graph.remove_stale_channels_and_tracking();
304                                         }
305                                         #[cfg(not(feature = "std"))] {
306                                                 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
307                                                 log_trace!($logger, "Persisting network graph.");
308                                         }
309
310                                         if let Err(e) = $persister.persist_graph(network_graph) {
311                                                 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
312                                         }
313
314                                         last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
315                                         have_pruned = true;
316                                 }
317                         }
318
319                         if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
320                                 if let Some(ref scorer) = $scorer {
321                                         log_trace!($logger, "Persisting scorer");
322                                         if let Err(e) = $persister.persist_scorer(&scorer) {
323                                                 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
324                                         }
325                                 }
326                                 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
327                         }
328                 }
329
330                 // After we exit, ensure we persist the ChannelManager one final time - this avoids
331                 // some races where users quit while channel updates were in-flight, with
332                 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
333                 $persister.persist_manager(&*$channel_manager)?;
334
335                 // Persist Scorer on exit
336                 if let Some(ref scorer) = $scorer {
337                         $persister.persist_scorer(&scorer)?;
338                 }
339
340                 // Persist NetworkGraph on exit
341                 if let Some(network_graph) = $gossip_sync.network_graph() {
342                         $persister.persist_graph(network_graph)?;
343                 }
344
345                 Ok(())
346         } }
347 }
348
349 /// Processes background events in a future.
350 ///
351 /// `sleeper` should return a future which completes in the given amount of time and returns a
352 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
353 /// future which outputs true, the loop will exit and this function's future will complete.
354 ///
355 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
356 ///
357 /// Requires the `futures` feature. Note that while this method is available without the `std`
358 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
359 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
360 /// manually instead.
361 #[cfg(feature = "futures")]
362 pub async fn process_events_async<
363         'a,
364         CA: 'static + Deref + Send + Sync,
365         CF: 'static + Deref + Send + Sync,
366         CW: 'static + Deref + Send + Sync,
367         T: 'static + Deref + Send + Sync,
368         ES: 'static + Deref + Send + Sync,
369         NS: 'static + Deref + Send + Sync,
370         SP: 'static + Deref + Send + Sync,
371         F: 'static + Deref + Send + Sync,
372         R: 'static + Deref + Send + Sync,
373         G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
374         L: 'static + Deref + Send + Sync,
375         P: 'static + Deref + Send + Sync,
376         Descriptor: 'static + SocketDescriptor + Send + Sync,
377         CMH: 'static + Deref + Send + Sync,
378         RMH: 'static + Deref + Send + Sync,
379         OMH: 'static + Deref + Send + Sync,
380         EventHandlerFuture: core::future::Future<Output = ()>,
381         EventHandler: Fn(Event) -> EventHandlerFuture,
382         PS: 'static + Deref + Send,
383         M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
384         CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
385         PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
386         RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
387         UMH: 'static + Deref + Send + Sync,
388         PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
389         S: 'static + Deref<Target = SC> + Send + Sync,
390         SC: WriteableScore<'a>,
391         SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
392         Sleeper: Fn(Duration) -> SleepFuture
393 >(
394         persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
395         gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
396         sleeper: Sleeper,
397 ) -> Result<(), io::Error>
398 where
399         CA::Target: 'static + chain::Access,
400         CF::Target: 'static + chain::Filter,
401         CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
402         T::Target: 'static + BroadcasterInterface,
403         ES::Target: 'static + EntropySource,
404         NS::Target: 'static + NodeSigner,
405         SP::Target: 'static + SignerProvider,
406         F::Target: 'static + FeeEstimator,
407         R::Target: 'static + Router,
408         L::Target: 'static + Logger,
409         P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
410         CMH::Target: 'static + ChannelMessageHandler,
411         OMH::Target: 'static + OnionMessageHandler,
412         RMH::Target: 'static + RoutingMessageHandler,
413         UMH::Target: 'static + CustomMessageHandler,
414         PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
415 {
416         let mut should_break = true;
417         let async_event_handler = |event| {
418                 let network_graph = gossip_sync.network_graph();
419                 let event_handler = &event_handler;
420                 async move {
421                         if let Some(network_graph) = network_graph {
422                                 handle_network_graph_update(network_graph, &event)
423                         }
424                         event_handler(event).await;
425                 }
426         };
427         define_run_body!(persister,
428                 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
429                 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
430                 gossip_sync, peer_manager, logger, scorer, should_break, {
431                         select_biased! {
432                                 _ = channel_manager.get_persistable_update_future().fuse() => true,
433                                 exit = sleeper(Duration::from_millis(100)).fuse() => {
434                                         should_break = exit;
435                                         false
436                                 }
437                         }
438                 }, |t| sleeper(Duration::from_secs(t)),
439                 |fut: &mut SleepFuture, _| {
440                         let mut waker = task::noop_waker();
441                         let mut ctx = task::Context::from_waker(&mut waker);
442                         core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
443                 })
444 }
445
446 #[cfg(feature = "std")]
447 impl BackgroundProcessor {
448         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
449         /// documentation].
450         ///
451         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
452         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
453         /// either [`join`] or [`stop`].
454         ///
455         /// # Data Persistence
456         ///
457         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
458         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
459         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
460         /// provided implementation.
461         ///
462         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
463         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
464         /// See the `lightning-persister` crate for LDK's provided implementation.
465         ///
466         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
467         /// error or call [`join`] and handle any error that may arise. For the latter case,
468         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
469         ///
470         /// # Event Handling
471         ///
472         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
473         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
474         /// functionality implemented by other handlers.
475         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
476         ///
477         /// # Rapid Gossip Sync
478         ///
479         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
480         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
481         /// until the [`RapidGossipSync`] instance completes its first sync.
482         ///
483         /// [top-level documentation]: BackgroundProcessor
484         /// [`join`]: Self::join
485         /// [`stop`]: Self::stop
486         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
487         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
488         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
489         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
490         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
491         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
492         pub fn start<
493                 'a,
494                 CA: 'static + Deref + Send + Sync,
495                 CF: 'static + Deref + Send + Sync,
496                 CW: 'static + Deref + Send + Sync,
497                 T: 'static + Deref + Send + Sync,
498                 ES: 'static + Deref + Send + Sync,
499                 NS: 'static + Deref + Send + Sync,
500                 SP: 'static + Deref + Send + Sync,
501                 F: 'static + Deref + Send + Sync,
502                 R: 'static + Deref + Send + Sync,
503                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
504                 L: 'static + Deref + Send + Sync,
505                 P: 'static + Deref + Send + Sync,
506                 Descriptor: 'static + SocketDescriptor + Send + Sync,
507                 CMH: 'static + Deref + Send + Sync,
508                 OMH: 'static + Deref + Send + Sync,
509                 RMH: 'static + Deref + Send + Sync,
510                 EH: 'static + EventHandler + Send,
511                 PS: 'static + Deref + Send,
512                 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
513                 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
514                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
515                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
516                 UMH: 'static + Deref + Send + Sync,
517                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
518                 S: 'static + Deref<Target = SC> + Send + Sync,
519                 SC: WriteableScore<'a>,
520         >(
521                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
522                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
523         ) -> Self
524         where
525                 CA::Target: 'static + chain::Access,
526                 CF::Target: 'static + chain::Filter,
527                 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
528                 T::Target: 'static + BroadcasterInterface,
529                 ES::Target: 'static + EntropySource,
530                 NS::Target: 'static + NodeSigner,
531                 SP::Target: 'static + SignerProvider,
532                 F::Target: 'static + FeeEstimator,
533                 R::Target: 'static + Router,
534                 L::Target: 'static + Logger,
535                 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
536                 CMH::Target: 'static + ChannelMessageHandler,
537                 OMH::Target: 'static + OnionMessageHandler,
538                 RMH::Target: 'static + RoutingMessageHandler,
539                 UMH::Target: 'static + CustomMessageHandler,
540                 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
541         {
542                 let stop_thread = Arc::new(AtomicBool::new(false));
543                 let stop_thread_clone = stop_thread.clone();
544                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
545                         let event_handler = |event| {
546                                 let network_graph = gossip_sync.network_graph();
547                                 if let Some(network_graph) = network_graph {
548                                         handle_network_graph_update(network_graph, &event)
549                                 }
550                                 event_handler.handle_event(event);
551                         };
552                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
553                                 channel_manager, channel_manager.process_pending_events(&event_handler),
554                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
555                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
556                                 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
557                 });
558                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
559         }
560
561         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
562         /// [`ChannelManager`].
563         ///
564         /// # Panics
565         ///
566         /// This function panics if the background thread has panicked such as while persisting or
567         /// handling events.
568         ///
569         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
570         pub fn join(mut self) -> Result<(), std::io::Error> {
571                 assert!(self.thread_handle.is_some());
572                 self.join_thread()
573         }
574
575         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
576         /// [`ChannelManager`].
577         ///
578         /// # Panics
579         ///
580         /// This function panics if the background thread has panicked such as while persisting or
581         /// handling events.
582         ///
583         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
584         pub fn stop(mut self) -> Result<(), std::io::Error> {
585                 assert!(self.thread_handle.is_some());
586                 self.stop_and_join_thread()
587         }
588
589         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
590                 self.stop_thread.store(true, Ordering::Release);
591                 self.join_thread()
592         }
593
594         fn join_thread(&mut self) -> Result<(), std::io::Error> {
595                 match self.thread_handle.take() {
596                         Some(handle) => handle.join().unwrap(),
597                         None => Ok(()),
598                 }
599         }
600 }
601
602 #[cfg(feature = "std")]
603 impl Drop for BackgroundProcessor {
604         fn drop(&mut self) {
605                 self.stop_and_join_thread().unwrap();
606         }
607 }
608
609 #[cfg(all(feature = "std", test))]
610 mod tests {
611         use bitcoin::blockdata::block::BlockHeader;
612         use bitcoin::blockdata::constants::genesis_block;
613         use bitcoin::blockdata::locktime::PackedLockTime;
614         use bitcoin::blockdata::transaction::{Transaction, TxOut};
615         use bitcoin::network::constants::Network;
616         use lightning::chain::{BestBlock, Confirm, chainmonitor};
617         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
618         use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner};
619         use lightning::chain::transaction::OutPoint;
620         use lightning::get_event_msg;
621         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
622         use lightning::ln::features::ChannelFeatures;
623         use lightning::ln::msgs::{ChannelMessageHandler, Init};
624         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
625         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
626         use lightning::routing::router::DefaultRouter;
627         use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
628         use lightning::util::config::UserConfig;
629         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
630         use lightning::util::ser::Writeable;
631         use lightning::util::test_utils;
632         use lightning::util::persist::KVStorePersister;
633         use lightning_invoice::payment::{InvoicePayer, Retry};
634         use lightning_persister::FilesystemPersister;
635         use std::fs;
636         use std::path::PathBuf;
637         use std::sync::{Arc, Mutex};
638         use std::sync::mpsc::SyncSender;
639         use std::time::Duration;
640         use bitcoin::hashes::Hash;
641         use bitcoin::TxMerkleNode;
642         use lightning_rapid_gossip_sync::RapidGossipSync;
643         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
644
645         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
646
647         #[derive(Clone, Hash, PartialEq, Eq)]
648         struct TestDescriptor{}
649         impl SocketDescriptor for TestDescriptor {
650                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
651                         0
652                 }
653
654                 fn disconnect_socket(&mut self) {}
655         }
656
657         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
658
659         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
660         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
661
662         struct Node {
663                 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
664                 p2p_gossip_sync: PGS,
665                 rapid_gossip_sync: RGS,
666                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
667                 chain_monitor: Arc<ChainMonitor>,
668                 persister: Arc<FilesystemPersister>,
669                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
670                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
671                 logger: Arc<test_utils::TestLogger>,
672                 best_block: BestBlock,
673                 scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
674         }
675
676         impl Node {
677                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
678                         GossipSync::P2P(self.p2p_gossip_sync.clone())
679                 }
680
681                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
682                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
683                 }
684
685                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
686                         GossipSync::None
687                 }
688         }
689
690         impl Drop for Node {
691                 fn drop(&mut self) {
692                         let data_dir = self.persister.get_data_dir();
693                         match fs::remove_dir_all(data_dir.clone()) {
694                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
695                                 _ => {}
696                         }
697                 }
698         }
699
700         struct Persister {
701                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
702                 graph_persistence_notifier: Option<SyncSender<()>>,
703                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
704                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
705                 filesystem_persister: FilesystemPersister,
706         }
707
708         impl Persister {
709                 fn new(data_dir: String) -> Self {
710                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
711                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
712                 }
713
714                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
715                         Self { graph_error: Some((error, message)), ..self }
716                 }
717
718                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
719                         Self { graph_persistence_notifier: Some(sender), ..self }
720                 }
721
722                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
723                         Self { manager_error: Some((error, message)), ..self }
724                 }
725
726                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
727                         Self { scorer_error: Some((error, message)), ..self }
728                 }
729         }
730
731         impl KVStorePersister for Persister {
732                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
733                         if key == "manager" {
734                                 if let Some((error, message)) = self.manager_error {
735                                         return Err(std::io::Error::new(error, message))
736                                 }
737                         }
738
739                         if key == "network_graph" {
740                                 if let Some(sender) = &self.graph_persistence_notifier {
741                                         sender.send(()).unwrap();
742                                 };
743
744                                 if let Some((error, message)) = self.graph_error {
745                                         return Err(std::io::Error::new(error, message))
746                                 }
747                         }
748
749                         if key == "scorer" {
750                                 if let Some((error, message)) = self.scorer_error {
751                                         return Err(std::io::Error::new(error, message))
752                                 }
753                         }
754
755                         self.filesystem_persister.persist(key, object)
756                 }
757         }
758
759         fn get_full_filepath(filepath: String, filename: String) -> String {
760                 let mut path = PathBuf::from(filepath);
761                 path.push(filename);
762                 path.to_str().unwrap().to_string()
763         }
764
765         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
766                 let mut nodes = Vec::new();
767                 for i in 0..num_nodes {
768                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
769                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
770                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
771                         let network = Network::Testnet;
772                         let genesis_block = genesis_block(network);
773                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
774                         let params = ProbabilisticScoringParameters::default();
775                         let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
776                         let seed = [i as u8; 32];
777                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
778                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
779                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
780                         let now = Duration::from_secs(genesis_block.header.time as u64);
781                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
782                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
783                         let best_block = BestBlock::from_genesis(network);
784                         let params = ChainParameters { network, best_block };
785                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
786                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
787                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
788                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
789                         let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
790                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
791                         nodes.push(node);
792                 }
793
794                 for i in 0..num_nodes {
795                         for j in (i+1)..num_nodes {
796                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
797                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
798                         }
799                 }
800
801                 nodes
802         }
803
804         macro_rules! open_channel {
805                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
806                         begin_open_channel!($node_a, $node_b, $channel_value);
807                         let events = $node_a.node.get_and_clear_pending_events();
808                         assert_eq!(events.len(), 1);
809                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
810                         end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
811                         tx
812                 }}
813         }
814
815         macro_rules! begin_open_channel {
816                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
817                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
818                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
819                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
820                 }}
821         }
822
823         macro_rules! handle_funding_generation_ready {
824                 ($event: expr, $channel_value: expr) => {{
825                         match $event {
826                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
827                                         assert_eq!(channel_value_satoshis, $channel_value);
828                                         assert_eq!(user_channel_id, 42);
829
830                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
831                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
832                                         }]};
833                                         (temporary_channel_id, tx)
834                                 },
835                                 _ => panic!("Unexpected event"),
836                         }
837                 }}
838         }
839
840         macro_rules! end_open_channel {
841                 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
842                         $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
843                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
844                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
845                 }}
846         }
847
848         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
849                 for i in 1..=depth {
850                         let prev_blockhash = node.best_block.block_hash();
851                         let height = node.best_block.height() + 1;
852                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
853                         let txdata = vec![(0, tx)];
854                         node.best_block = BestBlock::new(header.block_hash(), height);
855                         match i {
856                                 1 => {
857                                         node.node.transactions_confirmed(&header, &txdata, height);
858                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
859                                 },
860                                 x if x == depth => {
861                                         node.node.best_block_updated(&header, height);
862                                         node.chain_monitor.best_block_updated(&header, height);
863                                 },
864                                 _ => {},
865                         }
866                 }
867         }
868         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
869                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
870         }
871
872         #[test]
873         fn test_background_processor() {
874                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
875                 // updates. Also test that when new updates are available, the manager signals that it needs
876                 // re-persistence and is successfully re-persisted.
877                 let nodes = create_nodes(2, "test_background_processor".to_string());
878
879                 // Go through the channel creation process so that each node has something to persist. Since
880                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
881                 // avoid a race with processing events.
882                 let tx = open_channel!(nodes[0], nodes[1], 100000);
883
884                 // Initiate the background processors to watch each node.
885                 let data_dir = nodes[0].persister.get_data_dir();
886                 let persister = Arc::new(Persister::new(data_dir));
887                 let event_handler = |_: _| {};
888                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
889
890                 macro_rules! check_persisted_data {
891                         ($node: expr, $filepath: expr) => {
892                                 let mut expected_bytes = Vec::new();
893                                 loop {
894                                         expected_bytes.clear();
895                                         match $node.write(&mut expected_bytes) {
896                                                 Ok(()) => {
897                                                         match std::fs::read($filepath) {
898                                                                 Ok(bytes) => {
899                                                                         if bytes == expected_bytes {
900                                                                                 break
901                                                                         } else {
902                                                                                 continue
903                                                                         }
904                                                                 },
905                                                                 Err(_) => continue
906                                                         }
907                                                 },
908                                                 Err(e) => panic!("Unexpected error: {}", e)
909                                         }
910                                 }
911                         }
912                 }
913
914                 // Check that the initial channel manager data is persisted as expected.
915                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
916                 check_persisted_data!(nodes[0].node, filepath.clone());
917
918                 loop {
919                         if !nodes[0].node.get_persistence_condvar_value() { break }
920                 }
921
922                 // Force-close the channel.
923                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
924
925                 // Check that the force-close updates are persisted.
926                 check_persisted_data!(nodes[0].node, filepath.clone());
927                 loop {
928                         if !nodes[0].node.get_persistence_condvar_value() { break }
929                 }
930
931                 // Check network graph is persisted
932                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
933                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
934
935                 // Check scorer is persisted
936                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
937                 check_persisted_data!(nodes[0].scorer, filepath.clone());
938
939                 assert!(bg_processor.stop().is_ok());
940         }
941
942         #[test]
943         fn test_timer_tick_called() {
944                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
945                 // `FRESHNESS_TIMER`.
946                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
947                 let data_dir = nodes[0].persister.get_data_dir();
948                 let persister = Arc::new(Persister::new(data_dir));
949                 let event_handler = |_: _| {};
950                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
951                 loop {
952                         let log_entries = nodes[0].logger.lines.lock().unwrap();
953                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
954                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
955                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
956                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
957                                 break
958                         }
959                 }
960
961                 assert!(bg_processor.stop().is_ok());
962         }
963
964         #[test]
965         fn test_channel_manager_persist_error() {
966                 // Test that if we encounter an error during manager persistence, the thread panics.
967                 let nodes = create_nodes(2, "test_persist_error".to_string());
968                 open_channel!(nodes[0], nodes[1], 100000);
969
970                 let data_dir = nodes[0].persister.get_data_dir();
971                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
972                 let event_handler = |_: _| {};
973                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
974                 match bg_processor.join() {
975                         Ok(_) => panic!("Expected error persisting manager"),
976                         Err(e) => {
977                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
978                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
979                         },
980                 }
981         }
982
983         #[test]
984         fn test_network_graph_persist_error() {
985                 // Test that if we encounter an error during network graph persistence, an error gets returned.
986                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
987                 let data_dir = nodes[0].persister.get_data_dir();
988                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
989                 let event_handler = |_: _| {};
990                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
991
992                 match bg_processor.stop() {
993                         Ok(_) => panic!("Expected error persisting network graph"),
994                         Err(e) => {
995                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
996                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
997                         },
998                 }
999         }
1000
1001         #[test]
1002         fn test_scorer_persist_error() {
1003                 // Test that if we encounter an error during scorer persistence, an error gets returned.
1004                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1005                 let data_dir = nodes[0].persister.get_data_dir();
1006                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1007                 let event_handler = |_: _| {};
1008                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1009
1010                 match bg_processor.stop() {
1011                         Ok(_) => panic!("Expected error persisting scorer"),
1012                         Err(e) => {
1013                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1014                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1015                         },
1016                 }
1017         }
1018
1019         #[test]
1020         fn test_background_event_handling() {
1021                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1022                 let channel_value = 100000;
1023                 let data_dir = nodes[0].persister.get_data_dir();
1024                 let persister = Arc::new(Persister::new(data_dir.clone()));
1025
1026                 // Set up a background event handler for FundingGenerationReady events.
1027                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1028                 let event_handler = move |event: Event| match event {
1029                         Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1030                         Event::ChannelReady { .. } => {},
1031                         _ => panic!("Unexpected event: {:?}", event),
1032                 };
1033
1034                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1035
1036                 // Open a channel and check that the FundingGenerationReady event was handled.
1037                 begin_open_channel!(nodes[0], nodes[1], channel_value);
1038                 let (temporary_channel_id, funding_tx) = receiver
1039                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1040                         .expect("FundingGenerationReady not handled within deadline");
1041                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1042
1043                 // Confirm the funding transaction.
1044                 confirm_transaction(&mut nodes[0], &funding_tx);
1045                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1046                 confirm_transaction(&mut nodes[1], &funding_tx);
1047                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1048                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1049                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1050                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1051                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1052
1053                 assert!(bg_processor.stop().is_ok());
1054
1055                 // Set up a background event handler for SpendableOutputs events.
1056                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1057                 let event_handler = move |event: Event| match event {
1058                         Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1059                         Event::ChannelReady { .. } => {},
1060                         Event::ChannelClosed { .. } => {},
1061                         _ => panic!("Unexpected event: {:?}", event),
1062                 };
1063                 let persister = Arc::new(Persister::new(data_dir));
1064                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1065
1066                 // Force close the channel and check that the SpendableOutputs event was handled.
1067                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1068                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1069                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1070
1071                 let event = receiver
1072                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1073                         .expect("Events not handled within deadline");
1074                 match event {
1075                         Event::SpendableOutputs { .. } => {},
1076                         _ => panic!("Unexpected event: {:?}", event),
1077                 }
1078
1079                 assert!(bg_processor.stop().is_ok());
1080         }
1081
1082         #[test]
1083         fn test_scorer_persistence() {
1084                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1085                 let data_dir = nodes[0].persister.get_data_dir();
1086                 let persister = Arc::new(Persister::new(data_dir));
1087                 let event_handler = |_: _| {};
1088                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1089
1090                 loop {
1091                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1092                         let expected_log = "Persisting scorer".to_string();
1093                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1094                                 break
1095                         }
1096                 }
1097
1098                 assert!(bg_processor.stop().is_ok());
1099         }
1100
1101         #[test]
1102         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1103                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1104                 let data_dir = nodes[0].persister.get_data_dir();
1105                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1106                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1107                 let network_graph = nodes[0].network_graph.clone();
1108                 let features = ChannelFeatures::empty();
1109                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1110                         .expect("Failed to update channel from partial announcement");
1111                 let original_graph_description = network_graph.to_string();
1112                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1113                 assert_eq!(network_graph.read_only().channels().len(), 1);
1114
1115                 let event_handler = |_: _| {};
1116                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1117
1118                 loop {
1119                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1120                         let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1121                         if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1122                                 .unwrap_or(&0) > 1
1123                         {
1124                                 // Wait until the loop has gone around at least twice.
1125                                 break
1126                         }
1127                 }
1128
1129                 let initialization_input = vec![
1130                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1131                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1132                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1133                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1134                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1135                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1136                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1137                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1138                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1139                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1140                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1141                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1142                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1143                 ];
1144                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1145
1146                 // this should have added two channels
1147                 assert_eq!(network_graph.read_only().channels().len(), 3);
1148
1149                 let _ = receiver
1150                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1151                         .expect("Network graph not pruned within deadline");
1152
1153                 background_processor.stop().unwrap();
1154
1155                 // all channels should now be pruned
1156                 assert_eq!(network_graph.read_only().channels().len(), 0);
1157         }
1158
1159         #[test]
1160         fn test_invoice_payer() {
1161                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1162                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1163                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1164
1165                 // Initiate the background processors to watch each node.
1166                 let data_dir = nodes[0].persister.get_data_dir();
1167                 let persister = Arc::new(Persister::new(data_dir));
1168                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1169                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1170                 let event_handler = Arc::clone(&invoice_payer);
1171                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1172                 assert!(bg_processor.stop().is_ok());
1173         }
1174 }