MessageRouter trait for OnionMessenger
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[cfg(not(feature = "std"))]
20 extern crate alloc;
21
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
24
25 use lightning::chain;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::peer_handler::APeerManager;
34 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
35 use lightning::routing::utxo::UtxoLookup;
36 use lightning::routing::router::Router;
37 use lightning::routing::scoring::{Score, WriteableScore};
38 use lightning::util::logger::Logger;
39 use lightning::util::persist::Persister;
40 #[cfg(feature = "std")]
41 use lightning::util::wakers::Sleeper;
42 use lightning_rapid_gossip_sync::RapidGossipSync;
43
44 use core::ops::Deref;
45 use core::time::Duration;
46
47 #[cfg(feature = "std")]
48 use std::sync::Arc;
49 #[cfg(feature = "std")]
50 use core::sync::atomic::{AtomicBool, Ordering};
51 #[cfg(feature = "std")]
52 use std::thread::{self, JoinHandle};
53 #[cfg(feature = "std")]
54 use std::time::Instant;
55
56 #[cfg(not(feature = "std"))]
57 use alloc::vec::Vec;
58
59 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
60 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
61 /// responsibilities are:
62 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
63 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
64 ///   writing it to disk/backups by invoking the callback given to it at startup.
65 ///   [`ChannelManager`] persistence should be done in the background.
66 /// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
67 ///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
68 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
69 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
70 ///
71 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
72 /// upon as doing so may result in high latency.
73 ///
74 /// # Note
75 ///
76 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
77 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
78 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
79 /// unilateral chain closure fees are at risk.
80 ///
81 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
82 /// [`Event`]: lightning::events::Event
83 /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
84 /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
85 #[cfg(feature = "std")]
86 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
87 pub struct BackgroundProcessor {
88         stop_thread: Arc<AtomicBool>,
89         thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
90 }
91
92 #[cfg(not(test))]
93 const FRESHNESS_TIMER: u64 = 60;
94 #[cfg(test)]
95 const FRESHNESS_TIMER: u64 = 1;
96
97 #[cfg(all(not(test), not(debug_assertions)))]
98 const PING_TIMER: u64 = 10;
99 /// Signature operations take a lot longer without compiler optimisations.
100 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
101 /// timeout is reached.
102 #[cfg(all(not(test), debug_assertions))]
103 const PING_TIMER: u64 = 30;
104 #[cfg(test)]
105 const PING_TIMER: u64 = 1;
106
107 /// Prune the network graph of stale entries hourly.
108 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
109
110 #[cfg(not(test))]
111 const SCORER_PERSIST_TIMER: u64 = 60 * 60;
112 #[cfg(test)]
113 const SCORER_PERSIST_TIMER: u64 = 1;
114
115 #[cfg(not(test))]
116 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
117 #[cfg(test)]
118 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
119
120 #[cfg(not(test))]
121 const REBROADCAST_TIMER: u64 = 30;
122 #[cfg(test)]
123 const REBROADCAST_TIMER: u64 = 1;
124
125 #[cfg(feature = "futures")]
126 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
127 const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
128 #[cfg(feature = "futures")]
129 const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
130         min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
131
132 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
133 pub enum GossipSync<
134         P: Deref<Target = P2PGossipSync<G, U, L>>,
135         R: Deref<Target = RapidGossipSync<G, L>>,
136         G: Deref<Target = NetworkGraph<L>>,
137         U: Deref,
138         L: Deref,
139 >
140 where U::Target: UtxoLookup, L::Target: Logger {
141         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
142         P2P(P),
143         /// Rapid gossip sync from a trusted server.
144         Rapid(R),
145         /// No gossip sync.
146         None,
147 }
148
149 impl<
150         P: Deref<Target = P2PGossipSync<G, U, L>>,
151         R: Deref<Target = RapidGossipSync<G, L>>,
152         G: Deref<Target = NetworkGraph<L>>,
153         U: Deref,
154         L: Deref,
155 > GossipSync<P, R, G, U, L>
156 where U::Target: UtxoLookup, L::Target: Logger {
157         fn network_graph(&self) -> Option<&G> {
158                 match self {
159                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
160                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
161                         GossipSync::None => None,
162                 }
163         }
164
165         fn prunable_network_graph(&self) -> Option<&G> {
166                 match self {
167                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
168                         GossipSync::Rapid(gossip_sync) => {
169                                 if gossip_sync.is_initial_sync_complete() {
170                                         Some(gossip_sync.network_graph())
171                                 } else {
172                                         None
173                                 }
174                         },
175                         GossipSync::None => None,
176                 }
177         }
178 }
179
180 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
181 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
182         GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
183 where
184         U::Target: UtxoLookup,
185         L::Target: Logger,
186 {
187         /// Initializes a new [`GossipSync::P2P`] variant.
188         pub fn p2p(gossip_sync: P) -> Self {
189                 GossipSync::P2P(gossip_sync)
190         }
191 }
192
193 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
194 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
195         GossipSync<
196                 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
197                 R,
198                 G,
199                 &'a (dyn UtxoLookup + Send + Sync),
200                 L,
201         >
202 where
203         L::Target: Logger,
204 {
205         /// Initializes a new [`GossipSync::Rapid`] variant.
206         pub fn rapid(gossip_sync: R) -> Self {
207                 GossipSync::Rapid(gossip_sync)
208         }
209 }
210
211 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
212 impl<'a, L: Deref>
213         GossipSync<
214                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
215                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
216                 &'a NetworkGraph<L>,
217                 &'a (dyn UtxoLookup + Send + Sync),
218                 L,
219         >
220 where
221         L::Target: Logger,
222 {
223         /// Initializes a new [`GossipSync::None`] variant.
224         pub fn none() -> Self {
225                 GossipSync::None
226         }
227 }
228
229 fn handle_network_graph_update<L: Deref>(
230         network_graph: &NetworkGraph<L>, event: &Event
231 ) where L::Target: Logger {
232         if let Event::PaymentPathFailed {
233                 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
234         {
235                 network_graph.handle_network_update(upd);
236         }
237 }
238
239 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
240 /// to persist.
241 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
242         scorer: &'a S, event: &Event
243 ) -> bool {
244         let mut score = scorer.lock();
245         match event {
246                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
247                         score.payment_path_failed(path, *scid);
248                 },
249                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
250                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
251                         // because the payment made it all the way to the destination with sufficient liquidity.
252                         score.probe_successful(path);
253                 },
254                 Event::PaymentPathSuccessful { path, .. } => {
255                         score.payment_path_successful(path);
256                 },
257                 Event::ProbeSuccessful { path, .. } => {
258                         score.probe_successful(path);
259                 },
260                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
261                         score.probe_failed(path, *scid);
262                 },
263                 _ => return false,
264         }
265         true
266 }
267
268 macro_rules! define_run_body {
269         ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
270          $channel_manager: ident, $process_channel_manager_events: expr,
271          $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
272          $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
273          $check_slow_await: expr)
274         => { {
275                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
276                 $channel_manager.timer_tick_occurred();
277                 log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
278                 $chain_monitor.rebroadcast_pending_claims();
279
280                 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
281                 let mut last_ping_call = $get_timer(PING_TIMER);
282                 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
283                 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
284                 let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
285                 let mut have_pruned = false;
286
287                 loop {
288                         $process_channel_manager_events;
289                         $process_chain_monitor_events;
290
291                         // Note that the PeerManager::process_events may block on ChannelManager's locks,
292                         // hence it comes last here. When the ChannelManager finishes whatever it's doing,
293                         // we want to ensure we get into `persist_manager` as quickly as we can, especially
294                         // without running the normal event processing above and handing events to users.
295                         //
296                         // Specifically, on an *extremely* slow machine, we may see ChannelManager start
297                         // processing a message effectively at any point during this loop. In order to
298                         // minimize the time between such processing completing and persisting the updated
299                         // ChannelManager, we want to minimize methods blocking on a ChannelManager
300                         // generally, and as a fallback place such blocking only immediately before
301                         // persistence.
302                         $peer_manager.as_ref().process_events();
303
304                         // Exit the loop if the background processor was requested to stop.
305                         if $loop_exit_check {
306                                 log_trace!($logger, "Terminating background processor.");
307                                 break;
308                         }
309
310                         // We wait up to 100ms, but track how long it takes to detect being put to sleep,
311                         // see `await_start`'s use below.
312                         let mut await_start = None;
313                         if $check_slow_await { await_start = Some($get_timer(1)); }
314                         let updates_available = $await;
315                         let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
316
317                         // Exit the loop if the background processor was requested to stop.
318                         if $loop_exit_check {
319                                 log_trace!($logger, "Terminating background processor.");
320                                 break;
321                         }
322
323                         if updates_available {
324                                 log_trace!($logger, "Persisting ChannelManager...");
325                                 $persister.persist_manager(&*$channel_manager)?;
326                                 log_trace!($logger, "Done persisting ChannelManager.");
327                         }
328                         if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
329                                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
330                                 $channel_manager.timer_tick_occurred();
331                                 last_freshness_call = $get_timer(FRESHNESS_TIMER);
332                         }
333                         if await_slow {
334                                 // On various platforms, we may be starved of CPU cycles for several reasons.
335                                 // E.g. on iOS, if we've been in the background, we will be entirely paused.
336                                 // Similarly, if we're on a desktop platform and the device has been asleep, we
337                                 // may not get any cycles.
338                                 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
339                                 // full second, at which point we assume sockets may have been killed (they
340                                 // appear to be at least on some platforms, even if it has only been a second).
341                                 // Note that we have to take care to not get here just because user event
342                                 // processing was slow at the top of the loop. For example, the sample client
343                                 // may call Bitcoin Core RPCs during event handling, which very often takes
344                                 // more than a handful of seconds to complete, and shouldn't disconnect all our
345                                 // peers.
346                                 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
347                                 $peer_manager.as_ref().disconnect_all_peers();
348                                 last_ping_call = $get_timer(PING_TIMER);
349                         } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
350                                 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
351                                 $peer_manager.as_ref().timer_tick_occurred();
352                                 last_ping_call = $get_timer(PING_TIMER);
353                         }
354
355                         // Note that we want to run a graph prune once not long after startup before
356                         // falling back to our usual hourly prunes. This avoids short-lived clients never
357                         // pruning their network graph. We run once 60 seconds after startup before
358                         // continuing our normal cadence. For RGS, since 60 seconds is likely too long,
359                         // we prune after an initial sync completes.
360                         let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
361                         let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
362                         let should_prune = match $gossip_sync {
363                                 GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
364                                 _ => prune_timer_elapsed,
365                         };
366                         if should_prune {
367                                 // The network graph must not be pruned while rapid sync completion is pending
368                                 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
369                                         #[cfg(feature = "std")] {
370                                                 log_trace!($logger, "Pruning and persisting network graph.");
371                                                 network_graph.remove_stale_channels_and_tracking();
372                                         }
373                                         #[cfg(not(feature = "std"))] {
374                                                 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
375                                                 log_trace!($logger, "Persisting network graph.");
376                                         }
377
378                                         if let Err(e) = $persister.persist_graph(network_graph) {
379                                                 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
380                                         }
381
382                                         have_pruned = true;
383                                 }
384                                 let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
385                                 last_prune_call = $get_timer(prune_timer);
386                         }
387
388                         if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
389                                 if let Some(ref scorer) = $scorer {
390                                         log_trace!($logger, "Persisting scorer");
391                                         if let Err(e) = $persister.persist_scorer(&scorer) {
392                                                 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
393                                         }
394                                 }
395                                 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
396                         }
397
398                         if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
399                                 log_trace!($logger, "Rebroadcasting monitor's pending claims");
400                                 $chain_monitor.rebroadcast_pending_claims();
401                                 last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
402                         }
403                 }
404
405                 // After we exit, ensure we persist the ChannelManager one final time - this avoids
406                 // some races where users quit while channel updates were in-flight, with
407                 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
408                 $persister.persist_manager(&*$channel_manager)?;
409
410                 // Persist Scorer on exit
411                 if let Some(ref scorer) = $scorer {
412                         $persister.persist_scorer(&scorer)?;
413                 }
414
415                 // Persist NetworkGraph on exit
416                 if let Some(network_graph) = $gossip_sync.network_graph() {
417                         $persister.persist_graph(network_graph)?;
418                 }
419
420                 Ok(())
421         } }
422 }
423
424 #[cfg(feature = "futures")]
425 pub(crate) mod futures_util {
426         use core::future::Future;
427         use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
428         use core::pin::Pin;
429         use core::marker::Unpin;
430         pub(crate) struct Selector<
431                 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
432         > {
433                 pub a: A,
434                 pub b: B,
435                 pub c: C,
436         }
437         pub(crate) enum SelectorOutput {
438                 A, B, C(bool),
439         }
440
441         impl<
442                 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
443         > Future for Selector<A, B, C> {
444                 type Output = SelectorOutput;
445                 fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
446                         match Pin::new(&mut self.a).poll(ctx) {
447                                 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
448                                 Poll::Pending => {},
449                         }
450                         match Pin::new(&mut self.b).poll(ctx) {
451                                 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
452                                 Poll::Pending => {},
453                         }
454                         match Pin::new(&mut self.c).poll(ctx) {
455                                 Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
456                                 Poll::Pending => {},
457                         }
458                         Poll::Pending
459                 }
460         }
461
462         // If we want to poll a future without an async context to figure out if it has completed or
463         // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
464         // but sadly there's a good bit of boilerplate here.
465         fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
466         fn dummy_waker_action(_: *const ()) { }
467
468         const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
469                 dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
470         pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
471 }
472 #[cfg(feature = "futures")]
473 use futures_util::{Selector, SelectorOutput, dummy_waker};
474 #[cfg(feature = "futures")]
475 use core::task;
476
477 /// Processes background events in a future.
478 ///
479 /// `sleeper` should return a future which completes in the given amount of time and returns a
480 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
481 /// future which outputs `true`, the loop will exit and this function's future will complete.
482 /// The `sleeper` future is free to return early after it has triggered the exit condition.
483 ///
484 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
485 ///
486 /// Requires the `futures` feature. Note that while this method is available without the `std`
487 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
488 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
489 /// manually instead.
490 ///
491 /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
492 /// mobile device, where we may need to check for interruption of the application regularly. If you
493 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
494 /// are hundreds or thousands of simultaneous process calls running.
495 ///
496 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
497 /// could setup `process_events_async` like this:
498 /// ```
499 /// # struct MyPersister {}
500 /// # impl lightning::util::persist::KVStorePersister for MyPersister {
501 /// #     fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
502 /// # }
503 /// # struct MyEventHandler {}
504 /// # impl MyEventHandler {
505 /// #     async fn handle_event(&self, _: lightning::events::Event) {}
506 /// # }
507 /// # #[derive(Eq, PartialEq, Clone, Hash)]
508 /// # struct MySocketDescriptor {}
509 /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
510 /// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
511 /// #     fn disconnect_socket(&mut self) {}
512 /// # }
513 /// # use std::sync::{Arc, Mutex};
514 /// # use std::sync::atomic::{AtomicBool, Ordering};
515 /// # use lightning_background_processor::{process_events_async, GossipSync};
516 /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
517 /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
518 /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
519 /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
520 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
521 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
522 /// # type MyMessageRouter = dyn lightning::onion_message::MessageRouter + Send + Sync;
523 /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
524 /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger, MyMessageRouter>;
525 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
526 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
527 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
528 /// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
529 ///
530 /// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
531 ///     let background_persister = Arc::clone(&my_persister);
532 ///     let background_event_handler = Arc::clone(&my_event_handler);
533 ///     let background_chain_mon = Arc::clone(&my_chain_monitor);
534 ///     let background_chan_man = Arc::clone(&my_channel_manager);
535 ///     let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
536 ///     let background_peer_man = Arc::clone(&my_peer_manager);
537 ///     let background_logger = Arc::clone(&my_logger);
538 ///     let background_scorer = Arc::clone(&my_scorer);
539 ///
540 ///     // Setup the sleeper.
541 ///     let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
542 ///
543 ///     let sleeper = move |d| {
544 ///             let mut receiver = stop_receiver.clone();
545 ///             Box::pin(async move {
546 ///                     tokio::select!{
547 ///                             _ = tokio::time::sleep(d) => false,
548 ///                             _ = receiver.changed() => true,
549 ///                     }
550 ///             })
551 ///     };
552 ///
553 ///     let mobile_interruptable_platform = false;
554 ///
555 ///     let handle = tokio::spawn(async move {
556 ///             process_events_async(
557 ///                     background_persister,
558 ///                     |e| background_event_handler.handle_event(e),
559 ///                     background_chain_mon,
560 ///                     background_chan_man,
561 ///                     background_gossip_sync,
562 ///                     background_peer_man,
563 ///                     background_logger,
564 ///                     Some(background_scorer),
565 ///                     sleeper,
566 ///                     mobile_interruptable_platform,
567 ///                     )
568 ///                     .await
569 ///                     .expect("Failed to process events");
570 ///     });
571 ///
572 ///     // Stop the background processing.
573 ///     stop_sender.send(()).unwrap();
574 ///     handle.await.unwrap();
575 ///     # }
576 ///```
577 #[cfg(feature = "futures")]
578 pub async fn process_events_async<
579         'a,
580         UL: 'static + Deref + Send + Sync,
581         CF: 'static + Deref + Send + Sync,
582         CW: 'static + Deref + Send + Sync,
583         T: 'static + Deref + Send + Sync,
584         ES: 'static + Deref + Send + Sync,
585         NS: 'static + Deref + Send + Sync,
586         SP: 'static + Deref + Send + Sync,
587         F: 'static + Deref + Send + Sync,
588         R: 'static + Deref + Send + Sync,
589         G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
590         L: 'static + Deref + Send + Sync,
591         P: 'static + Deref + Send + Sync,
592         EventHandlerFuture: core::future::Future<Output = ()>,
593         EventHandler: Fn(Event) -> EventHandlerFuture,
594         PS: 'static + Deref + Send,
595         M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
596         CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
597         PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
598         RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
599         APM: APeerManager + Send + Sync,
600         PM: 'static + Deref<Target = APM> + Send + Sync,
601         S: 'static + Deref<Target = SC> + Send + Sync,
602         SC: for<'b> WriteableScore<'b>,
603         SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
604         Sleeper: Fn(Duration) -> SleepFuture
605 >(
606         persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
607         gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
608         sleeper: Sleeper, mobile_interruptable_platform: bool,
609 ) -> Result<(), lightning::io::Error>
610 where
611         UL::Target: 'static + UtxoLookup,
612         CF::Target: 'static + chain::Filter,
613         CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
614         T::Target: 'static + BroadcasterInterface,
615         ES::Target: 'static + EntropySource,
616         NS::Target: 'static + NodeSigner,
617         SP::Target: 'static + SignerProvider,
618         F::Target: 'static + FeeEstimator,
619         R::Target: 'static + Router,
620         L::Target: 'static + Logger,
621         P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
622         PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
623 {
624         let mut should_break = false;
625         let async_event_handler = |event| {
626                 let network_graph = gossip_sync.network_graph();
627                 let event_handler = &event_handler;
628                 let scorer = &scorer;
629                 let logger = &logger;
630                 let persister = &persister;
631                 async move {
632                         if let Some(network_graph) = network_graph {
633                                 handle_network_graph_update(network_graph, &event)
634                         }
635                         if let Some(ref scorer) = scorer {
636                                 if update_scorer(scorer, &event) {
637                                         log_trace!(logger, "Persisting scorer after update");
638                                         if let Err(e) = persister.persist_scorer(&scorer) {
639                                                 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
640                                         }
641                                 }
642                         }
643                         event_handler(event).await;
644                 }
645         };
646         define_run_body!(persister,
647                 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
648                 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
649                 gossip_sync, peer_manager, logger, scorer, should_break, {
650                         let fut = Selector {
651                                 a: channel_manager.get_persistable_update_future(),
652                                 b: chain_monitor.get_update_future(),
653                                 c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
654                         };
655                         match fut.await {
656                                 SelectorOutput::A => true,
657                                 SelectorOutput::B => false,
658                                 SelectorOutput::C(exit) => {
659                                         should_break = exit;
660                                         false
661                                 }
662                         }
663                 }, |t| sleeper(Duration::from_secs(t)),
664                 |fut: &mut SleepFuture, _| {
665                         let mut waker = dummy_waker();
666                         let mut ctx = task::Context::from_waker(&mut waker);
667                         match core::pin::Pin::new(fut).poll(&mut ctx) {
668                                 task::Poll::Ready(exit) => { should_break = exit; true },
669                                 task::Poll::Pending => false,
670                         }
671                 }, mobile_interruptable_platform)
672 }
673
674 #[cfg(feature = "std")]
675 impl BackgroundProcessor {
676         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
677         /// documentation].
678         ///
679         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
680         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
681         /// either [`join`] or [`stop`].
682         ///
683         /// # Data Persistence
684         ///
685         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
686         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
687         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
688         /// provided implementation.
689         ///
690         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
691         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
692         /// See the `lightning-persister` crate for LDK's provided implementation.
693         ///
694         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
695         /// error or call [`join`] and handle any error that may arise. For the latter case,
696         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
697         ///
698         /// # Event Handling
699         ///
700         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
701         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
702         /// functionality implemented by other handlers.
703         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
704         ///
705         /// # Rapid Gossip Sync
706         ///
707         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
708         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
709         /// until the [`RapidGossipSync`] instance completes its first sync.
710         ///
711         /// [top-level documentation]: BackgroundProcessor
712         /// [`join`]: Self::join
713         /// [`stop`]: Self::stop
714         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
715         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
716         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
717         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
718         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
719         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
720         pub fn start<
721                 'a,
722                 UL: 'static + Deref + Send + Sync,
723                 CF: 'static + Deref + Send + Sync,
724                 CW: 'static + Deref + Send + Sync,
725                 T: 'static + Deref + Send + Sync,
726                 ES: 'static + Deref + Send + Sync,
727                 NS: 'static + Deref + Send + Sync,
728                 SP: 'static + Deref + Send + Sync,
729                 F: 'static + Deref + Send + Sync,
730                 R: 'static + Deref + Send + Sync,
731                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
732                 L: 'static + Deref + Send + Sync,
733                 P: 'static + Deref + Send + Sync,
734                 EH: 'static + EventHandler + Send,
735                 PS: 'static + Deref + Send,
736                 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
737                 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
738                 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
739                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
740                 APM: APeerManager + Send + Sync,
741                 PM: 'static + Deref<Target = APM> + Send + Sync,
742                 S: 'static + Deref<Target = SC> + Send + Sync,
743                 SC: for <'b> WriteableScore<'b>,
744         >(
745                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
746                 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
747         ) -> Self
748         where
749                 UL::Target: 'static + UtxoLookup,
750                 CF::Target: 'static + chain::Filter,
751                 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
752                 T::Target: 'static + BroadcasterInterface,
753                 ES::Target: 'static + EntropySource,
754                 NS::Target: 'static + NodeSigner,
755                 SP::Target: 'static + SignerProvider,
756                 F::Target: 'static + FeeEstimator,
757                 R::Target: 'static + Router,
758                 L::Target: 'static + Logger,
759                 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
760                 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
761         {
762                 let stop_thread = Arc::new(AtomicBool::new(false));
763                 let stop_thread_clone = stop_thread.clone();
764                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
765                         let event_handler = |event| {
766                                 let network_graph = gossip_sync.network_graph();
767                                 if let Some(network_graph) = network_graph {
768                                         handle_network_graph_update(network_graph, &event)
769                                 }
770                                 if let Some(ref scorer) = scorer {
771                                         if update_scorer(scorer, &event) {
772                                                 log_trace!(logger, "Persisting scorer after update");
773                                                 if let Err(e) = persister.persist_scorer(&scorer) {
774                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
775                                                 }
776                                         }
777                                 }
778                                 event_handler.handle_event(event);
779                         };
780                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
781                                 channel_manager, channel_manager.process_pending_events(&event_handler),
782                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
783                                 Sleeper::from_two_futures(
784                                         channel_manager.get_persistable_update_future(),
785                                         chain_monitor.get_update_future()
786                                 ).wait_timeout(Duration::from_millis(100)),
787                                 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
788                 });
789                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
790         }
791
792         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
793         /// [`ChannelManager`].
794         ///
795         /// # Panics
796         ///
797         /// This function panics if the background thread has panicked such as while persisting or
798         /// handling events.
799         ///
800         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
801         pub fn join(mut self) -> Result<(), std::io::Error> {
802                 assert!(self.thread_handle.is_some());
803                 self.join_thread()
804         }
805
806         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
807         /// [`ChannelManager`].
808         ///
809         /// # Panics
810         ///
811         /// This function panics if the background thread has panicked such as while persisting or
812         /// handling events.
813         ///
814         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
815         pub fn stop(mut self) -> Result<(), std::io::Error> {
816                 assert!(self.thread_handle.is_some());
817                 self.stop_and_join_thread()
818         }
819
820         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
821                 self.stop_thread.store(true, Ordering::Release);
822                 self.join_thread()
823         }
824
825         fn join_thread(&mut self) -> Result<(), std::io::Error> {
826                 match self.thread_handle.take() {
827                         Some(handle) => handle.join().unwrap(),
828                         None => Ok(()),
829                 }
830         }
831 }
832
833 #[cfg(feature = "std")]
834 impl Drop for BackgroundProcessor {
835         fn drop(&mut self) {
836                 self.stop_and_join_thread().unwrap();
837         }
838 }
839
840 #[cfg(all(feature = "std", test))]
841 mod tests {
842         use bitcoin::blockdata::constants::{genesis_block, ChainHash};
843         use bitcoin::blockdata::locktime::PackedLockTime;
844         use bitcoin::blockdata::transaction::{Transaction, TxOut};
845         use bitcoin::network::constants::Network;
846         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
847         use lightning::chain::{BestBlock, Confirm, chainmonitor};
848         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
849         use lightning::sign::{InMemorySigner, KeysManager};
850         use lightning::chain::transaction::OutPoint;
851         use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
852         use lightning::{get_event_msg, get_event};
853         use lightning::ln::PaymentHash;
854         use lightning::ln::channelmanager;
855         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
856         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
857         use lightning::ln::functional_test_utils::*;
858         use lightning::ln::msgs::{ChannelMessageHandler, Init};
859         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
860         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
861         use lightning::routing::router::{DefaultRouter, Path, RouteHop};
862         use lightning::routing::scoring::{ChannelUsage, Score};
863         use lightning::util::config::UserConfig;
864         use lightning::util::ser::Writeable;
865         use lightning::util::test_utils;
866         use lightning::util::persist::KVStorePersister;
867         use lightning_persister::FilesystemPersister;
868         use std::collections::VecDeque;
869         use std::{fs, env};
870         use std::path::PathBuf;
871         use std::sync::{Arc, Mutex};
872         use std::sync::mpsc::SyncSender;
873         use std::time::Duration;
874         use lightning_rapid_gossip_sync::RapidGossipSync;
875         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
876
877         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
878
879         #[derive(Clone, Hash, PartialEq, Eq)]
880         struct TestDescriptor{}
881         impl SocketDescriptor for TestDescriptor {
882                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
883                         0
884                 }
885
886                 fn disconnect_socket(&mut self) {}
887         }
888
889         type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>, (), TestScorer>>, Arc<test_utils::TestLogger>>;
890
891         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
892
893         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
894         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
895
896         struct Node {
897                 node: Arc<ChannelManager>,
898                 p2p_gossip_sync: PGS,
899                 rapid_gossip_sync: RGS,
900                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
901                 chain_monitor: Arc<ChainMonitor>,
902                 persister: Arc<FilesystemPersister>,
903                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
904                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
905                 logger: Arc<test_utils::TestLogger>,
906                 best_block: BestBlock,
907                 scorer: Arc<Mutex<TestScorer>>,
908         }
909
910         impl Node {
911                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
912                         GossipSync::P2P(self.p2p_gossip_sync.clone())
913                 }
914
915                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
916                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
917                 }
918
919                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
920                         GossipSync::None
921                 }
922         }
923
924         impl Drop for Node {
925                 fn drop(&mut self) {
926                         let data_dir = self.persister.get_data_dir();
927                         match fs::remove_dir_all(data_dir.clone()) {
928                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
929                                 _ => {}
930                         }
931                 }
932         }
933
934         struct Persister {
935                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
936                 graph_persistence_notifier: Option<SyncSender<()>>,
937                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
938                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
939                 filesystem_persister: FilesystemPersister,
940         }
941
942         impl Persister {
943                 fn new(data_dir: String) -> Self {
944                         let filesystem_persister = FilesystemPersister::new(data_dir);
945                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
946                 }
947
948                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
949                         Self { graph_error: Some((error, message)), ..self }
950                 }
951
952                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
953                         Self { graph_persistence_notifier: Some(sender), ..self }
954                 }
955
956                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
957                         Self { manager_error: Some((error, message)), ..self }
958                 }
959
960                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
961                         Self { scorer_error: Some((error, message)), ..self }
962                 }
963         }
964
965         impl KVStorePersister for Persister {
966                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
967                         if key == "manager" {
968                                 if let Some((error, message)) = self.manager_error {
969                                         return Err(std::io::Error::new(error, message))
970                                 }
971                         }
972
973                         if key == "network_graph" {
974                                 if let Some(sender) = &self.graph_persistence_notifier {
975                                         match sender.send(()) {
976                                                 Ok(()) => {},
977                                                 Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
978                                         }
979                                 };
980
981                                 if let Some((error, message)) = self.graph_error {
982                                         return Err(std::io::Error::new(error, message))
983                                 }
984                         }
985
986                         if key == "scorer" {
987                                 if let Some((error, message)) = self.scorer_error {
988                                         return Err(std::io::Error::new(error, message))
989                                 }
990                         }
991
992                         self.filesystem_persister.persist(key, object)
993                 }
994         }
995
996         struct TestScorer {
997                 event_expectations: Option<VecDeque<TestResult>>,
998         }
999
1000         #[derive(Debug)]
1001         enum TestResult {
1002                 PaymentFailure { path: Path, short_channel_id: u64 },
1003                 PaymentSuccess { path: Path },
1004                 ProbeFailure { path: Path },
1005                 ProbeSuccess { path: Path },
1006         }
1007
1008         impl TestScorer {
1009                 fn new() -> Self {
1010                         Self { event_expectations: None }
1011                 }
1012
1013                 fn expect(&mut self, expectation: TestResult) {
1014                         self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1015                 }
1016         }
1017
1018         impl lightning::util::ser::Writeable for TestScorer {
1019                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
1020         }
1021
1022         impl Score for TestScorer {
1023                 type ScoreParams = ();
1024                 fn channel_penalty_msat(
1025                         &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams
1026                 ) -> u64 { unimplemented!(); }
1027
1028                 fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
1029                         if let Some(expectations) = &mut self.event_expectations {
1030                                 match expectations.pop_front().unwrap() {
1031                                         TestResult::PaymentFailure { path, short_channel_id } => {
1032                                                 assert_eq!(actual_path, &path);
1033                                                 assert_eq!(actual_short_channel_id, short_channel_id);
1034                                         },
1035                                         TestResult::PaymentSuccess { path } => {
1036                                                 panic!("Unexpected successful payment path: {:?}", path)
1037                                         },
1038                                         TestResult::ProbeFailure { path } => {
1039                                                 panic!("Unexpected probe failure: {:?}", path)
1040                                         },
1041                                         TestResult::ProbeSuccess { path } => {
1042                                                 panic!("Unexpected probe success: {:?}", path)
1043                                         }
1044                                 }
1045                         }
1046                 }
1047
1048                 fn payment_path_successful(&mut self, actual_path: &Path) {
1049                         if let Some(expectations) = &mut self.event_expectations {
1050                                 match expectations.pop_front().unwrap() {
1051                                         TestResult::PaymentFailure { path, .. } => {
1052                                                 panic!("Unexpected payment path failure: {:?}", path)
1053                                         },
1054                                         TestResult::PaymentSuccess { path } => {
1055                                                 assert_eq!(actual_path, &path);
1056                                         },
1057                                         TestResult::ProbeFailure { path } => {
1058                                                 panic!("Unexpected probe failure: {:?}", path)
1059                                         },
1060                                         TestResult::ProbeSuccess { path } => {
1061                                                 panic!("Unexpected probe success: {:?}", path)
1062                                         }
1063                                 }
1064                         }
1065                 }
1066
1067                 fn probe_failed(&mut self, actual_path: &Path, _: u64) {
1068                         if let Some(expectations) = &mut self.event_expectations {
1069                                 match expectations.pop_front().unwrap() {
1070                                         TestResult::PaymentFailure { path, .. } => {
1071                                                 panic!("Unexpected payment path failure: {:?}", path)
1072                                         },
1073                                         TestResult::PaymentSuccess { path } => {
1074                                                 panic!("Unexpected payment path success: {:?}", path)
1075                                         },
1076                                         TestResult::ProbeFailure { path } => {
1077                                                 assert_eq!(actual_path, &path);
1078                                         },
1079                                         TestResult::ProbeSuccess { path } => {
1080                                                 panic!("Unexpected probe success: {:?}", path)
1081                                         }
1082                                 }
1083                         }
1084                 }
1085                 fn probe_successful(&mut self, actual_path: &Path) {
1086                         if let Some(expectations) = &mut self.event_expectations {
1087                                 match expectations.pop_front().unwrap() {
1088                                         TestResult::PaymentFailure { path, .. } => {
1089                                                 panic!("Unexpected payment path failure: {:?}", path)
1090                                         },
1091                                         TestResult::PaymentSuccess { path } => {
1092                                                 panic!("Unexpected payment path success: {:?}", path)
1093                                         },
1094                                         TestResult::ProbeFailure { path } => {
1095                                                 panic!("Unexpected probe failure: {:?}", path)
1096                                         },
1097                                         TestResult::ProbeSuccess { path } => {
1098                                                 assert_eq!(actual_path, &path);
1099                                         }
1100                                 }
1101                         }
1102                 }
1103         }
1104
1105         impl Drop for TestScorer {
1106                 fn drop(&mut self) {
1107                         if std::thread::panicking() {
1108                                 return;
1109                         }
1110
1111                         if let Some(event_expectations) = &self.event_expectations {
1112                                 if !event_expectations.is_empty() {
1113                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
1114                                 }
1115                         }
1116                 }
1117         }
1118
1119         fn get_full_filepath(filepath: String, filename: String) -> String {
1120                 let mut path = PathBuf::from(filepath);
1121                 path.push(filename);
1122                 path.to_str().unwrap().to_string()
1123         }
1124
1125         fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
1126                 let persist_temp_path = env::temp_dir().join(persist_dir);
1127                 let persist_dir = persist_temp_path.to_string_lossy().to_string();
1128                 let network = Network::Bitcoin;
1129                 let mut nodes = Vec::new();
1130                 for i in 0..num_nodes {
1131                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1132                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
1133                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1134                         let genesis_block = genesis_block(network);
1135                         let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1136                         let scorer = Arc::new(Mutex::new(TestScorer::new()));
1137                         let seed = [i as u8; 32];
1138                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ()));
1139                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
1140                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", &persist_dir, i)));
1141                         let now = Duration::from_secs(genesis_block.header.time as u64);
1142                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1143                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1144                         let best_block = BestBlock::from_network(network);
1145                         let params = ChainParameters { network, best_block };
1146                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
1147                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1148                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1149                         let msg_handler = MessageHandler {
1150                                 chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
1151                                 route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
1152                                 onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
1153                         };
1154                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
1155                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
1156                         nodes.push(node);
1157                 }
1158
1159                 for i in 0..num_nodes {
1160                         for j in (i+1)..num_nodes {
1161                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init {
1162                                         features: nodes[j].node.init_features(), networks: None, remote_network_address: None
1163                                 }, true).unwrap();
1164                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init {
1165                                         features: nodes[i].node.init_features(), networks: None, remote_network_address: None
1166                                 }, false).unwrap();
1167                         }
1168                 }
1169
1170                 (persist_dir, nodes)
1171         }
1172
1173         macro_rules! open_channel {
1174                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1175                         begin_open_channel!($node_a, $node_b, $channel_value);
1176                         let events = $node_a.node.get_and_clear_pending_events();
1177                         assert_eq!(events.len(), 1);
1178                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1179                         $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
1180                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1181                         get_event!($node_b, Event::ChannelPending);
1182                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1183                         get_event!($node_a, Event::ChannelPending);
1184                         tx
1185                 }}
1186         }
1187
1188         macro_rules! begin_open_channel {
1189                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1190                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
1191                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1192                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
1193                 }}
1194         }
1195
1196         macro_rules! handle_funding_generation_ready {
1197                 ($event: expr, $channel_value: expr) => {{
1198                         match $event {
1199                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1200                                         assert_eq!(channel_value_satoshis, $channel_value);
1201                                         assert_eq!(user_channel_id, 42);
1202
1203                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1204                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1205                                         }]};
1206                                         (temporary_channel_id, tx)
1207                                 },
1208                                 _ => panic!("Unexpected event"),
1209                         }
1210                 }}
1211         }
1212
1213         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1214                 for i in 1..=depth {
1215                         let prev_blockhash = node.best_block.block_hash();
1216                         let height = node.best_block.height() + 1;
1217                         let header = create_dummy_header(prev_blockhash, height);
1218                         let txdata = vec![(0, tx)];
1219                         node.best_block = BestBlock::new(header.block_hash(), height);
1220                         match i {
1221                                 1 => {
1222                                         node.node.transactions_confirmed(&header, &txdata, height);
1223                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1224                                 },
1225                                 x if x == depth => {
1226                                         node.node.best_block_updated(&header, height);
1227                                         node.chain_monitor.best_block_updated(&header, height);
1228                                 },
1229                                 _ => {},
1230                         }
1231                 }
1232         }
1233         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1234                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1235         }
1236
1237         #[test]
1238         fn test_background_processor() {
1239                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1240                 // updates. Also test that when new updates are available, the manager signals that it needs
1241                 // re-persistence and is successfully re-persisted.
1242                 let (persist_dir, nodes) = create_nodes(2, "test_background_processor");
1243
1244                 // Go through the channel creation process so that each node has something to persist. Since
1245                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1246                 // avoid a race with processing events.
1247                 let tx = open_channel!(nodes[0], nodes[1], 100000);
1248
1249                 // Initiate the background processors to watch each node.
1250                 let data_dir = nodes[0].persister.get_data_dir();
1251                 let persister = Arc::new(Persister::new(data_dir));
1252                 let event_handler = |_: _| {};
1253                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1254
1255                 macro_rules! check_persisted_data {
1256                         ($node: expr, $filepath: expr) => {
1257                                 let mut expected_bytes = Vec::new();
1258                                 loop {
1259                                         expected_bytes.clear();
1260                                         match $node.write(&mut expected_bytes) {
1261                                                 Ok(()) => {
1262                                                         match std::fs::read($filepath) {
1263                                                                 Ok(bytes) => {
1264                                                                         if bytes == expected_bytes {
1265                                                                                 break
1266                                                                         } else {
1267                                                                                 continue
1268                                                                         }
1269                                                                 },
1270                                                                 Err(_) => continue
1271                                                         }
1272                                                 },
1273                                                 Err(e) => panic!("Unexpected error: {}", e)
1274                                         }
1275                                 }
1276                         }
1277                 }
1278
1279                 // Check that the initial channel manager data is persisted as expected.
1280                 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
1281                 check_persisted_data!(nodes[0].node, filepath.clone());
1282
1283                 loop {
1284                         if !nodes[0].node.get_persistence_condvar_value() { break }
1285                 }
1286
1287                 // Force-close the channel.
1288                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1289
1290                 // Check that the force-close updates are persisted.
1291                 check_persisted_data!(nodes[0].node, filepath.clone());
1292                 loop {
1293                         if !nodes[0].node.get_persistence_condvar_value() { break }
1294                 }
1295
1296                 // Check network graph is persisted
1297                 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
1298                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1299
1300                 // Check scorer is persisted
1301                 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
1302                 check_persisted_data!(nodes[0].scorer, filepath.clone());
1303
1304                 if !std::thread::panicking() {
1305                         bg_processor.stop().unwrap();
1306                 }
1307         }
1308
1309         #[test]
1310         fn test_timer_tick_called() {
1311                 // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1312                 // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
1313                 // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
1314                 let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1315                 let data_dir = nodes[0].persister.get_data_dir();
1316                 let persister = Arc::new(Persister::new(data_dir));
1317                 let event_handler = |_: _| {};
1318                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1319                 loop {
1320                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1321                         let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1322                         let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1323                         let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1324                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
1325                                 log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
1326                                 log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
1327                                 break
1328                         }
1329                 }
1330
1331                 if !std::thread::panicking() {
1332                         bg_processor.stop().unwrap();
1333                 }
1334         }
1335
1336         #[test]
1337         fn test_channel_manager_persist_error() {
1338                 // Test that if we encounter an error during manager persistence, the thread panics.
1339                 let (_, nodes) = create_nodes(2, "test_persist_error");
1340                 open_channel!(nodes[0], nodes[1], 100000);
1341
1342                 let data_dir = nodes[0].persister.get_data_dir();
1343                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1344                 let event_handler = |_: _| {};
1345                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1346                 match bg_processor.join() {
1347                         Ok(_) => panic!("Expected error persisting manager"),
1348                         Err(e) => {
1349                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1350                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1351                         },
1352                 }
1353         }
1354
1355         #[tokio::test]
1356         #[cfg(feature = "futures")]
1357         async fn test_channel_manager_persist_error_async() {
1358                 // Test that if we encounter an error during manager persistence, the thread panics.
1359                 let (_, nodes) = create_nodes(2, "test_persist_error_sync");
1360                 open_channel!(nodes[0], nodes[1], 100000);
1361
1362                 let data_dir = nodes[0].persister.get_data_dir();
1363                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1364
1365                 let bp_future = super::process_events_async(
1366                         persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1367                         nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1368                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1369                                 Box::pin(async move {
1370                                         tokio::time::sleep(dur).await;
1371                                         false // Never exit
1372                                 })
1373                         }, false,
1374                 );
1375                 match bp_future.await {
1376                         Ok(_) => panic!("Expected error persisting manager"),
1377                         Err(e) => {
1378                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1379                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1380                         },
1381                 }
1382         }
1383
1384         #[test]
1385         fn test_network_graph_persist_error() {
1386                 // Test that if we encounter an error during network graph persistence, an error gets returned.
1387                 let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
1388                 let data_dir = nodes[0].persister.get_data_dir();
1389                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1390                 let event_handler = |_: _| {};
1391                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1392
1393                 match bg_processor.stop() {
1394                         Ok(_) => panic!("Expected error persisting network graph"),
1395                         Err(e) => {
1396                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1397                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1398                         },
1399                 }
1400         }
1401
1402         #[test]
1403         fn test_scorer_persist_error() {
1404                 // Test that if we encounter an error during scorer persistence, an error gets returned.
1405                 let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
1406                 let data_dir = nodes[0].persister.get_data_dir();
1407                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1408                 let event_handler = |_: _| {};
1409                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1410
1411                 match bg_processor.stop() {
1412                         Ok(_) => panic!("Expected error persisting scorer"),
1413                         Err(e) => {
1414                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1415                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1416                         },
1417                 }
1418         }
1419
1420         #[test]
1421         fn test_background_event_handling() {
1422                 let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
1423                 let channel_value = 100000;
1424                 let data_dir = nodes[0].persister.get_data_dir();
1425                 let persister = Arc::new(Persister::new(data_dir.clone()));
1426
1427                 // Set up a background event handler for FundingGenerationReady events.
1428                 let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
1429                 let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1430                 let event_handler = move |event: Event| match event {
1431                         Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1432                         Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1433                         Event::ChannelReady { .. } => {},
1434                         _ => panic!("Unexpected event: {:?}", event),
1435                 };
1436
1437                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1438
1439                 // Open a channel and check that the FundingGenerationReady event was handled.
1440                 begin_open_channel!(nodes[0], nodes[1], channel_value);
1441                 let (temporary_channel_id, funding_tx) = funding_generation_recv
1442                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1443                         .expect("FundingGenerationReady not handled within deadline");
1444                 nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
1445                 nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
1446                 get_event!(nodes[1], Event::ChannelPending);
1447                 nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
1448                 let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1449                         .expect("ChannelPending not handled within deadline");
1450
1451                 // Confirm the funding transaction.
1452                 confirm_transaction(&mut nodes[0], &funding_tx);
1453                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1454                 confirm_transaction(&mut nodes[1], &funding_tx);
1455                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1456                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1457                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1458                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1459                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1460
1461                 if !std::thread::panicking() {
1462                         bg_processor.stop().unwrap();
1463                 }
1464
1465                 // Set up a background event handler for SpendableOutputs events.
1466                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1467                 let event_handler = move |event: Event| match event {
1468                         Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1469                         Event::ChannelReady { .. } => {},
1470                         Event::ChannelClosed { .. } => {},
1471                         _ => panic!("Unexpected event: {:?}", event),
1472                 };
1473                 let persister = Arc::new(Persister::new(data_dir));
1474                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1475
1476                 // Force close the channel and check that the SpendableOutputs event was handled.
1477                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1478                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1479                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1480
1481                 let event = receiver
1482                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1483                         .expect("Events not handled within deadline");
1484                 match event {
1485                         Event::SpendableOutputs { .. } => {},
1486                         _ => panic!("Unexpected event: {:?}", event),
1487                 }
1488
1489                 if !std::thread::panicking() {
1490                         bg_processor.stop().unwrap();
1491                 }
1492         }
1493
1494         #[test]
1495         fn test_scorer_persistence() {
1496                 let (_, nodes) = create_nodes(2, "test_scorer_persistence");
1497                 let data_dir = nodes[0].persister.get_data_dir();
1498                 let persister = Arc::new(Persister::new(data_dir));
1499                 let event_handler = |_: _| {};
1500                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1501
1502                 loop {
1503                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1504                         let expected_log = "Persisting scorer".to_string();
1505                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1506                                 break
1507                         }
1508                 }
1509
1510                 if !std::thread::panicking() {
1511                         bg_processor.stop().unwrap();
1512                 }
1513         }
1514
1515         macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
1516                 ($nodes: expr, $receive: expr, $sleep: expr) => {
1517                         let features = ChannelFeatures::empty();
1518                         $nodes[0].network_graph.add_channel_from_partial_announcement(
1519                                 42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
1520                         ).expect("Failed to update channel from partial announcement");
1521                         let original_graph_description = $nodes[0].network_graph.to_string();
1522                         assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1523                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
1524
1525                         loop {
1526                                 $sleep;
1527                                 let log_entries = $nodes[0].logger.lines.lock().unwrap();
1528                                 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1529                                 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1530                                         .unwrap_or(&0) > 1
1531                                 {
1532                                         // Wait until the loop has gone around at least twice.
1533                                         break
1534                                 }
1535                         }
1536
1537                         let initialization_input = vec![
1538                                 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1539                                 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1540                                 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1541                                 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1542                                 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1543                                 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1544                                 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1545                                 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1546                                 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1547                                 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1548                                 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1549                                 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1550                                 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1551                         ];
1552                         $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1553
1554                         // this should have added two channels and pruned the previous one.
1555                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
1556
1557                         $receive.expect("Network graph not pruned within deadline");
1558
1559                         // all channels should now be pruned
1560                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
1561                 }
1562         }
1563
1564         #[test]
1565         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1566                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1567
1568                 let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
1569                 let data_dir = nodes[0].persister.get_data_dir();
1570                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1571
1572                 let event_handler = |_: _| {};
1573                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1574
1575                 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
1576                         receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
1577                         std::thread::sleep(Duration::from_millis(1)));
1578
1579                 background_processor.stop().unwrap();
1580         }
1581
1582         #[tokio::test]
1583         #[cfg(feature = "futures")]
1584         async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
1585                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1586
1587                 let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
1588                 let data_dir = nodes[0].persister.get_data_dir();
1589                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1590
1591                 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
1592                 let bp_future = super::process_events_async(
1593                         persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1594                         nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1595                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1596                                 let mut exit_receiver = exit_receiver.clone();
1597                                 Box::pin(async move {
1598                                         tokio::select! {
1599                                                 _ = tokio::time::sleep(dur) => false,
1600                                                 _ = exit_receiver.changed() => true,
1601                                         }
1602                                 })
1603                         }, false,
1604                 );
1605
1606                 let t1 = tokio::spawn(bp_future);
1607                 let t2 = tokio::spawn(async move {
1608                         do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
1609                                 let mut i = 0;
1610                                 loop {
1611                                         tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
1612                                         if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
1613                                         assert!(i < 5);
1614                                         i += 1;
1615                                 }
1616                         }, tokio::time::sleep(Duration::from_millis(1)).await);
1617                         exit_sender.send(()).unwrap();
1618                 });
1619                 let (r1, r2) = tokio::join!(t1, t2);
1620                 r1.unwrap().unwrap();
1621                 r2.unwrap()
1622         }
1623
1624         macro_rules! do_test_payment_path_scoring {
1625                 ($nodes: expr, $receive: expr) => {
1626                         // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1627                         // that we update the scorer upon a payment path succeeding (note that the channel must be
1628                         // public or else we won't score it).
1629                         // A background event handler for FundingGenerationReady events must be hooked up to a
1630                         // running background processor.
1631                         let scored_scid = 4242;
1632                         let secp_ctx = Secp256k1::new();
1633                         let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1634                         let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1635
1636                         let path = Path { hops: vec![RouteHop {
1637                                 pubkey: node_1_id,
1638                                 node_features: NodeFeatures::empty(),
1639                                 short_channel_id: scored_scid,
1640                                 channel_features: ChannelFeatures::empty(),
1641                                 fee_msat: 0,
1642                                 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
1643                         }], blinded_tail: None };
1644
1645                         $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1646                         $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1647                                 payment_id: None,
1648                                 payment_hash: PaymentHash([42; 32]),
1649                                 payment_failed_permanently: false,
1650                                 failure: PathFailure::OnPath { network_update: None },
1651                                 path: path.clone(),
1652                                 short_channel_id: Some(scored_scid),
1653                         });
1654                         let event = $receive.expect("PaymentPathFailed not handled within deadline");
1655                         match event {
1656                                 Event::PaymentPathFailed { .. } => {},
1657                                 _ => panic!("Unexpected event"),
1658                         }
1659
1660                         // Ensure we'll score payments that were explicitly failed back by the destination as
1661                         // ProbeSuccess.
1662                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1663                         $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1664                                 payment_id: None,
1665                                 payment_hash: PaymentHash([42; 32]),
1666                                 payment_failed_permanently: true,
1667                                 failure: PathFailure::OnPath { network_update: None },
1668                                 path: path.clone(),
1669                                 short_channel_id: None,
1670                         });
1671                         let event = $receive.expect("PaymentPathFailed not handled within deadline");
1672                         match event {
1673                                 Event::PaymentPathFailed { .. } => {},
1674                                 _ => panic!("Unexpected event"),
1675                         }
1676
1677                         $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1678                         $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1679                                 payment_id: PaymentId([42; 32]),
1680                                 payment_hash: None,
1681                                 path: path.clone(),
1682                         });
1683                         let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
1684                         match event {
1685                                 Event::PaymentPathSuccessful { .. } => {},
1686                                 _ => panic!("Unexpected event"),
1687                         }
1688
1689                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1690                         $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1691                                 payment_id: PaymentId([42; 32]),
1692                                 payment_hash: PaymentHash([42; 32]),
1693                                 path: path.clone(),
1694                         });
1695                         let event = $receive.expect("ProbeSuccessful not handled within deadline");
1696                         match event {
1697                                 Event::ProbeSuccessful  { .. } => {},
1698                                 _ => panic!("Unexpected event"),
1699                         }
1700
1701                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1702                         $nodes[0].node.push_pending_event(Event::ProbeFailed {
1703                                 payment_id: PaymentId([42; 32]),
1704                                 payment_hash: PaymentHash([42; 32]),
1705                                 path,
1706                                 short_channel_id: Some(scored_scid),
1707                         });
1708                         let event = $receive.expect("ProbeFailure not handled within deadline");
1709                         match event {
1710                                 Event::ProbeFailed { .. } => {},
1711                                 _ => panic!("Unexpected event"),
1712                         }
1713                 }
1714         }
1715
1716         #[test]
1717         fn test_payment_path_scoring() {
1718                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1719                 let event_handler = move |event: Event| match event {
1720                         Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1721                         Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1722                         Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1723                         Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1724                         _ => panic!("Unexpected event: {:?}", event),
1725                 };
1726
1727                 let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
1728                 let data_dir = nodes[0].persister.get_data_dir();
1729                 let persister = Arc::new(Persister::new(data_dir));
1730                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1731
1732                 do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
1733
1734                 if !std::thread::panicking() {
1735                         bg_processor.stop().unwrap();
1736                 }
1737
1738                 let log_entries = nodes[0].logger.lines.lock().unwrap();
1739                 let expected_log = "Persisting scorer after update".to_string();
1740                 assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5);
1741         }
1742
1743         #[tokio::test]
1744         #[cfg(feature = "futures")]
1745         async fn test_payment_path_scoring_async() {
1746                 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
1747                 let event_handler = move |event: Event| {
1748                         let sender_ref = sender.clone();
1749                         async move {
1750                                 match event {
1751                                         Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
1752                                         Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1753                                         Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1754                                         Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
1755                                         _ => panic!("Unexpected event: {:?}", event),
1756                                 }
1757                         }
1758                 };
1759
1760                 let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
1761                 let data_dir = nodes[0].persister.get_data_dir();
1762                 let persister = Arc::new(Persister::new(data_dir));
1763
1764                 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
1765
1766                 let bp_future = super::process_events_async(
1767                         persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1768                         nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1769                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1770                                 let mut exit_receiver = exit_receiver.clone();
1771                                 Box::pin(async move {
1772                                         tokio::select! {
1773                                                 _ = tokio::time::sleep(dur) => false,
1774                                                 _ = exit_receiver.changed() => true,
1775                                         }
1776                                 })
1777                         }, false,
1778                 );
1779                 let t1 = tokio::spawn(bp_future);
1780                 let t2 = tokio::spawn(async move {
1781                         do_test_payment_path_scoring!(nodes, receiver.recv().await);
1782                         exit_sender.send(()).unwrap();
1783
1784                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1785                         let expected_log = "Persisting scorer after update".to_string();
1786                         assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5);
1787                 });
1788
1789                 let (r1, r2) = tokio::join!(t1, t2);
1790                 r1.unwrap().unwrap();
1791                 r2.unwrap()
1792         }
1793 }