1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 /// writing it to disk/backups by invoking the callback given to it at startup.
40 /// [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 /// at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 /// is provided to [`BackgroundProcessor::start`]).
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
60 stop_thread: Arc<AtomicBool>,
61 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
65 const FRESHNESS_TIMER: u64 = 60;
67 const FRESHNESS_TIMER: u64 = 1;
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
77 const PING_TIMER: u64 = 1;
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
83 const SCORER_PERSIST_TIMER: u64 = 30;
85 const SCORER_PERSIST_TIMER: u64 = 1;
88 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
90 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
92 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
94 P: Deref<Target = P2PGossipSync<G, A, L>>,
95 R: Deref<Target = RapidGossipSync<G, L>>,
96 G: Deref<Target = NetworkGraph<L>>,
100 where A::Target: chain::Access, L::Target: Logger {
101 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
103 /// Rapid gossip sync from a trusted server.
110 P: Deref<Target = P2PGossipSync<G, A, L>>,
111 R: Deref<Target = RapidGossipSync<G, L>>,
112 G: Deref<Target = NetworkGraph<L>>,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117 fn network_graph(&self) -> Option<&G> {
119 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121 GossipSync::None => None,
125 fn prunable_network_graph(&self) -> Option<&G> {
127 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128 GossipSync::Rapid(gossip_sync) => {
129 if gossip_sync.is_initial_sync_complete() {
130 Some(gossip_sync.network_graph())
135 GossipSync::None => None,
140 /// (C-not exported) as the bindings concretize everything and have constructors for us
141 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
142 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
144 A::Target: chain::Access,
147 /// Initializes a new [`GossipSync::P2P`] variant.
148 pub fn p2p(gossip_sync: P) -> Self {
149 GossipSync::P2P(gossip_sync)
153 /// (C-not exported) as the bindings concretize everything and have constructors for us
154 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
156 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
159 &'a (dyn chain::Access + Send + Sync),
165 /// Initializes a new [`GossipSync::Rapid`] variant.
166 pub fn rapid(gossip_sync: R) -> Self {
167 GossipSync::Rapid(gossip_sync)
171 /// (C-not exported) as the bindings concretize everything and have constructors for us
174 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
175 &RapidGossipSync<&'a NetworkGraph<L>, L>,
177 &'a (dyn chain::Access + Send + Sync),
183 /// Initializes a new [`GossipSync::None`] variant.
184 pub fn none() -> Self {
189 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
190 struct DecoratingEventHandler<
193 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
194 RGS: Deref<Target = RapidGossipSync<G, L>>,
195 G: Deref<Target = NetworkGraph<L>>,
199 where A::Target: chain::Access, L::Target: Logger {
201 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
207 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
208 RGS: Deref<Target = RapidGossipSync<G, L>>,
209 G: Deref<Target = NetworkGraph<L>>,
212 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
213 where A::Target: chain::Access, L::Target: Logger {
214 fn handle_event(&self, event: &Event) {
215 if let Some(network_graph) = self.gossip_sync.network_graph() {
216 network_graph.handle_event(event);
218 self.event_handler.handle_event(event);
222 impl BackgroundProcessor {
223 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
226 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
227 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
228 /// either [`join`] or [`stop`].
230 /// # Data Persistence
232 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
233 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
234 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
235 /// provided implementation.
237 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
238 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
239 /// See the `lightning-persister` crate for LDK's provided implementation.
241 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
242 /// error or call [`join`] and handle any error that may arise. For the latter case,
243 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
247 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
248 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
249 /// functionality implemented by other handlers.
250 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
252 /// # Rapid Gossip Sync
254 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
255 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
256 /// until the [`RapidGossipSync`] instance completes its first sync.
258 /// [top-level documentation]: BackgroundProcessor
259 /// [`join`]: Self::join
260 /// [`stop`]: Self::stop
261 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
262 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
263 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
264 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
265 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
266 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
269 Signer: 'static + Sign,
270 CA: 'static + Deref + Send + Sync,
271 CF: 'static + Deref + Send + Sync,
272 CW: 'static + Deref + Send + Sync,
273 T: 'static + Deref + Send + Sync,
274 K: 'static + Deref + Send + Sync,
275 F: 'static + Deref + Send + Sync,
276 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
277 L: 'static + Deref + Send + Sync,
278 P: 'static + Deref + Send + Sync,
279 Descriptor: 'static + SocketDescriptor + Send + Sync,
280 CMH: 'static + Deref + Send + Sync,
281 RMH: 'static + Deref + Send + Sync,
282 EH: 'static + EventHandler + Send,
283 PS: 'static + Deref + Send,
284 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
285 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
286 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288 UMH: 'static + Deref + Send + Sync,
289 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
290 S: 'static + Deref<Target = SC> + Send + Sync,
291 SC: WriteableScore<'a>,
293 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
294 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
297 CA::Target: 'static + chain::Access,
298 CF::Target: 'static + chain::Filter,
299 CW::Target: 'static + chain::Watch<Signer>,
300 T::Target: 'static + BroadcasterInterface,
301 K::Target: 'static + KeysInterface<Signer = Signer>,
302 F::Target: 'static + FeeEstimator,
303 L::Target: 'static + Logger,
304 P::Target: 'static + Persist<Signer>,
305 CMH::Target: 'static + ChannelMessageHandler,
306 RMH::Target: 'static + RoutingMessageHandler,
307 UMH::Target: 'static + CustomMessageHandler,
308 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
310 let stop_thread = Arc::new(AtomicBool::new(false));
311 let stop_thread_clone = stop_thread.clone();
312 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
313 let event_handler = DecoratingEventHandler {
315 gossip_sync: &gossip_sync,
318 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
319 channel_manager.timer_tick_occurred();
321 let mut last_freshness_call = Instant::now();
322 let mut last_ping_call = Instant::now();
323 let mut last_prune_call = Instant::now();
324 let mut last_scorer_persist_call = Instant::now();
325 let mut have_pruned = false;
328 channel_manager.process_pending_events(&event_handler);
329 chain_monitor.process_pending_events(&event_handler);
331 // Note that the PeerManager::process_events may block on ChannelManager's locks,
332 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
333 // we want to ensure we get into `persist_manager` as quickly as we can, especially
334 // without running the normal event processing above and handing events to users.
336 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
337 // processing a message effectively at any point during this loop. In order to
338 // minimize the time between such processing completing and persisting the updated
339 // ChannelManager, we want to minimize methods blocking on a ChannelManager
340 // generally, and as a fallback place such blocking only immediately before
342 peer_manager.process_events();
344 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
345 // see `await_start`'s use below.
346 let await_start = Instant::now();
347 let updates_available =
348 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
349 let await_time = await_start.elapsed();
351 if updates_available {
352 log_trace!(logger, "Persisting ChannelManager...");
353 persister.persist_manager(&*channel_manager)?;
354 log_trace!(logger, "Done persisting ChannelManager.");
356 // Exit the loop if the background processor was requested to stop.
357 if stop_thread.load(Ordering::Acquire) == true {
358 log_trace!(logger, "Terminating background processor.");
361 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
362 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
363 channel_manager.timer_tick_occurred();
364 last_freshness_call = Instant::now();
366 if await_time > Duration::from_secs(1) {
367 // On various platforms, we may be starved of CPU cycles for several reasons.
368 // E.g. on iOS, if we've been in the background, we will be entirely paused.
369 // Similarly, if we're on a desktop platform and the device has been asleep, we
370 // may not get any cycles.
371 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
372 // full second, at which point we assume sockets may have been killed (they
373 // appear to be at least on some platforms, even if it has only been a second).
374 // Note that we have to take care to not get here just because user event
375 // processing was slow at the top of the loop. For example, the sample client
376 // may call Bitcoin Core RPCs during event handling, which very often takes
377 // more than a handful of seconds to complete, and shouldn't disconnect all our
379 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
380 peer_manager.disconnect_all_peers();
381 last_ping_call = Instant::now();
382 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
383 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
384 peer_manager.timer_tick_occurred();
385 last_ping_call = Instant::now();
388 // Note that we want to run a graph prune once not long after startup before
389 // falling back to our usual hourly prunes. This avoids short-lived clients never
390 // pruning their network graph. We run once 60 seconds after startup before
391 // continuing our normal cadence.
392 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
393 // The network graph must not be pruned while rapid sync completion is pending
394 log_trace!(logger, "Assessing prunability of network graph");
395 if let Some(network_graph) = gossip_sync.prunable_network_graph() {
396 network_graph.remove_stale_channels();
398 if let Err(e) = persister.persist_graph(network_graph) {
399 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
402 last_prune_call = Instant::now();
405 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
409 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
410 if let Some(ref scorer) = scorer {
411 log_trace!(logger, "Persisting scorer");
412 if let Err(e) = persister.persist_scorer(&scorer) {
413 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
416 last_scorer_persist_call = Instant::now();
420 // After we exit, ensure we persist the ChannelManager one final time - this avoids
421 // some races where users quit while channel updates were in-flight, with
422 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
423 persister.persist_manager(&*channel_manager)?;
425 // Persist Scorer on exit
426 if let Some(ref scorer) = scorer {
427 persister.persist_scorer(&scorer)?;
430 // Persist NetworkGraph on exit
431 if let Some(network_graph) = gossip_sync.network_graph() {
432 persister.persist_graph(network_graph)?;
437 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
440 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
441 /// [`ChannelManager`].
445 /// This function panics if the background thread has panicked such as while persisting or
448 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
449 pub fn join(mut self) -> Result<(), std::io::Error> {
450 assert!(self.thread_handle.is_some());
454 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
455 /// [`ChannelManager`].
459 /// This function panics if the background thread has panicked such as while persisting or
462 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
463 pub fn stop(mut self) -> Result<(), std::io::Error> {
464 assert!(self.thread_handle.is_some());
465 self.stop_and_join_thread()
468 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
469 self.stop_thread.store(true, Ordering::Release);
473 fn join_thread(&mut self) -> Result<(), std::io::Error> {
474 match self.thread_handle.take() {
475 Some(handle) => handle.join().unwrap(),
481 impl Drop for BackgroundProcessor {
483 self.stop_and_join_thread().unwrap();
489 use bitcoin::blockdata::block::BlockHeader;
490 use bitcoin::blockdata::constants::genesis_block;
491 use bitcoin::blockdata::transaction::{Transaction, TxOut};
492 use bitcoin::network::constants::Network;
493 use lightning::chain::{BestBlock, Confirm, chainmonitor};
494 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
495 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
496 use lightning::chain::transaction::OutPoint;
497 use lightning::get_event_msg;
498 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
499 use lightning::ln::features::{ChannelFeatures, InitFeatures};
500 use lightning::ln::msgs::{ChannelMessageHandler, Init};
501 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
502 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
503 use lightning::util::config::UserConfig;
504 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
505 use lightning::util::ser::Writeable;
506 use lightning::util::test_utils;
507 use lightning::util::persist::KVStorePersister;
508 use lightning_invoice::payment::{InvoicePayer, Retry};
509 use lightning_invoice::utils::DefaultRouter;
510 use lightning_persister::FilesystemPersister;
512 use std::path::PathBuf;
513 use std::sync::{Arc, Mutex};
514 use std::sync::mpsc::SyncSender;
515 use std::time::Duration;
516 use lightning::routing::scoring::{FixedPenaltyScorer};
517 use lightning_rapid_gossip_sync::RapidGossipSync;
518 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
520 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
522 #[derive(Clone, Eq, Hash, PartialEq)]
523 struct TestDescriptor{}
524 impl SocketDescriptor for TestDescriptor {
525 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
529 fn disconnect_socket(&mut self) {}
532 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
534 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
535 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
538 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
539 p2p_gossip_sync: PGS,
540 rapid_gossip_sync: RGS,
541 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
542 chain_monitor: Arc<ChainMonitor>,
543 persister: Arc<FilesystemPersister>,
544 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
545 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
546 logger: Arc<test_utils::TestLogger>,
547 best_block: BestBlock,
548 scorer: Arc<Mutex<FixedPenaltyScorer>>,
552 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
553 GossipSync::P2P(self.p2p_gossip_sync.clone())
556 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
557 GossipSync::Rapid(self.rapid_gossip_sync.clone())
560 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
567 let data_dir = self.persister.get_data_dir();
568 match fs::remove_dir_all(data_dir.clone()) {
569 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
576 graph_error: Option<(std::io::ErrorKind, &'static str)>,
577 graph_persistence_notifier: Option<SyncSender<()>>,
578 manager_error: Option<(std::io::ErrorKind, &'static str)>,
579 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
580 filesystem_persister: FilesystemPersister,
584 fn new(data_dir: String) -> Self {
585 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
586 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
589 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
590 Self { graph_error: Some((error, message)), ..self }
593 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
594 Self { graph_persistence_notifier: Some(sender), ..self }
597 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
598 Self { manager_error: Some((error, message)), ..self }
601 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
602 Self { scorer_error: Some((error, message)), ..self }
606 impl KVStorePersister for Persister {
607 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
608 if key == "manager" {
609 if let Some((error, message)) = self.manager_error {
610 return Err(std::io::Error::new(error, message))
614 if key == "network_graph" {
615 if let Some(sender) = &self.graph_persistence_notifier {
616 sender.send(()).unwrap();
619 if let Some((error, message)) = self.graph_error {
620 return Err(std::io::Error::new(error, message))
625 if let Some((error, message)) = self.scorer_error {
626 return Err(std::io::Error::new(error, message))
630 self.filesystem_persister.persist(key, object)
634 fn get_full_filepath(filepath: String, filename: String) -> String {
635 let mut path = PathBuf::from(filepath);
637 path.to_str().unwrap().to_string()
640 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
641 let mut nodes = Vec::new();
642 for i in 0..num_nodes {
643 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
644 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
645 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
646 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
647 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
648 let seed = [i as u8; 32];
649 let network = Network::Testnet;
650 let genesis_block = genesis_block(network);
651 let now = Duration::from_secs(genesis_block.header.time as u64);
652 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
653 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
654 let best_block = BestBlock::from_genesis(network);
655 let params = ChainParameters { network, best_block };
656 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
657 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
658 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
659 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
660 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
661 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
662 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
663 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
667 for i in 0..num_nodes {
668 for j in (i+1)..num_nodes {
669 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
670 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
677 macro_rules! open_channel {
678 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
679 begin_open_channel!($node_a, $node_b, $channel_value);
680 let events = $node_a.node.get_and_clear_pending_events();
681 assert_eq!(events.len(), 1);
682 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
683 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
688 macro_rules! begin_open_channel {
689 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
690 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
691 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
692 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
696 macro_rules! handle_funding_generation_ready {
697 ($event: expr, $channel_value: expr) => {{
699 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
700 assert_eq!(channel_value_satoshis, $channel_value);
701 assert_eq!(user_channel_id, 42);
703 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
704 value: channel_value_satoshis, script_pubkey: output_script.clone(),
706 (temporary_channel_id, tx)
708 _ => panic!("Unexpected event"),
713 macro_rules! end_open_channel {
714 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
715 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
716 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
717 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
721 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
723 let prev_blockhash = node.best_block.block_hash();
724 let height = node.best_block.height() + 1;
725 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
726 let txdata = vec![(0, tx)];
727 node.best_block = BestBlock::new(header.block_hash(), height);
730 node.node.transactions_confirmed(&header, &txdata, height);
731 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
734 node.node.best_block_updated(&header, height);
735 node.chain_monitor.best_block_updated(&header, height);
741 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
742 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
746 fn test_background_processor() {
747 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
748 // updates. Also test that when new updates are available, the manager signals that it needs
749 // re-persistence and is successfully re-persisted.
750 let nodes = create_nodes(2, "test_background_processor".to_string());
752 // Go through the channel creation process so that each node has something to persist. Since
753 // open_channel consumes events, it must complete before starting BackgroundProcessor to
754 // avoid a race with processing events.
755 let tx = open_channel!(nodes[0], nodes[1], 100000);
757 // Initiate the background processors to watch each node.
758 let data_dir = nodes[0].persister.get_data_dir();
759 let persister = Arc::new(Persister::new(data_dir));
760 let event_handler = |_: &_| {};
761 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
763 macro_rules! check_persisted_data {
764 ($node: expr, $filepath: expr) => {
765 let mut expected_bytes = Vec::new();
767 expected_bytes.clear();
768 match $node.write(&mut expected_bytes) {
770 match std::fs::read($filepath) {
772 if bytes == expected_bytes {
781 Err(e) => panic!("Unexpected error: {}", e)
787 // Check that the initial channel manager data is persisted as expected.
788 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
789 check_persisted_data!(nodes[0].node, filepath.clone());
792 if !nodes[0].node.get_persistence_condvar_value() { break }
795 // Force-close the channel.
796 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
798 // Check that the force-close updates are persisted.
799 check_persisted_data!(nodes[0].node, filepath.clone());
801 if !nodes[0].node.get_persistence_condvar_value() { break }
804 // Check network graph is persisted
805 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
806 check_persisted_data!(nodes[0].network_graph, filepath.clone());
808 // Check scorer is persisted
809 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
810 check_persisted_data!(nodes[0].scorer, filepath.clone());
812 assert!(bg_processor.stop().is_ok());
816 fn test_timer_tick_called() {
817 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
818 // `FRESHNESS_TIMER`.
819 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
820 let data_dir = nodes[0].persister.get_data_dir();
821 let persister = Arc::new(Persister::new(data_dir));
822 let event_handler = |_: &_| {};
823 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
825 let log_entries = nodes[0].logger.lines.lock().unwrap();
826 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
827 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
828 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
829 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
834 assert!(bg_processor.stop().is_ok());
838 fn test_channel_manager_persist_error() {
839 // Test that if we encounter an error during manager persistence, the thread panics.
840 let nodes = create_nodes(2, "test_persist_error".to_string());
841 open_channel!(nodes[0], nodes[1], 100000);
843 let data_dir = nodes[0].persister.get_data_dir();
844 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
845 let event_handler = |_: &_| {};
846 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
847 match bg_processor.join() {
848 Ok(_) => panic!("Expected error persisting manager"),
850 assert_eq!(e.kind(), std::io::ErrorKind::Other);
851 assert_eq!(e.get_ref().unwrap().to_string(), "test");
857 fn test_network_graph_persist_error() {
858 // Test that if we encounter an error during network graph persistence, an error gets returned.
859 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
860 let data_dir = nodes[0].persister.get_data_dir();
861 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
862 let event_handler = |_: &_| {};
863 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
865 match bg_processor.stop() {
866 Ok(_) => panic!("Expected error persisting network graph"),
868 assert_eq!(e.kind(), std::io::ErrorKind::Other);
869 assert_eq!(e.get_ref().unwrap().to_string(), "test");
875 fn test_scorer_persist_error() {
876 // Test that if we encounter an error during scorer persistence, an error gets returned.
877 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
878 let data_dir = nodes[0].persister.get_data_dir();
879 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
880 let event_handler = |_: &_| {};
881 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
883 match bg_processor.stop() {
884 Ok(_) => panic!("Expected error persisting scorer"),
886 assert_eq!(e.kind(), std::io::ErrorKind::Other);
887 assert_eq!(e.get_ref().unwrap().to_string(), "test");
893 fn test_background_event_handling() {
894 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
895 let channel_value = 100000;
896 let data_dir = nodes[0].persister.get_data_dir();
897 let persister = Arc::new(Persister::new(data_dir.clone()));
899 // Set up a background event handler for FundingGenerationReady events.
900 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
901 let event_handler = move |event: &Event| {
902 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
904 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
906 // Open a channel and check that the FundingGenerationReady event was handled.
907 begin_open_channel!(nodes[0], nodes[1], channel_value);
908 let (temporary_channel_id, funding_tx) = receiver
909 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
910 .expect("FundingGenerationReady not handled within deadline");
911 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
913 // Confirm the funding transaction.
914 confirm_transaction(&mut nodes[0], &funding_tx);
915 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
916 confirm_transaction(&mut nodes[1], &funding_tx);
917 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
918 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
919 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
920 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
921 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
923 assert!(bg_processor.stop().is_ok());
925 // Set up a background event handler for SpendableOutputs events.
926 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
927 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
928 let persister = Arc::new(Persister::new(data_dir));
929 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
931 // Force close the channel and check that the SpendableOutputs event was handled.
932 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
933 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
934 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
936 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
937 .expect("SpendableOutputs not handled within deadline");
939 Event::SpendableOutputs { .. } => {},
940 Event::ChannelClosed { .. } => {},
941 _ => panic!("Unexpected event: {:?}", event),
944 assert!(bg_processor.stop().is_ok());
948 fn test_scorer_persistence() {
949 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
950 let data_dir = nodes[0].persister.get_data_dir();
951 let persister = Arc::new(Persister::new(data_dir));
952 let event_handler = |_: &_| {};
953 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
956 let log_entries = nodes[0].logger.lines.lock().unwrap();
957 let expected_log = "Persisting scorer".to_string();
958 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
963 assert!(bg_processor.stop().is_ok());
967 fn test_not_pruning_network_graph_until_graph_sync_completion() {
968 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
969 let data_dir = nodes[0].persister.get_data_dir();
970 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
971 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
972 let network_graph = nodes[0].network_graph.clone();
973 let features = ChannelFeatures::empty();
974 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
975 .expect("Failed to update channel from partial announcement");
976 let original_graph_description = network_graph.to_string();
977 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
978 assert_eq!(network_graph.read_only().channels().len(), 1);
980 let event_handler = |_: &_| {};
981 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
984 let log_entries = nodes[0].logger.lines.lock().unwrap();
985 let expected_log_a = "Assessing prunability of network graph".to_string();
986 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
987 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
988 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
993 let initialization_input = vec![
994 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
995 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
996 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
997 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
998 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
999 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1000 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1001 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1002 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1003 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1004 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1005 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1006 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1008 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1010 // this should have added two channels
1011 assert_eq!(network_graph.read_only().channels().len(), 3);
1014 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1015 .expect("Network graph not pruned within deadline");
1017 background_processor.stop().unwrap();
1019 // all channels should now be pruned
1020 assert_eq!(network_graph.read_only().channels().len(), 0);
1024 fn test_invoice_payer() {
1025 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1026 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1027 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1029 // Initiate the background processors to watch each node.
1030 let data_dir = nodes[0].persister.get_data_dir();
1031 let persister = Arc::new(Persister::new(data_dir));
1032 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
1033 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1034 let event_handler = Arc::clone(&invoice_payer);
1035 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1036 assert!(bg_processor.stop().is_ok());