1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[macro_use] extern crate lightning;
20 extern crate lightning_rapid_gossip_sync;
21
22 use lightning::chain;
23 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
24 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
25 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
26 use lightning::ln::channelmanager::ChannelManager;
27 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
28 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
29 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
30 use lightning::routing::router::Router;
31 use lightning::routing::scoring::WriteableScore;
32 use lightning::util::events::{Event, EventHandler, EventsProvider};
33 use lightning::util::logger::Logger;
34 use lightning::util::persist::Persister;
35 use lightning_rapid_gossip_sync::RapidGossipSync;
36 use lightning::io;
37
38 use core::ops::Deref;
39 use core::time::Duration;
40
41 #[cfg(feature = "std")]
42 use std::sync::Arc;
43 #[cfg(feature = "std")]
44 use core::sync::atomic::{AtomicBool, Ordering};
45 #[cfg(feature = "std")]
46 use std::thread::{self, JoinHandle};
47 #[cfg(feature = "std")]
48 use std::time::Instant;
49
50 #[cfg(feature = "futures")]
51 use futures_util::{select_biased, future::FutureExt, task};
52
53 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
54 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
55 /// responsibilities are:
56 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
57 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
58 ///   writing it to disk/backups by invoking the callback given to it at startup.
59 ///   [`ChannelManager`] persistence should be done in the background.
60 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
61 ///   at the appropriate intervals.
62 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
63 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
64 ///
65 /// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
66 /// relied upon, as doing so may result in high latency.
67 ///
68 /// # Note
69 ///
70 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
71 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
72 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
73 /// unilateral chain closure fees are at risk.
74 ///
75 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
76 /// [`Event`]: lightning::util::events::Event
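///
/// # Lifecycle sketch
///
/// A minimal, illustrative sketch (argument construction is elided; see
/// [`BackgroundProcessor::start`] for the full parameter list):
///
/// ```ignore
/// // `start` spawns the background thread; keep the handle alive for the node's lifetime,
/// // since dropping it stops processing.
/// let background_processor = BackgroundProcessor::start(/* ... */);
/// // ... run the node ...
/// // On shutdown, `stop` joins the thread and surfaces any persistence error.
/// background_processor.stop().expect("persistence failed during shutdown");
/// ```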
77 #[cfg(feature = "std")]
78 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
79 pub struct BackgroundProcessor {
80         stop_thread: Arc<AtomicBool>,
81         thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
82 }
83
84 #[cfg(not(test))]
85 const FRESHNESS_TIMER: u64 = 60;
86 #[cfg(test)]
87 const FRESHNESS_TIMER: u64 = 1;
88
89 #[cfg(all(not(test), not(debug_assertions)))]
90 const PING_TIMER: u64 = 10;
91 /// Signature operations take a lot longer without compiler optimisations.
92 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
93 /// timeout is reached.
94 #[cfg(all(not(test), debug_assertions))]
95 const PING_TIMER: u64 = 30;
96 #[cfg(test)]
97 const PING_TIMER: u64 = 1;
98
99 /// Prune the network graph of stale entries hourly.
100 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
101
102 #[cfg(not(test))]
103 const SCORER_PERSIST_TIMER: u64 = 30;
104 #[cfg(test)]
105 const SCORER_PERSIST_TIMER: u64 = 1;
106
107 #[cfg(not(test))]
108 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
109 #[cfg(test)]
110 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
111
112 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
113 pub enum GossipSync<
114         P: Deref<Target = P2PGossipSync<G, A, L>>,
115         R: Deref<Target = RapidGossipSync<G, L>>,
116         G: Deref<Target = NetworkGraph<L>>,
117         A: Deref,
118         L: Deref,
119 >
120 where A::Target: chain::Access, L::Target: Logger {
121         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
122         P2P(P),
123         /// Rapid gossip sync from a trusted server.
124         Rapid(R),
125         /// No gossip sync.
126         None,
127 }
128
129 impl<
130         P: Deref<Target = P2PGossipSync<G, A, L>>,
131         R: Deref<Target = RapidGossipSync<G, L>>,
132         G: Deref<Target = NetworkGraph<L>>,
133         A: Deref,
134         L: Deref,
135 > GossipSync<P, R, G, A, L>
136 where A::Target: chain::Access, L::Target: Logger {
137         fn network_graph(&self) -> Option<&G> {
138                 match self {
139                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
140                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
141                         GossipSync::None => None,
142                 }
143         }
144
145         fn prunable_network_graph(&self) -> Option<&G> {
146                 match self {
147                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
148                         GossipSync::Rapid(gossip_sync) => {
149                                 if gossip_sync.is_initial_sync_complete() {
150                                         Some(gossip_sync.network_graph())
151                                 } else {
152                                         None
153                                 }
154                         },
155                         GossipSync::None => None,
156                 }
157         }
158 }
159
160 /// (C-not exported) as the bindings concretize everything and have constructors for us
161 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
162         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
163 where
164         A::Target: chain::Access,
165         L::Target: Logger,
166 {
167         /// Initializes a new [`GossipSync::P2P`] variant.
168         pub fn p2p(gossip_sync: P) -> Self {
169                 GossipSync::P2P(gossip_sync)
170         }
171 }
172
173 /// (C-not exported) as the bindings concretize everything and have constructors for us
174 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
175         GossipSync<
176                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
177                 R,
178                 G,
179                 &'a (dyn chain::Access + Send + Sync),
180                 L,
181         >
182 where
183         L::Target: Logger,
184 {
185         /// Initializes a new [`GossipSync::Rapid`] variant.
186         pub fn rapid(gossip_sync: R) -> Self {
187                 GossipSync::Rapid(gossip_sync)
188         }
189 }
190
191 /// (C-not exported) as the bindings concretize everything and have constructors for us
192 impl<'a, L: Deref>
193         GossipSync<
194                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
195                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
196                 &'a NetworkGraph<L>,
197                 &'a (dyn chain::Access + Send + Sync),
198                 L,
199         >
200 where
201         L::Target: Logger,
202 {
203         /// Initializes a new [`GossipSync::None`] variant.
204         pub fn none() -> Self {
205                 GossipSync::None
206         }
207 }
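// Illustrative only: a node picks exactly one variant, typically via the constructors above.
// The `p2p_gossip_sync` / `rapid_gossip_sync` handles below are assumed to be built by the
// caller, e.g.:
//
//     let gossip_sync = GossipSync::p2p(p2p_gossip_sync.clone());     // BOLT 7 gossip via peers
//     let gossip_sync = GossipSync::rapid(rapid_gossip_sync.clone()); // snapshot from a trusted server
//     let gossip_sync = GossipSync::none();                           // no gossip handling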
208
209 fn handle_network_graph_update<L: Deref>(
210         network_graph: &NetworkGraph<L>, event: &Event
211 ) where L::Target: Logger {
212         if let Event::PaymentPathFailed { ref network_update, .. } = event {
213                 if let Some(network_update) = network_update {
214                         network_graph.handle_network_update(&network_update);
215                 }
216         }
217 }
218
219 macro_rules! define_run_body {
220         ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
221          $channel_manager: ident, $process_channel_manager_events: expr,
222          $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
223          $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
224         => { {
225                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
226                 $channel_manager.timer_tick_occurred();
227
228                 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
229                 let mut last_ping_call = $get_timer(PING_TIMER);
230                 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
231                 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
232                 let mut have_pruned = false;
233
234                 loop {
235                         $process_channel_manager_events;
236                         $process_chain_monitor_events;
237
238                         // Note that the PeerManager::process_events may block on ChannelManager's locks,
239                         // hence it comes last here. When the ChannelManager finishes whatever it's doing,
240                         // we want to ensure we get into `persist_manager` as quickly as we can, especially
241                         // without running the normal event processing above and handing events to users.
242                         //
243                         // Specifically, on an *extremely* slow machine, we may see ChannelManager start
244                         // processing a message effectively at any point during this loop. In order to
245                         // minimize the time between such processing completing and persisting the updated
246                         // ChannelManager, we want to minimize methods blocking on a ChannelManager
247                         // generally, and as a fallback place such blocking only immediately before
248                         // persistence.
249                         $peer_manager.process_events();
250
251                         // We wait up to 100ms, but track how long it takes to detect being put to sleep,
252                         // see `await_start`'s use below.
253                         let mut await_start = $get_timer(1);
254                         let updates_available = $await;
255                         let await_slow = $timer_elapsed(&mut await_start, 1);
256
257                         if updates_available {
258                                 log_trace!($logger, "Persisting ChannelManager...");
259                                 $persister.persist_manager(&*$channel_manager)?;
260                                 log_trace!($logger, "Done persisting ChannelManager.");
261                         }
262                         // Exit the loop if the background processor was requested to stop.
263                         if $loop_exit_check {
264                                 log_trace!($logger, "Terminating background processor.");
265                                 break;
266                         }
267                         if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
268                                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
269                                 $channel_manager.timer_tick_occurred();
270                                 last_freshness_call = $get_timer(FRESHNESS_TIMER);
271                         }
272                         if await_slow {
273                                 // On various platforms, we may be starved of CPU cycles for several reasons.
274                                 // E.g. on iOS, if we've been in the background, we will be entirely paused.
275                                 // Similarly, if we're on a desktop platform and the device has been asleep, we
276                                 // may not get any cycles.
277                                 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
278                                 // full second, at which point we assume sockets may have been killed (they
279                                 // appear to be at least on some platforms, even if it has only been a second).
280                                 // Note that we have to take care to not get here just because user event
281                                 // processing was slow at the top of the loop. For example, the sample client
282                                 // may call Bitcoin Core RPCs during event handling, which very often takes
283                                 // more than a handful of seconds to complete, and shouldn't disconnect all our
284                                 // peers.
285                                 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
286                                 $peer_manager.disconnect_all_peers();
287                                 last_ping_call = $get_timer(PING_TIMER);
288                         } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
289                                 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
290                                 $peer_manager.timer_tick_occurred();
291                                 last_ping_call = $get_timer(PING_TIMER);
292                         }
293
294                         // Note that we want to run a graph prune once not long after startup before
295                         // falling back to our usual hourly prunes. This avoids short-lived clients never
296                         // pruning their network graph. We run once 60 seconds after startup before
297                         // continuing our normal cadence.
298                         if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
299                                 // The network graph must not be pruned while rapid sync completion is pending
300                                 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
301                                         #[cfg(feature = "std")] {
302                                                 log_trace!($logger, "Pruning and persisting network graph.");
303                                                 network_graph.remove_stale_channels_and_tracking();
304                                         }
305                                         #[cfg(not(feature = "std"))] {
306                                                 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
307                                                 log_trace!($logger, "Persisting network graph.");
308                                         }
309
310                                         if let Err(e) = $persister.persist_graph(network_graph) {
311                                                 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
312                                         }
313
314                                         last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
315                                         have_pruned = true;
316                                 }
317                         }
318
319                         if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
320                                 if let Some(ref scorer) = $scorer {
321                                         log_trace!($logger, "Persisting scorer");
322                                         if let Err(e) = $persister.persist_scorer(&scorer) {
323                                                 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
324                                         }
325                                 }
326                                 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
327                         }
328                 }
329
330                 // After we exit, ensure we persist the ChannelManager one final time - this avoids
331                 // some races where users quit while channel updates were in-flight, with
332                 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
333                 $persister.persist_manager(&*$channel_manager)?;
334
335                 // Persist Scorer on exit
336                 if let Some(ref scorer) = $scorer {
337                         $persister.persist_scorer(&scorer)?;
338                 }
339
340                 // Persist NetworkGraph on exit
341                 if let Some(network_graph) = $gossip_sync.network_graph() {
342                         $persister.persist_graph(network_graph)?;
343                 }
344
345                 Ok(())
346         } }
347 }
348
349 /// Processes background events in a future.
350 ///
351 /// `sleeper` should return a future which completes in the given amount of time and returns a
352 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
353 /// future which outputs true, the loop will exit and this function's future will complete.
354 ///
355 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
356 ///
357 /// Requires the `futures` feature. Note that while this method is available without the `std`
358 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`];
359 /// you should instead call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`]
360 /// manually at regular intervals.
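///
/// # Sleeper example
///
/// A minimal sketch of a `sleeper`, assuming a Tokio runtime and a caller-owned
/// `shutdown_signal: tokio::sync::watch::Receiver<()>` (both are assumptions of this example,
/// not requirements of this API):
///
/// ```ignore
/// let sleeper = move |d: core::time::Duration| {
///     let mut shutdown_signal = shutdown_signal.clone();
///     Box::pin(async move {
///         tokio::select! {
///             // Timed out: keep the background processor running.
///             _ = tokio::time::sleep(d) => false,
///             // Shutdown requested: tell the background processor to exit.
///             _ = shutdown_signal.changed() => true,
///         }
///     })
/// };
/// ```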
361 #[cfg(feature = "futures")]
362 pub async fn process_events_async<
363         'a,
364         CA: 'static + Deref + Send + Sync,
365         CF: 'static + Deref + Send + Sync,
366         CW: 'static + Deref + Send + Sync,
367         T: 'static + Deref + Send + Sync,
368         ES: 'static + Deref + Send + Sync,
369         NS: 'static + Deref + Send + Sync,
370         SP: 'static + Deref + Send + Sync,
371         F: 'static + Deref + Send + Sync,
372         R: 'static + Deref + Send + Sync,
373         G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
374         L: 'static + Deref + Send + Sync,
375         P: 'static + Deref + Send + Sync,
376         Descriptor: 'static + SocketDescriptor + Send + Sync,
377         CMH: 'static + Deref + Send + Sync,
378         RMH: 'static + Deref + Send + Sync,
379         OMH: 'static + Deref + Send + Sync,
380         EventHandlerFuture: core::future::Future<Output = ()>,
381         EventHandler: Fn(Event) -> EventHandlerFuture,
382         PS: 'static + Deref + Send,
383         M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
384         CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
385         PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
386         RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
387         UMH: 'static + Deref + Send + Sync,
388         PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
389         S: 'static + Deref<Target = SC> + Send + Sync,
390         SC: WriteableScore<'a>,
391         SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
392         Sleeper: Fn(Duration) -> SleepFuture
393 >(
394         persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
395         gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
396         sleeper: Sleeper,
397 ) -> Result<(), io::Error>
398 where
399         CA::Target: 'static + chain::Access,
400         CF::Target: 'static + chain::Filter,
401         CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
402         T::Target: 'static + BroadcasterInterface,
403         ES::Target: 'static + EntropySource,
404         NS::Target: 'static + NodeSigner,
405         SP::Target: 'static + SignerProvider,
406         F::Target: 'static + FeeEstimator,
407         R::Target: 'static + Router,
408         L::Target: 'static + Logger,
409         P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
410         CMH::Target: 'static + ChannelMessageHandler,
411         OMH::Target: 'static + OnionMessageHandler,
412         RMH::Target: 'static + RoutingMessageHandler,
413         UMH::Target: 'static + CustomMessageHandler,
414         PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
415 {
416         let mut should_break = false; // start `false` so the loop only exits once the `sleeper` future resolves to `true`
417         let async_event_handler = |event| {
418                 let network_graph = gossip_sync.network_graph();
419                 let event_handler = &event_handler;
420                 async move {
421                         if let Some(network_graph) = network_graph {
422                                 handle_network_graph_update(network_graph, &event)
423                         }
424                         event_handler(event).await;
425                 }
426         };
427         define_run_body!(persister,
428                 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
429                 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
430                 gossip_sync, peer_manager, logger, scorer, should_break, {
431                         select_biased! {
432                                 _ = channel_manager.get_persistable_update_future().fuse() => true,
433                                 exit = sleeper(Duration::from_millis(100)).fuse() => {
434                                         should_break = exit;
435                                         false
436                                 }
437                         }
438                 }, |t| sleeper(Duration::from_secs(t)),
439                 |fut: &mut SleepFuture, _| {
440                         let mut waker = task::noop_waker();
441                         let mut ctx = task::Context::from_waker(&mut waker);
442                         core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
443                 })
444 }
445
446 #[cfg(feature = "std")]
447 impl BackgroundProcessor {
448         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
449         /// documentation].
450         ///
451         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
452         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
453         /// either [`join`] or [`stop`].
454         ///
455         /// # Data Persistence
456         ///
457         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
458         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
459         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
460         /// provided implementation.
461         ///
462         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
463         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
464         /// See the `lightning-persister` crate for LDK's provided implementation.
465         ///
466         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
467         /// error or call [`join`] and handle any error that may arise. For the latter case,
468         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
469         ///
470         /// # Event Handling
471         ///
472         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
473         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
474         /// functionality implemented by other handlers.
475         /// * If a [`GossipSync`] with a [`NetworkGraph`] is given, the graph will be updated based on payment path failures.
476         ///
477         /// # Rapid Gossip Sync
478         ///
479         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
480         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
481         /// until the [`RapidGossipSync`] instance completes its first sync.
482         ///
483         /// [top-level documentation]: BackgroundProcessor
484         /// [`join`]: Self::join
485         /// [`stop`]: Self::stop
486         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
487         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
488         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
489         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
490         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
491         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
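        ///
        /// # Example
        ///
        /// A minimal sketch, assuming the caller has already built the usual LDK objects
        /// (`persister`, `chain_monitor`, `channel_manager`, `p2p_gossip_sync`, `peer_manager`,
        /// `logger`, `scorer`); those names and the `handle_ldk_event` helper are placeholders:
        ///
        /// ```ignore
        /// let event_handler = |event: Event| {
        ///     // Forward each event to the node's own event handling logic.
        ///     handle_ldk_event(event);
        /// };
        /// let background_processor = BackgroundProcessor::start(
        ///     persister, event_handler, chain_monitor, channel_manager,
        ///     GossipSync::p2p(p2p_gossip_sync), peer_manager, logger, Some(scorer),
        /// );
        /// ```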
492         pub fn start<
493                 'a,
494                 CA: 'static + Deref + Send + Sync,
495                 CF: 'static + Deref + Send + Sync,
496                 CW: 'static + Deref + Send + Sync,
497                 T: 'static + Deref + Send + Sync,
498                 ES: 'static + Deref + Send + Sync,
499                 NS: 'static + Deref + Send + Sync,
500                 SP: 'static + Deref + Send + Sync,
501                 F: 'static + Deref + Send + Sync,
502                 R: 'static + Deref + Send + Sync,
503                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
504                 L: 'static + Deref + Send + Sync,
505                 P: 'static + Deref + Send + Sync,
506                 Descriptor: 'static + SocketDescriptor + Send + Sync,
507                 CMH: 'static + Deref + Send + Sync,
508                 OMH: 'static + Deref + Send + Sync,
509                 RMH: 'static + Deref + Send + Sync,
510                 EH: 'static + EventHandler + Send,
511                 PS: 'static + Deref + Send,
512                 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
513                 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
514                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
515                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
516                 UMH: 'static + Deref + Send + Sync,
517                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
518                 S: 'static + Deref<Target = SC> + Send + Sync,
519                 SC: WriteableScore<'a>,
520         >(
521                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
522                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
523         ) -> Self
524         where
525                 CA::Target: 'static + chain::Access,
526                 CF::Target: 'static + chain::Filter,
527                 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
528                 T::Target: 'static + BroadcasterInterface,
529                 ES::Target: 'static + EntropySource,
530                 NS::Target: 'static + NodeSigner,
531                 SP::Target: 'static + SignerProvider,
532                 F::Target: 'static + FeeEstimator,
533                 R::Target: 'static + Router,
534                 L::Target: 'static + Logger,
535                 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
536                 CMH::Target: 'static + ChannelMessageHandler,
537                 OMH::Target: 'static + OnionMessageHandler,
538                 RMH::Target: 'static + RoutingMessageHandler,
539                 UMH::Target: 'static + CustomMessageHandler,
540                 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
541         {
542                 let stop_thread = Arc::new(AtomicBool::new(false));
543                 let stop_thread_clone = stop_thread.clone();
544                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
545                         let event_handler = |event| {
546                                 let network_graph = gossip_sync.network_graph();
547                                 if let Some(network_graph) = network_graph {
548                                         handle_network_graph_update(network_graph, &event)
549                                 }
550                                 event_handler.handle_event(event);
551                         };
552                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
553                                 channel_manager, channel_manager.process_pending_events(&event_handler),
554                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
555                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
556                                 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
557                 });
558                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
559         }
560
561         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
562         /// [`ChannelManager`].
563         ///
564         /// # Panics
565         ///
566         /// This function panics if the background thread has panicked such as while persisting or
567         /// handling events.
568         ///
569         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
570         pub fn join(mut self) -> Result<(), std::io::Error> {
571                 assert!(self.thread_handle.is_some());
572                 self.join_thread()
573         }
574
575         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
576         /// [`ChannelManager`].
577         ///
578         /// # Panics
579         ///
580         /// This function panics if the background thread has panicked such as while persisting or
581         /// handling events.
582         ///
583         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
584         pub fn stop(mut self) -> Result<(), std::io::Error> {
585                 assert!(self.thread_handle.is_some());
586                 self.stop_and_join_thread()
587         }
588
589         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
590                 self.stop_thread.store(true, Ordering::Release);
591                 self.join_thread()
592         }
593
594         fn join_thread(&mut self) -> Result<(), std::io::Error> {
595                 match self.thread_handle.take() {
596                         Some(handle) => handle.join().unwrap(),
597                         None => Ok(()),
598                 }
599         }
600 }
601
602 #[cfg(feature = "std")]
603 impl Drop for BackgroundProcessor {
604         fn drop(&mut self) {
605                 self.stop_and_join_thread().unwrap();
606         }
607 }
608
609 #[cfg(all(feature = "std", test))]
610 mod tests {
611         use bitcoin::blockdata::block::BlockHeader;
612         use bitcoin::blockdata::constants::genesis_block;
613         use bitcoin::blockdata::locktime::PackedLockTime;
614         use bitcoin::blockdata::transaction::{Transaction, TxOut};
615         use bitcoin::network::constants::Network;
616         use lightning::chain::{BestBlock, Confirm, chainmonitor};
617         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
618         use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
619         use lightning::chain::transaction::OutPoint;
620         use lightning::get_event_msg;
621         use lightning::ln::channelmanager;
622         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters};
623         use lightning::ln::features::ChannelFeatures;
624         use lightning::ln::msgs::{ChannelMessageHandler, Init};
625         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
626         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
627         use lightning::routing::router::{DefaultRouter, RouteHop};
628         use lightning::routing::scoring::{ChannelUsage, Score};
629         use lightning::util::config::UserConfig;
630         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
631         use lightning::util::ser::Writeable;
632         use lightning::util::test_utils;
633         use lightning::util::persist::KVStorePersister;
634         use lightning_invoice::payment::{InvoicePayer, Retry};
635         use lightning_persister::FilesystemPersister;
636         use std::collections::VecDeque;
637         use std::fs;
638         use std::path::PathBuf;
639         use std::sync::{Arc, Mutex};
640         use std::sync::mpsc::SyncSender;
641         use std::time::Duration;
642         use bitcoin::hashes::Hash;
643         use bitcoin::TxMerkleNode;
644         use lightning_rapid_gossip_sync::RapidGossipSync;
645         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
646
647         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
648
649         #[derive(Clone, Hash, PartialEq, Eq)]
650         struct TestDescriptor{}
651         impl SocketDescriptor for TestDescriptor {
652                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
653                         0
654                 }
655
656                 fn disconnect_socket(&mut self) {}
657         }
658
659         type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
660
661         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
662
663         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
664         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
665
666         struct Node {
667                 node: Arc<ChannelManager>,
668                 p2p_gossip_sync: PGS,
669                 rapid_gossip_sync: RGS,
670                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
671                 chain_monitor: Arc<ChainMonitor>,
672                 persister: Arc<FilesystemPersister>,
673                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
674                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
675                 logger: Arc<test_utils::TestLogger>,
676                 best_block: BestBlock,
677                 scorer: Arc<Mutex<TestScorer>>,
678         }
679
680         impl Node {
681                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
682                         GossipSync::P2P(self.p2p_gossip_sync.clone())
683                 }
684
685                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
686                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
687                 }
688
689                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
690                         GossipSync::None
691                 }
692         }
693
694         impl Drop for Node {
695                 fn drop(&mut self) {
696                         let data_dir = self.persister.get_data_dir();
697                         match fs::remove_dir_all(data_dir.clone()) {
698                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
699                                 _ => {}
700                         }
701                 }
702         }
703
704         struct Persister {
705                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
706                 graph_persistence_notifier: Option<SyncSender<()>>,
707                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
708                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
709                 filesystem_persister: FilesystemPersister,
710         }
711
712         impl Persister {
713                 fn new(data_dir: String) -> Self {
714                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
715                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
716                 }
717
718                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
719                         Self { graph_error: Some((error, message)), ..self }
720                 }
721
722                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
723                         Self { graph_persistence_notifier: Some(sender), ..self }
724                 }
725
726                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
727                         Self { manager_error: Some((error, message)), ..self }
728                 }
729
730                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
731                         Self { scorer_error: Some((error, message)), ..self }
732                 }
733         }
734
735         impl KVStorePersister for Persister {
736                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
737                         if key == "manager" {
738                                 if let Some((error, message)) = self.manager_error {
739                                         return Err(std::io::Error::new(error, message))
740                                 }
741                         }
742
743                         if key == "network_graph" {
744                                 if let Some(sender) = &self.graph_persistence_notifier {
745                                         sender.send(()).unwrap();
746                                 };
747
748                                 if let Some((error, message)) = self.graph_error {
749                                         return Err(std::io::Error::new(error, message))
750                                 }
751                         }
752
753                         if key == "scorer" {
754                                 if let Some((error, message)) = self.scorer_error {
755                                         return Err(std::io::Error::new(error, message))
756                                 }
757                         }
758
759                         self.filesystem_persister.persist(key, object)
760                 }
761         }
762
763         struct TestScorer {
764                 event_expectations: Option<VecDeque<TestResult>>,
765         }
766
767         #[derive(Debug)]
768         enum TestResult {
769                 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
770                 PaymentSuccess { path: Vec<RouteHop> },
771                 ProbeFailure { path: Vec<RouteHop> },
772                 ProbeSuccess { path: Vec<RouteHop> },
773         }
774
775         impl TestScorer {
776                 fn new() -> Self {
777                         Self { event_expectations: None }
778                 }
779
780                 fn expect(&mut self, expectation: TestResult) {
781                         self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
782                 }
783         }
784
785         impl lightning::util::ser::Writeable for TestScorer {
786                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
787         }
788
789         impl Score for TestScorer {
790                 fn channel_penalty_msat(
791                         &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
792                 ) -> u64 { unimplemented!(); }
793
794                 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
795                         if let Some(expectations) = &mut self.event_expectations {
796                                 match expectations.pop_front().unwrap() {
797                                         TestResult::PaymentFailure { path, short_channel_id } => {
798                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
799                                                 assert_eq!(actual_short_channel_id, short_channel_id);
800                                         },
801                                         TestResult::PaymentSuccess { path } => {
802                                                 panic!("Unexpected successful payment path: {:?}", path)
803                                         },
804                                         TestResult::ProbeFailure { path } => {
805                                                 panic!("Unexpected probe failure: {:?}", path)
806                                         },
807                                         TestResult::ProbeSuccess { path } => {
808                                                 panic!("Unexpected probe success: {:?}", path)
809                                         }
810                                 }
811                         }
812                 }
813
814                 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
815                         if let Some(expectations) = &mut self.event_expectations {
816                                 match expectations.pop_front().unwrap() {
817                                         TestResult::PaymentFailure { path, .. } => {
818                                                 panic!("Unexpected payment path failure: {:?}", path)
819                                         },
820                                         TestResult::PaymentSuccess { path } => {
821                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
822                                         },
823                                         TestResult::ProbeFailure { path } => {
824                                                 panic!("Unexpected probe failure: {:?}", path)
825                                         },
826                                         TestResult::ProbeSuccess { path } => {
827                                                 panic!("Unexpected probe success: {:?}", path)
828                                         }
829                                 }
830                         }
831                 }
832
833                 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
834                         if let Some(expectations) = &mut self.event_expectations {
835                                 match expectations.pop_front().unwrap() {
836                                         TestResult::PaymentFailure { path, .. } => {
837                                                 panic!("Unexpected payment path failure: {:?}", path)
838                                         },
839                                         TestResult::PaymentSuccess { path } => {
840                                                 panic!("Unexpected payment path success: {:?}", path)
841                                         },
842                                         TestResult::ProbeFailure { path } => {
843                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
844                                         },
845                                         TestResult::ProbeSuccess { path } => {
846                                                 panic!("Unexpected probe success: {:?}", path)
847                                         }
848                                 }
849                         }
850                 }
851                 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
852                         if let Some(expectations) = &mut self.event_expectations {
853                                 match expectations.pop_front().unwrap() {
854                                         TestResult::PaymentFailure { path, .. } => {
855                                                 panic!("Unexpected payment path failure: {:?}", path)
856                                         },
857                                         TestResult::PaymentSuccess { path } => {
858                                                 panic!("Unexpected payment path success: {:?}", path)
859                                         },
860                                         TestResult::ProbeFailure { path } => {
861                                                 panic!("Unexpected probe failure: {:?}", path)
862                                         },
863                                         TestResult::ProbeSuccess { path } => {
864                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
865                                         }
866                                 }
867                         }
868                 }
869         }
870
871         impl Drop for TestScorer {
872                 fn drop(&mut self) {
873                         if std::thread::panicking() {
874                                 return;
875                         }
876
877                         if let Some(event_expectations) = &self.event_expectations {
878                                 if !event_expectations.is_empty() {
879                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
880                                 }
881                         }
882                 }
883         }
884
885         fn get_full_filepath(filepath: String, filename: String) -> String {
886                 let mut path = PathBuf::from(filepath);
887                 path.push(filename);
888                 path.to_str().unwrap().to_string()
889         }
890
891         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
892                 let mut nodes = Vec::new();
893                 for i in 0..num_nodes {
894                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
895                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
896                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
897                         let network = Network::Testnet;
898                         let genesis_block = genesis_block(network);
899                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
900                         let scorer = Arc::new(Mutex::new(TestScorer::new()));
901                         let seed = [i as u8; 32];
902                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
903                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
904                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
905                         let now = Duration::from_secs(genesis_block.header.time as u64);
906                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
907                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
908                         let best_block = BestBlock::from_genesis(network);
909                         let params = ChainParameters { network, best_block };
910                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
911                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
912                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
913                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
914                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
915                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
916                         nodes.push(node);
917                 }
918
919                 for i in 0..num_nodes {
920                         for j in (i+1)..num_nodes {
921                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
922                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
923                         }
924                 }
925
926                 nodes
927         }
928
929         macro_rules! open_channel {
930                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
931                         begin_open_channel!($node_a, $node_b, $channel_value);
932                         let events = $node_a.node.get_and_clear_pending_events();
933                         assert_eq!(events.len(), 1);
934                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
935                         end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
936                         tx
937                 }}
938         }
939
940         macro_rules! begin_open_channel {
941                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
942                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
943                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
944                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
945                 }}
946         }
947
948         macro_rules! handle_funding_generation_ready {
949                 ($event: expr, $channel_value: expr) => {{
950                         match $event {
951                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
952                                         assert_eq!(channel_value_satoshis, $channel_value);
953                                         assert_eq!(user_channel_id, 42);
954
955                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
956                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
957                                         }]};
958                                         (temporary_channel_id, tx)
959                                 },
960                                 _ => panic!("Unexpected event"),
961                         }
962                 }}
963         }
964
965         macro_rules! end_open_channel {
966                 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
967                         $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
968                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
969                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
970                 }}
971         }
972
973         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
974                 for i in 1..=depth {
975                         let prev_blockhash = node.best_block.block_hash();
976                         let height = node.best_block.height() + 1;
977                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
978                         let txdata = vec![(0, tx)];
979                         node.best_block = BestBlock::new(header.block_hash(), height);
980                         match i {
981                                 1 => {
982                                         node.node.transactions_confirmed(&header, &txdata, height);
983                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
984                                 },
985                                 x if x == depth => {
986                                         node.node.best_block_updated(&header, height);
987                                         node.chain_monitor.best_block_updated(&header, height);
988                                 },
989                                 _ => {},
990                         }
991                 }
992         }
993         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
994                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
995         }
996
997         #[test]
998         fn test_background_processor() {
999                 // Test that when a new channel is created, the ChannelManager is re-persisted with the
1000                 // update, and that when further updates become available, the manager signals that it
1001                 // needs re-persistence and is successfully re-persisted.
1002                 let nodes = create_nodes(2, "test_background_processor".to_string());
1003
1004                 // Go through the channel creation process so that each node has something to persist. Since
1005                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1006                 // avoid a race with processing events.
1007                 let tx = open_channel!(nodes[0], nodes[1], 100000);
1008
1009                 // Start the background processor to watch the first node.
1010                 let data_dir = nodes[0].persister.get_data_dir();
1011                 let persister = Arc::new(Persister::new(data_dir));
1012                 let event_handler = |_: _| {};
1013                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1014
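                // Busy-wait until the bytes on disk at `$filepath` match `$node`'s current
                // serialization; the background processor persists on its own thread, so reads are
                // retried until they match.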
1015                 macro_rules! check_persisted_data {
1016                         ($node: expr, $filepath: expr) => {
1017                                 let mut expected_bytes = Vec::new();
1018                                 loop {
1019                                         expected_bytes.clear();
1020                                         match $node.write(&mut expected_bytes) {
1021                                                 Ok(()) => {
1022                                                         match std::fs::read($filepath) {
1023                                                                 Ok(bytes) => {
1024                                                                         if bytes == expected_bytes {
1025                                                                                 break
1026                                                                         } else {
1027                                                                                 continue
1028                                                                         }
1029                                                                 },
1030                                                                 Err(_) => continue
1031                                                         }
1032                                                 },
1033                                                 Err(e) => panic!("Unexpected error: {}", e)
1034                                         }
1035                                 }
1036                         }
1037                 }
1038
1039                 // Check that the initial channel manager data is persisted as expected.
1040                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1041                 check_persisted_data!(nodes[0].node, filepath.clone());
1042
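                // Wait until the pending-persistence flag is cleared, i.e. the background
                // processor has picked up the latest update.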
1043                 loop {
1044                         if !nodes[0].node.get_persistence_condvar_value() { break }
1045                 }
1046
1047                 // Force-close the channel.
1048                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1049
1050                 // Check that the force-close updates are persisted.
1051                 check_persisted_data!(nodes[0].node, filepath.clone());
1052                 loop {
1053                         if !nodes[0].node.get_persistence_condvar_value() { break }
1054                 }
1055
1056                 // Check that the network graph is persisted.
1057                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1058                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1059
1060                 // Check that the scorer is persisted.
1061                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1062                 check_persisted_data!(nodes[0].scorer, filepath.clone());
1063
1064                 assert!(bg_processor.stop().is_ok());
1065         }
1066
1067         #[test]
1068         fn test_timer_tick_called() {
1069                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` are called every
1070                 // `FRESHNESS_TIMER`.
1071                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1072                 let data_dir = nodes[0].persister.get_data_dir();
1073                 let persister = Arc::new(Persister::new(data_dir));
1074                 let event_handler = |_: _| {};
1075                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1076                 loop {
1077                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1078                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1079                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1080                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1081                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1082                                 break
1083                         }
1084                 }
1085
1086                 assert!(bg_processor.stop().is_ok());
1087         }
1088
1089         #[test]
1090         fn test_channel_manager_persist_error() {
1091                 // Test that if we encounter an error during manager persistence, the error is returned on join.
1092                 let nodes = create_nodes(2, "test_persist_error".to_string());
1093                 open_channel!(nodes[0], nodes[1], 100000);
1094
1095                 let data_dir = nodes[0].persister.get_data_dir();
1096                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1097                 let event_handler = |_: _| {};
1098                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1099                 match bg_processor.join() {
1100                         Ok(_) => panic!("Expected error persisting manager"),
1101                         Err(e) => {
1102                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1103                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1104                         },
1105                 }
1106         }
1107
1108         #[test]
1109         fn test_network_graph_persist_error() {
1110                 // Test that if we encounter an error during network graph persistence, the error is returned on stop.
1111                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1112                 let data_dir = nodes[0].persister.get_data_dir();
1113                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1114                 let event_handler = |_: _| {};
1115                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1116
1117                 match bg_processor.stop() {
1118                         Ok(_) => panic!("Expected error persisting network graph"),
1119                         Err(e) => {
1120                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1121                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1122                         },
1123                 }
1124         }
1125
1126         #[test]
1127         fn test_scorer_persist_error() {
1128                 // Test that if we encounter an error during scorer persistence, the error is returned on stop.
1129                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1130                 let data_dir = nodes[0].persister.get_data_dir();
1131                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1132                 let event_handler = |_: _| {};
1133                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1134
1135                 match bg_processor.stop() {
1136                         Ok(_) => panic!("Expected error persisting scorer"),
1137                         Err(e) => {
1138                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1139                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1140                         },
1141                 }
1142         }
1143
1144         #[test]
1145         fn test_background_event_handling() {
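                // Test that FundingGenerationReady and SpendableOutputs events are passed to the
                // user-provided event handler.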
1146                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1147                 let channel_value = 100000;
1148                 let data_dir = nodes[0].persister.get_data_dir();
1149                 let persister = Arc::new(Persister::new(data_dir.clone()));
1150
1151                 // Set up a background event handler for FundingGenerationReady events.
1152                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1153                 let event_handler = move |event: Event| match event {
1154                         Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1155                         Event::ChannelReady { .. } => {},
1156                         _ => panic!("Unexpected event: {:?}", event),
1157                 };
1158
1159                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1160
1161                 // Open a channel and check that the FundingGenerationReady event was handled.
1162                 begin_open_channel!(nodes[0], nodes[1], channel_value);
1163                 let (temporary_channel_id, funding_tx) = receiver
1164                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1165                         .expect("FundingGenerationReady not handled within deadline");
1166                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1167
1168                 // Confirm the funding transaction.
1169                 confirm_transaction(&mut nodes[0], &funding_tx);
1170                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1171                 confirm_transaction(&mut nodes[1], &funding_tx);
1172                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1173                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1174                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1175                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1176                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1177
1178                 assert!(bg_processor.stop().is_ok());
1179
1180                 // Set up a background event handler for SpendableOutputs events.
1181                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1182                 let event_handler = move |event: Event| match event {
1183                         Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1184                         Event::ChannelReady { .. } => {},
1185                         Event::ChannelClosed { .. } => {},
1186                         _ => panic!("Unexpected event: {:?}", event),
1187                 };
1188                 let persister = Arc::new(Persister::new(data_dir));
1189                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1190
1191                 // Force close the channel and check that the SpendableOutputs event was handled.
1192                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1193                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1194                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1195
1196                 let event = receiver
1197                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1198                         .expect("Events not handled within deadline");
1199                 match event {
1200                         Event::SpendableOutputs { .. } => {},
1201                         _ => panic!("Unexpected event: {:?}", event),
1202                 }
1203
1204                 assert!(bg_processor.stop().is_ok());
1205         }
1206
1207         #[test]
1208         fn test_scorer_persistence() {
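                // Test that the scorer is persisted by the background processor.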
1209                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1210                 let data_dir = nodes[0].persister.get_data_dir();
1211                 let persister = Arc::new(Persister::new(data_dir));
1212                 let event_handler = |_: _| {};
1213                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1214
1215                 loop {
1216                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1217                         let expected_log = "Persisting scorer".to_string();
1218                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1219                                 break
1220                         }
1221                 }
1222
1223                 assert!(bg_processor.stop().is_ok());
1224         }
1225
1226         #[test]
1227         fn test_not_pruning_network_graph_until_graph_sync_completion() {
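                // Test that the network graph is not pruned until the rapid gossip sync has
                // completed.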
1228                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1229                 let data_dir = nodes[0].persister.get_data_dir();
1230                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1231                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1232                 let network_graph = nodes[0].network_graph.clone();
1233                 let features = ChannelFeatures::empty();
1234                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1235                         .expect("Failed to update channel from partial announcement");
1236                 let original_graph_description = network_graph.to_string();
1237                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1238                 assert_eq!(network_graph.read_only().channels().len(), 1);
1239
1240                 let event_handler = |_: _| {};
1241                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1242
1243                 loop {
1244                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1245                         let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1246                         if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1247                                 .unwrap_or(&0) > 1
1248                         {
1249                                 // Wait until the loop has gone around at least twice.
1250                                 break
1251                         }
1252                 }
1253
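                // Hard-coded rapid gossip sync data; applying it should add channels to the graph
                // and mark the initial sync as complete, allowing pruning to proceed.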
1254                 let initialization_input = vec![
1255                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1256                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1257                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1258                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1259                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1260                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1261                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1262                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1263                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1264                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1265                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1266                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1267                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1268                 ];
1269                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1270
1271                 // This should have added two channels.
1272                 assert_eq!(network_graph.read_only().channels().len(), 3);
1273
1274                 let _ = receiver
1275                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1276                         .expect("Network graph not pruned within deadline");
1277
1278                 background_processor.stop().unwrap();
1279
1280                 // All channels should now be pruned.
1281                 assert_eq!(network_graph.read_only().channels().len(), 0);
1282         }
1283
1284         #[test]
1285         fn test_invoice_payer() {
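                // Test that an InvoicePayer can be used as the background processor's event
                // handler.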
1286                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1287                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1288                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1289
1290                 // Start the background processor to watch the first node.
1291                 let data_dir = nodes[0].persister.get_data_dir();
1292                 let persister = Arc::new(Persister::new(data_dir));
1293                 let router = Arc::new(DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)));
1294                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1295                 let event_handler = Arc::clone(&invoice_payer);
1296                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1297                 assert!(bg_processor.stop().is_ok());
1298         }
1299 }