Use the user-provided `SleepFuture` for interval checks in BP
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::router::Router;
26 use lightning::routing::scoring::WriteableScore;
27 use lightning::util::events::{Event, EventHandler, EventsProvider};
28 use lightning::util::logger::Logger;
29 use lightning::util::persist::Persister;
30 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::Arc;
32 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread;
34 use std::thread::JoinHandle;
35 use std::time::{Duration, Instant};
36 use std::ops::Deref;
37
38 #[cfg(feature = "futures")]
39 use futures_util::{select_biased, future::FutureExt, task};
40
41 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
42 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
43 /// responsibilities are:
44 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
45 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
46 ///   writing it to disk/backups by invoking the callback given to it at startup.
47 ///   [`ChannelManager`] persistence should be done in the background.
48 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
49 ///   at the appropriate intervals.
50 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
51 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
52 ///
53 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
54 /// upon as doing so may result in high latency.
55 ///
56 /// # Note
57 ///
58 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
59 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
60 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
61 /// unilateral chain closure fees are at risk.
62 ///
63 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
64 /// [`Event`]: lightning::util::events::Event
65 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag read by the background thread each loop iteration; set to
	// `true` (Release) to request shutdown.
	stop_thread: Arc<AtomicBool>,
	// Handle of the spawned processing thread. Taken (left as `None`) once
	// the thread has been joined so repeated join/stop calls are no-ops.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
70
// All timer intervals below are in seconds. Each has a much shorter
// `cfg(test)` variant so the tests don't have to wait out real-world
// intervals.

// How often ChannelManager::timer_tick_occurred is called.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// How often the scorer is written out via the persister.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay before the first graph prune after startup, so short-lived clients
// still prune once; subsequent prunes use NETWORK_PRUNE_TIMER.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// The type parameters mirror the wrapped sync objects: `P` and `R` deref to
/// the respective sync implementation, `G` to the shared [`NetworkGraph`],
/// `A` to a [`chain::Access`] implementation, and `L` to a [`Logger`].
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
115
116 impl<
117         P: Deref<Target = P2PGossipSync<G, A, L>>,
118         R: Deref<Target = RapidGossipSync<G, L>>,
119         G: Deref<Target = NetworkGraph<L>>,
120         A: Deref,
121         L: Deref,
122 > GossipSync<P, R, G, A, L>
123 where A::Target: chain::Access, L::Target: Logger {
124         fn network_graph(&self) -> Option<&G> {
125                 match self {
126                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::None => None,
129                 }
130         }
131
132         fn prunable_network_graph(&self) -> Option<&G> {
133                 match self {
134                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
135                         GossipSync::Rapid(gossip_sync) => {
136                                 if gossip_sync.is_initial_sync_complete() {
137                                         Some(gossip_sync.network_graph())
138                                 } else {
139                                         None
140                                 }
141                         },
142                         GossipSync::None => None,
143                 }
144         }
145 }
146
147 /// (C-not exported) as the bindings concretize everything and have constructors for us
148 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
149         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 where
151         A::Target: chain::Access,
152         L::Target: Logger,
153 {
154         /// Initializes a new [`GossipSync::P2P`] variant.
155         pub fn p2p(gossip_sync: P) -> Self {
156                 GossipSync::P2P(gossip_sync)
157         }
158 }
159
160 /// (C-not exported) as the bindings concretize everything and have constructors for us
161 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162         GossipSync<
163                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
164                 R,
165                 G,
166                 &'a (dyn chain::Access + Send + Sync),
167                 L,
168         >
169 where
170         L::Target: Logger,
171 {
172         /// Initializes a new [`GossipSync::Rapid`] variant.
173         pub fn rapid(gossip_sync: R) -> Self {
174                 GossipSync::Rapid(gossip_sync)
175         }
176 }
177
178 /// (C-not exported) as the bindings concretize everything and have constructors for us
179 impl<'a, L: Deref>
180         GossipSync<
181                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
182                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183                 &'a NetworkGraph<L>,
184                 &'a (dyn chain::Access + Send + Sync),
185                 L,
186         >
187 where
188         L::Target: Logger,
189 {
190         /// Initializes a new [`GossipSync::None`] variant.
191         pub fn none() -> Self {
192                 GossipSync::None
193         }
194 }
195
196 fn handle_network_graph_update<L: Deref>(
197         network_graph: &NetworkGraph<L>, event: &Event
198 ) where L::Target: Logger {
199         if let Event::PaymentPathFailed { ref network_update, .. } = event {
200                 if let Some(network_update) = network_update {
201                         network_graph.handle_network_update(&network_update);
202                 }
203         }
204 }
205
// Expands to the main background-processing loop, shared between the threaded
// (`BackgroundProcessor::start`) and async (`process_events_async`) variants.
// The caller supplies the notion of time and sleeping via three expressions:
//  * `$get_timer(secs)` creates a timer object covering the given horizon,
//  * `$timer_elapsed(&mut timer, secs)` reports whether it has expired,
//  * `$await` sleeps up to ~100ms and evaluates to `true` when the
//    ChannelManager has an update that should be persisted.
// `$loop_exit_check` is evaluated each iteration (after persisting) to decide
// whether to shut down; on exit the manager, scorer, and graph are persisted
// one final time and any persistence error is propagated via `?`.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Timers tracking when each periodic task last ran.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);

			// Persist the ChannelManager whenever the await signalled an update.
			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					log_trace!($logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking();

					// Graph-persistence failures are logged but non-fatal; we retry on the
					// next prune interval.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
					have_pruned = true;
				}
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// Like graph persistence, scorer-persistence failures are non-fatal.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
329
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// `should_break` is evaluated by the run-body loop (as `$loop_exit_check`)
	// after each await, and is only ever written by the `sleeper` branch of the
	// select below. It must start out `false`: the loop calls
	// `timer_tick_occurred()` on startup, which notifies the manager's
	// persistence future, so the very first await typically resolves via the
	// persistable-update branch — which never touches `should_break`.
	// Initializing it to `true` would therefore terminate the background
	// processor immediately after the first iteration.
	let mut should_break = false;
	// Wrap the user's async handler so network-graph updates carried in
	// payment-failure events are applied before the user sees the event.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Race the manager's persistable-update future against the
			// user-provided 100ms sleep. `select_biased!` prefers the former,
			// ensuring pending persistence is detected promptly.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// Poll the user-provided timer future once with a no-op waker; if
			// it is ready, the corresponding interval has elapsed.
			let mut waker = task::noop_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
		})
}
421
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the user's handler so network-graph updates carried in
			// payment-failure events are applied before the user sees them.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				event_handler.handle_event(event);
			};
			// The threaded variant supplies wall-clock `Instant`s as timers and
			// blocks (up to 100ms) on the manager's persistable-update signal.
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Signals the background thread to exit (Release pairs with the loop's
	// Acquire load) and then joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Joins the thread if it hasn't been joined already; `take` ensures a
	// second call is a no-op returning `Ok(())`.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
576
impl Drop for BackgroundProcessor {
	// Stops and joins the background thread on drop. Note this `unwrap`s the
	// result, so a persistence error surfacing at drop time will panic; call
	// `stop`/`join` explicitly to handle errors gracefully.
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}
582
583 #[cfg(test)]
584 mod tests {
585         use bitcoin::blockdata::block::BlockHeader;
586         use bitcoin::blockdata::constants::genesis_block;
587         use bitcoin::blockdata::locktime::PackedLockTime;
588         use bitcoin::blockdata::transaction::{Transaction, TxOut};
589         use bitcoin::network::constants::Network;
590         use lightning::chain::{BestBlock, Confirm, chainmonitor};
591         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
592         use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner};
593         use lightning::chain::transaction::OutPoint;
594         use lightning::get_event_msg;
595         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
596         use lightning::ln::features::ChannelFeatures;
597         use lightning::ln::msgs::{ChannelMessageHandler, Init};
598         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
599         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
600         use lightning::routing::router::DefaultRouter;
601         use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
602         use lightning::util::config::UserConfig;
603         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
604         use lightning::util::ser::Writeable;
605         use lightning::util::test_utils;
606         use lightning::util::persist::KVStorePersister;
607         use lightning_invoice::payment::{InvoicePayer, Retry};
608         use lightning_persister::FilesystemPersister;
609         use std::fs;
610         use std::path::PathBuf;
611         use std::sync::{Arc, Mutex};
612         use std::sync::mpsc::SyncSender;
613         use std::time::Duration;
614         use bitcoin::hashes::Hash;
615         use bitcoin::TxMerkleNode;
616         use lightning_rapid_gossip_sync::RapidGossipSync;
617         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
618
619         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
620
621         #[derive(Clone, Hash, PartialEq, Eq)]
622         struct TestDescriptor{}
623         impl SocketDescriptor for TestDescriptor {
624                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
625                         0
626                 }
627
628                 fn disconnect_socket(&mut self) {}
629         }
630
	/// Concrete `ChainMonitor` wired entirely to this crate's test utilities.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	/// Shorthand for the full (P2P) gossip-sync type these tests use.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	/// Shorthand for the rapid gossip-sync type these tests use.
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
635
	/// One simulated node: a complete stack of channel manager, gossip syncs, peer
	/// manager and chain monitor sharing a single logger and network graph, plus
	/// the test helpers (broadcaster, persister, best-block tracking) needed to
	/// drive it.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Tracked manually so tests can build successor block headers.
		best_block: BestBlock,
		scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
	}
649
	impl Node {
		/// Wraps this node's P2P gossip sync in a `GossipSync::P2P` for
		/// `BackgroundProcessor::start`.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		/// Wraps this node's rapid gossip sync in a `GossipSync::Rapid`.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		/// Returns `GossipSync::None` for tests that don't exercise gossip.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
663
664         impl Drop for Node {
665                 fn drop(&mut self) {
666                         let data_dir = self.persister.get_data_dir();
667                         match fs::remove_dir_all(data_dir.clone()) {
668                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
669                                 _ => {}
670                         }
671                 }
672         }
673
	/// A `KVStorePersister` wrapper around `FilesystemPersister` that can inject
	/// failures for specific keys and signal a channel whenever the network graph
	/// is persisted, letting tests observe and perturb persistence.
	struct Persister {
		// When set, persisting "network_graph" fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, each "network_graph" persist sends `()` on this channel.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, persisting "manager" fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, persisting "scorer" fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
681
682         impl Persister {
683                 fn new(data_dir: String) -> Self {
684                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
685                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
686                 }
687
688                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
689                         Self { graph_error: Some((error, message)), ..self }
690                 }
691
692                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
693                         Self { graph_persistence_notifier: Some(sender), ..self }
694                 }
695
696                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
697                         Self { manager_error: Some((error, message)), ..self }
698                 }
699
700                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
701                         Self { scorer_error: Some((error, message)), ..self }
702                 }
703         }
704
705         impl KVStorePersister for Persister {
706                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
707                         if key == "manager" {
708                                 if let Some((error, message)) = self.manager_error {
709                                         return Err(std::io::Error::new(error, message))
710                                 }
711                         }
712
713                         if key == "network_graph" {
714                                 if let Some(sender) = &self.graph_persistence_notifier {
715                                         sender.send(()).unwrap();
716                                 };
717
718                                 if let Some((error, message)) = self.graph_error {
719                                         return Err(std::io::Error::new(error, message))
720                                 }
721                         }
722
723                         if key == "scorer" {
724                                 if let Some((error, message)) = self.scorer_error {
725                                         return Err(std::io::Error::new(error, message))
726                                 }
727                         }
728
729                         self.filesystem_persister.persist(key, object)
730                 }
731         }
732
733         fn get_full_filepath(filepath: String, filename: String) -> String {
734                 let mut path = PathBuf::from(filepath);
735                 path.push(filename);
736                 path.to_str().unwrap().to_string()
737         }
738
	/// Builds `num_nodes` fully-wired test nodes, each persisting under
	/// `{persist_dir}_persister_{i}`, and marks every pair of nodes as connected
	/// peers. The construction order mirrors the real initialization flow: chain
	/// infrastructure first, then the channel manager, then gossip and peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let params = ProbabilisticScoringParameters::default();
			let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
			// The node index doubles as a deterministic seed for keys and routing.
			let seed = [i as u8; 32];
			let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Register every distinct pair of nodes as connected peers (both directions).
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
777
	// Drives a full channel-open between `$node_a` and `$node_b` for
	// `$channel_value` sats and returns the funding transaction. Consumes the
	// `FundingGenerationReady` event from `$node_a`'s pending events, so it must
	// run before a `BackgroundProcessor` starts handling events for that node.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
788
	// Performs the open_channel/accept_channel handshake: `$node_a` initiates a
	// channel of `$channel_value` sats (push 100 msat, user id 42) and each side
	// handles the other's message. Leaves `$node_a` ready to emit
	// `FundingGenerationReady`.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
796
	// Given a `FundingGenerationReady` event, checks its value and user id (42,
	// matching `begin_open_channel!`) and builds a minimal funding transaction
	// paying the requested script. Evaluates to `(temporary_channel_id, tx)`;
	// panics on any other event.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
813
	// Completes channel establishment: hands the funding transaction to
	// `$node_a` and exchanges funding_created/funding_signed so both sides have
	// a signed channel pending confirmation.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
821
	/// Mines `depth` synthetic blocks on top of `node`'s best block, confirming
	/// `tx` in the first one and delivering a `best_block_updated` in the last,
	/// so the transaction ends up buried `depth` blocks deep.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			// Only `prev_blockhash`/`time` matter to the test; the rest of the
			// header is filler.
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				// First block: deliver the confirmation to both the manager and
				// the monitor.
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				// Final block: announce the new tip; intermediate blocks need no
				// explicit notification.
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	/// Confirms `tx` to `ANTI_REORG_DELAY` depth, i.e. deep enough that the
	/// channel state machine treats it as irreversibly confirmed.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
845
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the bytes on disk at `$filepath` match a fresh
		// serialization of `$node`. Re-serializes each iteration since the
		// background thread may persist concurrently.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								// The file may not exist yet; keep polling.
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Wait until the manager no longer signals that it needs persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
915
	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// Busy-wait until the background thread has logged both timer-tick
		// messages; the lock on the logger's lines is re-taken each iteration.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
937
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		// Open a channel so the manager has something to persist.
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// `join` should surface the injected persistence error.
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
956
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		// P2P gossip sync is required here so the background thread actually
		// persists the network graph.
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
974
	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Stopping joins the background thread, surfacing the injected error.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
992
	/// Verifies that the background processor drives the user's event handler:
	/// first for `FundingGenerationReady` during channel open, then (after a
	/// restart of the processor) for `SpendableOutputs` after a force-close.
	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		// Exchange channel_ready so both sides consider the channel usable.
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		// Bury the commitment transaction past the to_self delay so the output
		// becomes spendable.
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1055
	/// Verifies that the background processor periodically persists the scorer,
	/// by waiting for the corresponding log line to appear.
	#[test]
	fn test_scorer_persistence() {
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-wait until the background thread logs a scorer persist.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
1074
1075         #[test]
1076         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1077                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1078                 let data_dir = nodes[0].persister.get_data_dir();
1079                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1080                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1081                 let network_graph = nodes[0].network_graph.clone();
1082                 let features = ChannelFeatures::empty();
1083                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1084                         .expect("Failed to update channel from partial announcement");
1085                 let original_graph_description = network_graph.to_string();
1086                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1087                 assert_eq!(network_graph.read_only().channels().len(), 1);
1088
1089                 let event_handler = |_: _| {};
1090                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1091
1092                 loop {
1093                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1094                         let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1095                         if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1096                                 .unwrap_or(&0) > 1
1097                         {
1098                                 // Wait until the loop has gone around at least twice.
1099                                 break
1100                         }
1101                 }
1102
1103                 let initialization_input = vec![
1104                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1105                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1106                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1107                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1108                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1109                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1110                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1111                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1112                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1113                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1114                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1115                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1116                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1117                 ];
1118                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1119
1120                 // this should have added two channels
1121                 assert_eq!(network_graph.read_only().channels().len(), 3);
1122
1123                 let _ = receiver
1124                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1125                         .expect("Network graph not pruned within deadline");
1126
1127                 background_processor.stop().unwrap();
1128
1129                 // all channels should now be pruned
1130                 assert_eq!(network_graph.read_only().channels().len(), 0);
1131         }
1132
1133         #[test]
1134         fn test_invoice_payer() {
1135                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1136                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1137                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1138
1139                 // Initiate the background processors to watch each node.
1140                 let data_dir = nodes[0].persister.get_data_dir();
1141                 let persister = Arc::new(Persister::new(data_dir));
1142                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1143                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1144                 let event_handler = Arc::clone(&invoice_payer);
1145                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1146                 assert!(bg_processor.stop().is_ok());
1147         }
1148 }