1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::{Score, MultiThreadedLockableScore};
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 /// writing it to disk/backups by invoking the callback given to it at startup.
40 /// [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 /// at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 /// is provided to [`BackgroundProcessor::start`]).
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
// Stop flag shared with the background thread: `stop_and_join_thread` stores `true` into it and
// the thread's loop checks it to decide when to exit.
60 stop_thread: Arc<AtomicBool>,
// Handle of the spawned thread, carrying the thread's `Result`. `join_thread` moves it out with
// `take()`, so it is `None` once the thread has been joined.
61 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
// Seconds between calls to `ChannelManager::timer_tick_occurred` made by the background loop
// (the loop compares whole elapsed seconds against this value with `>`).
65 const FRESHNESS_TIMER: u64 = 60;
// NOTE(review): alternate value of the same constant; presumably selected by a `cfg` attribute
// for test builds -- confirm.
67 const FRESHNESS_TIMER: u64 = 1;
// Seconds between calls to `PeerManager::timer_tick_occurred` made by the background loop.
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
// NOTE(review): presumably the test-build value of `PING_TIMER` -- confirm.
77 const PING_TIMER: u64 = 1;
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
// Seconds between periodic scorer persistence attempts made by the background loop.
83 const SCORER_PERSIST_TIMER: u64 = 30;
// NOTE(review): presumably the test-build value of `SCORER_PERSIST_TIMER` -- confirm.
85 const SCORER_PERSIST_TIMER: u64 = 1;
// Seconds after startup before the first network graph prune; once a prune has happened the
// loop switches to `NETWORK_PRUNE_TIMER`.
88 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
// NOTE(review): presumably the test-build value of `FIRST_NETWORK_PRUNE_TIMER` -- confirm.
90 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
92 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
94 P: Deref<Target = P2PGossipSync<G, A, L>>,
95 R: Deref<Target = RapidGossipSync<G, L>>,
96 G: Deref<Target = NetworkGraph<L>>,
100 where A::Target: chain::Access, L::Target: Logger {
101 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
103 /// Rapid gossip sync from a trusted server.
110 P: Deref<Target = P2PGossipSync<G, A, L>>,
111 R: Deref<Target = RapidGossipSync<G, L>>,
112 G: Deref<Target = NetworkGraph<L>>,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
/// Returns the network graph held by the gossip sync, or `None` for [`GossipSync::None`].
///
/// Both the P2P and the rapid variants hand out their graph unconditionally here.
117 fn network_graph(&self) -> Option<&G> {
119 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121 GossipSync::None => None,
/// Returns the network graph only if it may currently be pruned.
///
/// A P2P gossip sync's graph is always eligible. A rapid gossip sync's graph is handed out
/// once `is_initial_sync_complete()` returns true. [`GossipSync::None`] has no graph.
125 fn prunable_network_graph(&self) -> Option<&G> {
127 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128 GossipSync::Rapid(gossip_sync) => {
129 if gossip_sync.is_initial_sync_complete() {
130 Some(gossip_sync.network_graph())
135 GossipSync::None => None,
140 /// (C-not exported) as the bindings concretize everything and have constructors for us
141 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
142 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
144 A::Target: chain::Access,
147 /// Initializes a new [`GossipSync::P2P`] variant.
///
/// The surrounding `impl` fixes the unused rapid-sync type parameter to
/// `&RapidGossipSync<G, L>`, so only the P2P gossip sync type is left generic.
148 pub fn p2p(gossip_sync: P) -> Self {
149 GossipSync::P2P(gossip_sync)
153 /// (C-not exported) as the bindings concretize everything and have constructors for us
154 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
156 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
159 &'a (dyn chain::Access + Send + Sync),
165 /// Initializes a new [`GossipSync::Rapid`] variant.
///
/// The surrounding `impl` fixes the unused P2P type parameter to
/// `&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>`, so only the rapid gossip
/// sync type is left generic.
166 pub fn rapid(gossip_sync: R) -> Self {
167 GossipSync::Rapid(gossip_sync)
171 /// (C-not exported) as the bindings concretize everything and have constructors for us
174 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
175 &RapidGossipSync<&'a NetworkGraph<L>, L>,
177 &'a (dyn chain::Access + Send + Sync),
183 /// Initializes a new [`GossipSync::None`] variant.
///
/// The surrounding `impl` fixes both gossip sync type parameters to reference types over
/// `&'a NetworkGraph<L>`, since neither is used by this variant.
184 pub fn none() -> Self {
189 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
190 struct DecoratingEventHandler<
193 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
194 RGS: Deref<Target = RapidGossipSync<G, L>>,
195 G: Deref<Target = NetworkGraph<L>>,
199 where A::Target: chain::Access, L::Target: Logger {
// Borrowed gossip sync; its network graph (if any) is given each event before the wrapped
// handler sees it.
201 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
207 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
208 RGS: Deref<Target = RapidGossipSync<G, L>>,
209 G: Deref<Target = NetworkGraph<L>>,
212 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
213 where A::Target: chain::Access, L::Target: Logger {
/// Passes `event` to the gossip sync's network graph first (when one exists) and then to the
/// wrapped event handler.
214 fn handle_event(&self, event: &Event) {
215 if let Some(network_graph) = self.gossip_sync.network_graph() {
216 network_graph.handle_event(event);
218 self.event_handler.handle_event(event);
222 impl BackgroundProcessor {
223 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
226 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
227 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
228 /// either [`join`] or [`stop`].
230 /// # Data Persistence
232 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
233 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
234 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
235 /// provided implementation.
237 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
238 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
239 /// See the `lightning-persister` crate for LDK's provided implementation.
241 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
242 /// error or call [`join`] and handle any error that may arise. For the latter case,
243 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
247 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
248 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
249 /// functionality implemented by other handlers.
250 /// * If `gossip_sync` holds a [`NetworkGraph`] (P2P or rapid sync), each event is also passed to that graph, e.g. to update it based on payment failures.
252 /// # Rapid Gossip Sync
254 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
255 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
256 /// until the [`RapidGossipSync`] instance completes its first sync.
258 /// [top-level documentation]: BackgroundProcessor
259 /// [`join`]: Self::join
260 /// [`stop`]: Self::stop
261 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
262 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
263 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
264 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
265 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
266 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
269 Signer: 'static + Sign,
270 CA: 'static + Deref + Send + Sync,
271 CF: 'static + Deref + Send + Sync,
272 CW: 'static + Deref + Send + Sync,
273 T: 'static + Deref + Send + Sync,
274 K: 'static + Deref + Send + Sync,
275 F: 'static + Deref + Send + Sync,
276 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
277 L: 'static + Deref + Send + Sync,
278 P: 'static + Deref + Send + Sync,
279 Descriptor: 'static + SocketDescriptor + Send + Sync,
280 CMH: 'static + Deref + Send + Sync,
281 RMH: 'static + Deref + Send + Sync,
282 EH: 'static + EventHandler + Send,
283 PS: 'static + Deref + Send,
284 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
285 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
286 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288 UMH: 'static + Deref + Send + Sync,
289 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
292 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
293 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<&'static MultiThreadedLockableScore<SC>>,
296 CA::Target: 'static + chain::Access,
297 CF::Target: 'static + chain::Filter,
298 CW::Target: 'static + chain::Watch<Signer>,
299 T::Target: 'static + BroadcasterInterface,
300 K::Target: 'static + KeysInterface<Signer = Signer>,
301 F::Target: 'static + FeeEstimator,
302 L::Target: 'static + Logger,
303 P::Target: 'static + Persist<Signer>,
304 CMH::Target: 'static + ChannelMessageHandler,
305 RMH::Target: 'static + RoutingMessageHandler,
306 UMH::Target: 'static + CustomMessageHandler,
307 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
// The stop flag is shared: the thread polls `stop_thread`, the returned `BackgroundProcessor`
// keeps the clone and sets it when asked to stop.
309 let stop_thread = Arc::new(AtomicBool::new(false));
310 let stop_thread_clone = stop_thread.clone();
311 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
312 let event_handler = DecoratingEventHandler {
314 gossip_sync: &gossip_sync,
317 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
318 channel_manager.timer_tick_occurred();
// One timestamp per periodic task, all starting now. `have_pruned` selects between the
// short first-prune delay and the regular prune interval.
320 let mut last_freshness_call = Instant::now();
321 let mut last_ping_call = Instant::now();
322 let mut last_prune_call = Instant::now();
323 let mut last_scorer_persist_call = Instant::now();
324 let mut have_pruned = false;
327 channel_manager.process_pending_events(&event_handler);
328 chain_monitor.process_pending_events(&event_handler);
330 // Note that the PeerManager::process_events may block on ChannelManager's locks,
331 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
332 // we want to ensure we get into `persist_manager` as quickly as we can, especially
333 // without running the normal event processing above and handing events to users.
335 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
336 // processing a message effectively at any point during this loop. In order to
337 // minimize the time between such processing completing and persisting the updated
338 // ChannelManager, we want to minimize methods blocking on a ChannelManager
339 // generally, and as a fallback place such blocking only immediately before
341 peer_manager.process_events();
343 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
344 // see `await_start`'s use below.
345 let await_start = Instant::now();
346 let updates_available =
347 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
348 let await_time = await_start.elapsed();
350 if updates_available {
351 log_trace!(logger, "Persisting ChannelManager...");
// A failed manager persist leaves the closure through `?`; the error becomes the thread's
// return value.
352 persister.persist_manager(&*channel_manager)?;
353 log_trace!(logger, "Done persisting ChannelManager.");
355 // Exit the loop if the background processor was requested to stop.
356 if stop_thread.load(Ordering::Acquire) == true {
357 log_trace!(logger, "Terminating background processor.");
// Whole elapsed seconds are compared with `>`, so the tick fires only once more than
// FRESHNESS_TIMER full seconds have passed since the previous one.
360 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
361 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
362 channel_manager.timer_tick_occurred();
363 last_freshness_call = Instant::now();
365 if await_time > Duration::from_secs(1) {
366 // On various platforms, we may be starved of CPU cycles for several reasons.
367 // E.g. on iOS, if we've been in the background, we will be entirely paused.
368 // Similarly, if we're on a desktop platform and the device has been asleep, we
369 // may not get any cycles.
370 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
371 // full second, at which point we assume sockets may have been killed (they
372 // appear to be at least on some platforms, even if it has only been a second).
373 // Note that we have to take care to not get here just because user event
374 // processing was slow at the top of the loop. For example, the sample client
375 // may call Bitcoin Core RPCs during event handling, which very often takes
376 // more than a handful of seconds to complete, and shouldn't disconnect all our
378 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
379 peer_manager.disconnect_all_peers();
// All peers were just disconnected, so the ping interval is restarted as well.
380 last_ping_call = Instant::now();
381 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
382 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
383 peer_manager.timer_tick_occurred();
384 last_ping_call = Instant::now();
387 // Note that we want to run a graph prune once not long after startup before
388 // falling back to our usual hourly prunes. This avoids short-lived clients never
389 // pruning their network graph. We run once 60 seconds after startup before
390 // continuing our normal cadence.
391 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
392 // The network graph must not be pruned while rapid sync completion is pending
393 log_trace!(logger, "Assessing prunability of network graph");
394 if let Some(network_graph) = gossip_sync.prunable_network_graph() {
395 network_graph.remove_stale_channels();
// Unlike manager persistence, a failed graph persist inside the loop is only logged and
// does not stop the thread.
397 if let Err(e) = persister.persist_graph(network_graph) {
398 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
401 last_prune_call = Instant::now();
404 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
408 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
409 if let Some(ref scorer) = scorer {
410 log_trace!(logger, "Persisting scorer");
// As with the graph, a failed periodic scorer persist is logged rather than propagated.
411 if let Err(e) = persister.persist_scorer(&scorer) {
412 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
415 last_scorer_persist_call = Instant::now();
419 // After we exit, ensure we persist the ChannelManager one final time - this avoids
420 // some races where users quit while channel updates were in-flight, with
421 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
422 persister.persist_manager(&*channel_manager)?;
// On exit, scorer and graph persistence errors are propagated with `?` (in the loop above
// they are only logged), so they also surface as the thread's result.
424 // Persist Scorer on exit
425 if let Some(ref scorer) = scorer {
426 persister.persist_scorer(&scorer)?;
429 // Persist NetworkGraph on exit
430 if let Some(network_graph) = gossip_sync.network_graph() {
431 persister.persist_graph(network_graph)?;
// Hand back the other clone of the stop flag together with the thread's join handle.
436 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
439 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
440 /// [`ChannelManager`].
444 /// This function panics if the background thread has panicked such as while persisting or
447 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
448 pub fn join(mut self) -> Result<(), std::io::Error> {
// `thread_handle` becomes `None` only after `join_thread` has taken it, so it is expected to
// still be present on a processor that has not been joined yet.
449 assert!(self.thread_handle.is_some());
453 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
454 /// [`ChannelManager`].
458 /// This function panics if the background thread has panicked such as while persisting or
461 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
462 pub fn stop(mut self) -> Result<(), std::io::Error> {
// `thread_handle` becomes `None` only after `join_thread` has taken it, so it is expected to
// still be present on a processor that has not been joined yet.
463 assert!(self.thread_handle.is_some());
// The helper raises the shared stop flag; its `Result` is returned to the caller unchanged.
464 self.stop_and_join_thread()
/// Raises the shared stop flag so the background loop terminates.
// NOTE(review): presumably followed by a call to `join_thread` to wait for the thread -- confirm.
467 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
// `Release` here pairs with the `Acquire` load of the same flag in the background loop.
468 self.stop_thread.store(true, Ordering::Release);
/// Waits for the background thread to finish and returns the thread's own `Result`.
///
/// The handle is moved out with `take()`, leaving `None` behind. The `unwrap()` on the join
/// result panics if the background thread itself panicked.
472 fn join_thread(&mut self) -> Result<(), std::io::Error> {
473 match self.thread_handle.take() {
474 Some(handle) => handle.join().unwrap(),
480 impl Drop for BackgroundProcessor {
// Stops the background thread when the processor is dropped. Note the `unwrap()`: an `Err`
// returned by `stop_and_join_thread` turns into a panic here.
482 self.stop_and_join_thread().unwrap();
488 use bitcoin::blockdata::block::BlockHeader;
489 use bitcoin::blockdata::constants::genesis_block;
490 use bitcoin::blockdata::transaction::{Transaction, TxOut};
491 use bitcoin::network::constants::Network;
492 use lightning::chain::{BestBlock, Confirm, chainmonitor};
493 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
494 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
495 use lightning::chain::transaction::OutPoint;
496 use lightning::get_event_msg;
497 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
498 use lightning::ln::features::{ChannelFeatures, InitFeatures};
499 use lightning::ln::msgs::{ChannelMessageHandler, Init};
500 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
501 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
502 use lightning::util::config::UserConfig;
503 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
504 use lightning::util::ser::Writeable;
505 use lightning::util::test_utils;
506 use lightning::util::persist::KVStorePersister;
507 use lightning_invoice::payment::{InvoicePayer, Retry};
508 use lightning_invoice::utils::DefaultRouter;
509 use lightning_persister::FilesystemPersister;
511 use std::path::PathBuf;
512 use std::sync::{Arc, Mutex};
513 use std::sync::mpsc::SyncSender;
514 use std::time::Duration;
515 use lightning::routing::scoring::{FixedPenaltyScorer};
516 use lightning_rapid_gossip_sync::RapidGossipSync;
517 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
519 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
521 #[derive(Clone, Eq, Hash, PartialEq)]
522 struct TestDescriptor{}
523 impl SocketDescriptor for TestDescriptor {
524 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
528 fn disconnect_socket(&mut self) {}
531 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
533 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
534 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
537 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
538 p2p_gossip_sync: PGS,
539 rapid_gossip_sync: RGS,
540 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
541 chain_monitor: Arc<ChainMonitor>,
542 persister: Arc<FilesystemPersister>,
543 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
544 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
545 logger: Arc<test_utils::TestLogger>,
546 best_block: BestBlock,
547 scorer: Arc<Mutex<FixedPenaltyScorer>>,
551 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
552 GossipSync::P2P(self.p2p_gossip_sync.clone())
555 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
556 GossipSync::Rapid(self.rapid_gossip_sync.clone())
559 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
566 let data_dir = self.persister.get_data_dir();
567 match fs::remove_dir_all(data_dir.clone()) {
568 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
575 graph_error: Option<(std::io::ErrorKind, &'static str)>,
576 graph_persistence_notifier: Option<SyncSender<()>>,
577 manager_error: Option<(std::io::ErrorKind, &'static str)>,
578 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
579 filesystem_persister: FilesystemPersister,
583 fn new(data_dir: String) -> Self {
584 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
585 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
588 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
589 Self { graph_error: Some((error, message)), ..self }
592 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
593 Self { graph_persistence_notifier: Some(sender), ..self }
596 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
597 Self { manager_error: Some((error, message)), ..self }
600 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
601 Self { scorer_error: Some((error, message)), ..self }
// Test persister: optionally injects a configured error per key, otherwise writes through to
// the wrapped `FilesystemPersister`.
605 impl KVStorePersister for Persister {
606 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
// Fail manager persistence when a manager error has been configured.
607 if key == "manager" {
608 if let Some((error, message)) = self.manager_error {
609 return Err(std::io::Error::new(error, message))
613 if key == "network_graph" {
// Signal the test that a graph persist was attempted; this happens before any configured
// graph error is returned.
614 if let Some(sender) = &self.graph_persistence_notifier {
615 sender.send(()).unwrap();
618 if let Some((error, message)) = self.graph_error {
619 return Err(std::io::Error::new(error, message))
// NOTE(review): presumably guarded by a check for the scorer key -- confirm.
624 if let Some((error, message)) = self.scorer_error {
625 return Err(std::io::Error::new(error, message))
// No error injected: delegate to the real filesystem persister.
629 self.filesystem_persister.persist(key, object)
// Builds a path starting from `filepath` and returns it as a `String`.
// NOTE(review): presumably `filename` is appended to the path before conversion -- confirm.
633 fn get_full_filepath(filepath: String, filename: String) -> String {
634 let mut path = PathBuf::from(filepath);
// `to_str()` yields `None` for non-UTF-8 paths, so the `unwrap()` panics in that case.
636 path.to_str().unwrap().to_string()
639 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
640 let mut nodes = Vec::new();
641 for i in 0..num_nodes {
642 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
643 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
644 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
645 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
646 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
647 let seed = [i as u8; 32];
648 let network = Network::Testnet;
649 let genesis_block = genesis_block(network);
650 let now = Duration::from_secs(genesis_block.header.time as u64);
651 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
652 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
653 let best_block = BestBlock::from_genesis(network);
654 let params = ChainParameters { network, best_block };
655 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
656 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
657 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
658 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
659 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
660 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
661 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
662 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
666 for i in 0..num_nodes {
667 for j in (i+1)..num_nodes {
668 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
669 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
676 macro_rules! open_channel {
677 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
678 begin_open_channel!($node_a, $node_b, $channel_value);
679 let events = $node_a.node.get_and_clear_pending_events();
680 assert_eq!(events.len(), 1);
681 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
682 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
687 macro_rules! begin_open_channel {
688 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
689 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
690 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
691 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
695 macro_rules! handle_funding_generation_ready {
696 ($event: expr, $channel_value: expr) => {{
698 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
699 assert_eq!(channel_value_satoshis, $channel_value);
700 assert_eq!(user_channel_id, 42);
702 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
703 value: channel_value_satoshis, script_pubkey: output_script.clone(),
705 (temporary_channel_id, tx)
707 _ => panic!("Unexpected event"),
712 macro_rules! end_open_channel {
713 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
714 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
715 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
716 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
// Feeds the node's ChannelManager and ChainMonitor synthetic block data containing `tx`.
// NOTE(review): presumably repeated `depth` times, confirming `tx` in the first block -- confirm.
720 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
// Build a header on top of the node's current best block, one height higher.
722 let prev_blockhash = node.best_block.block_hash();
723 let height = node.best_block.height() + 1;
724 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
725 let txdata = vec![(0, tx)];
// Advance the node's notion of the best block to the header just built.
726 node.best_block = BestBlock::new(header.block_hash(), height);
729 node.node.transactions_confirmed(&header, &txdata, height);
730 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
733 node.node.best_block_updated(&header, height);
734 node.chain_monitor.best_block_updated(&header, height);
740 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
741 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
745 fn test_background_processor() {
746 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
747 // updates. Also test that when new updates are available, the manager signals that it needs
748 // re-persistence and is successfully re-persisted.
749 let nodes = create_nodes(2, "test_background_processor".to_string());
751 // Go through the channel creation process so that each node has something to persist. Since
752 // open_channel consumes events, it must complete before starting BackgroundProcessor to
753 // avoid a race with processing events.
754 let tx = open_channel!(nodes[0], nodes[1], 100000);
756 // Initiate the background processors to watch each node.
757 let data_dir = nodes[0].persister.get_data_dir();
758 let persister = Arc::new(Persister::new(data_dir));
759 let event_handler = |_: &_| {};
760 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
762 macro_rules! check_persisted_data {
763 ($node: expr, $filepath: expr) => {
764 let mut expected_bytes = Vec::new();
766 expected_bytes.clear();
767 match $node.write(&mut expected_bytes) {
769 match std::fs::read($filepath) {
771 if bytes == expected_bytes {
780 Err(e) => panic!("Unexpected error: {}", e)
786 // Check that the initial channel manager data is persisted as expected.
787 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
788 check_persisted_data!(nodes[0].node, filepath.clone());
791 if !nodes[0].node.get_persistence_condvar_value() { break }
794 // Force-close the channel.
795 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
797 // Check that the force-close updates are persisted.
798 check_persisted_data!(nodes[0].node, filepath.clone());
800 if !nodes[0].node.get_persistence_condvar_value() { break }
803 // Check network graph is persisted
804 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
805 check_persisted_data!(nodes[0].network_graph, filepath.clone());
807 // Check scorer is persisted
808 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
809 check_persisted_data!(nodes[0].scorer, filepath.clone());
811 assert!(bg_processor.stop().is_ok());
815 fn test_timer_tick_called() {
816 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
817 // `FRESHNESS_TIMER`.
// Only one node is created; the check below inspects that node's own logger.
818 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
819 let data_dir = nodes[0].persister.get_data_dir();
820 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: the assertions in this test are about log output, not events.
821 let event_handler = |_: &_| {};
// The processor is started with the node's `no_gossip_sync()` value as its gossip sync argument.
822 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The logger's `lines` are looked up by a pair of strings; the first element is
// "lightning_background_processor" (presumably the logging module - confirm against the
// test logger) and the second is the exact message text.
824 let log_entries = nodes[0].logger.lines.lock().unwrap();
825 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
826 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
// Both the ChannelManager and the PeerManager tick messages must be present.
827 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
828 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
// `stop()` must return `Ok` for the test to pass.
833 assert!(bg_processor.stop().is_ok());
837 fn test_channel_manager_persist_error() {
838 // Test that if we encounter an error during manager persistence, the thread panics.
// NOTE(review): the checks below inspect the error value yielded by `join()` (its kind and
// message) rather than observing a panic - confirm whether the wording above is still accurate.
839 let nodes = create_nodes(2, "test_persist_error".to_string());
// The channel is opened before the background processor is started.
840 open_channel!(nodes[0], nodes[1], 100000);
842 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured through `with_manager_error` with `ErrorKind::Other` and the
// message "test"; the assertions at the end expect exactly that kind and message back.
843 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
844 let event_handler = |_: &_| {};
845 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join()` (not `stop()`) is used here; an `Ok` result fails the test.
846 match bg_processor.join() {
847 Ok(_) => panic!("Expected error persisting manager"),
// The returned error must carry the configured kind and inner message.
849 assert_eq!(e.kind(), std::io::ErrorKind::Other);
850 assert_eq!(e.get_ref().unwrap().to_string(), "test");
856 fn test_network_graph_persist_error() {
857 // Test that if we encounter an error during network graph persistence, an error gets returned.
858 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
859 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured through `with_graph_error` with `ErrorKind::Other` and the
// message "test"; the assertions at the end expect exactly that kind and message back.
860 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
861 let event_handler = |_: &_| {};
// Unlike the other persistence-error tests in this file, this one passes `p2p_gossip_sync()`
// as the gossip sync argument.
862 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The error is expected to come out of `stop()`; an `Ok` result fails the test.
864 match bg_processor.stop() {
865 Ok(_) => panic!("Expected error persisting network graph"),
// The returned error must carry the configured kind and inner message.
867 assert_eq!(e.kind(), std::io::ErrorKind::Other);
868 assert_eq!(e.get_ref().unwrap().to_string(), "test");
874 fn test_scorer_persist_error() {
875 // Test that if we encounter an error during scorer persistence, an error gets returned.
876 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
877 let data_dir = nodes[0].persister.get_data_dir();
// The persister is configured through `with_scorer_error` with `ErrorKind::Other` and the
// message "test"; the assertions at the end expect exactly that kind and message back.
878 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
879 let event_handler = |_: &_| {};
// The scorer is handed to the processor as `Some(...)` in the last argument.
880 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The error is expected to come out of `stop()`; an `Ok` result fails the test.
882 match bg_processor.stop() {
883 Ok(_) => panic!("Expected error persisting scorer"),
// The returned error must carry the configured kind and inner message.
885 assert_eq!(e.kind(), std::io::ErrorKind::Other);
886 assert_eq!(e.get_ref().unwrap().to_string(), "test");
892 fn test_background_event_handling() {
// Runs two background processors one after the other on node 0, each with its own event
// handler: the first handles the funding-generation event during channel open, the second
// forwards whatever event follows a force-close back to the test thread.
893 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
894 let channel_value = 100000;
895 let data_dir = nodes[0].persister.get_data_dir();
// `data_dir` is cloned because a second persister is built from it further down.
896 let persister = Arc::new(Persister::new(data_dir.clone()));
898 // Set up a background event handler for FundingGenerationReady events.
// A bounded channel with capacity 1 carries the handler's result back to this thread.
899 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
900 let event_handler = move |event: &Event| {
901 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
903 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
905 // Open a channel and check that the FundingGenerationReady event was handled.
906 begin_open_channel!(nodes[0], nodes[1], channel_value);
// Wait at most `EVENT_DEADLINE` seconds for the handler to report the temporary channel id and
// funding transaction; a timeout fails the test via `expect`.
907 let (temporary_channel_id, funding_tx) = receiver
908 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
909 .expect("FundingGenerationReady not handled within deadline");
910 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
912 // Confirm the funding transaction.
// Each node confirms the funding transaction and yields a `SendChannelReady` message addressed
// to its peer; each node is then given the other's message and yields a `SendChannelUpdate`.
913 confirm_transaction(&mut nodes[0], &funding_tx);
914 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
915 confirm_transaction(&mut nodes[1], &funding_tx);
916 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
917 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
918 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
919 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
920 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// The first processor is stopped before a second one, with a different handler, is started.
922 assert!(bg_processor.stop().is_ok());
924 // Set up a background event handler for SpendableOutputs events.
// This handler forwards a clone of every event it receives, not only `SpendableOutputs`.
925 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
926 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
927 let persister = Arc::new(Persister::new(data_dir));
928 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
930 // Force close the channel and check that the SpendableOutputs event was handled.
931 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// The last entry popped from the test broadcaster's `txn_broadcasted` list is treated as the
// commitment transaction and confirmed with `BREAKDOWN_TIMEOUT` as the depth argument.
932 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
933 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
935 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
936 .expect("SpendableOutputs not handled within deadline");
// Either `SpendableOutputs` or `ChannelClosed` is accepted as the received event; any other
// event fails the test.
938 Event::SpendableOutputs { .. } => {},
939 Event::ChannelClosed { .. } => {},
940 _ => panic!("Unexpected event: {:?}", event),
943 assert!(bg_processor.stop().is_ok());
947 fn test_scorer_persistence() {
// Checks for the "Persisting scorer" log message while a background processor that was given
// a scorer is running, then requires the processor to stop without error.
948 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
949 let data_dir = nodes[0].persister.get_data_dir();
950 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: this test looks only at log output.
951 let event_handler = |_: &_| {};
952 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// The logger's `lines` are looked up by a pair of strings: "lightning_background_processor"
// (presumably the logging module - confirm against the test logger) and the message text.
955 let log_entries = nodes[0].logger.lines.lock().unwrap();
956 let expected_log = "Persisting scorer".to_string();
957 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
// `stop()` must return `Ok` for the test to pass.
962 assert!(bg_processor.stop().is_ok());
966 fn test_not_pruning_network_graph_until_graph_sync_completion() {
// Seeds the network graph with one channel, starts a processor using `rapid_gossip_sync()`,
// looks for the "Not pruning network graph" log message, then applies a rapid gossip sync
// update and expects the graph to end up with no channels once the processor has stopped.
967 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
968 let data_dir = nodes[0].persister.get_data_dir();
// The sending half of a capacity-1 channel is given to the persister through
// `with_graph_persistence_notifier`; the receiving half is waited on near the end of the test.
// Presumably the persister signals on it when the graph is persisted - confirm in `Persister`.
969 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
970 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
971 let network_graph = nodes[0].network_graph.clone();
972 let features = ChannelFeatures::empty();
// Insert one channel, with short channel id 42, between the two test nodes and verify that it
// is the only channel in the graph before the processor starts.
973 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
974 .expect("Failed to update channel from partial announcement");
975 let original_graph_description = network_graph.to_string();
976 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
977 assert_eq!(network_graph.read_only().channels().len(), 1);
979 let event_handler = |_: &_| {};
980 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Two messages are looked up: one saying prunability was assessed, and one saying pruning was
// skipped.
983 let log_entries = nodes[0].logger.lines.lock().unwrap();
984 let expected_log_a = "Assessing prunability of network graph".to_string();
985 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
986 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
987 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Raw bytes passed to `rapid_gossip_sync.update_network_graph` below. The first three bytes
// (76, 68, 75) are the ASCII characters "LDK".
992 let initialization_input = vec![
993 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
994 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
995 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
996 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
997 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
998 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
999 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1000 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1001 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1002 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1003 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1004 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1005 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1007 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1009 // this should have added two channels
// 1 channel inserted manually above + 2 from the update = 3.
1010 assert_eq!(network_graph.read_only().channels().len(), 3);
1013 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1014 .expect("Network graph not pruned within deadline");
// The wait above allows up to five times `FIRST_NETWORK_PRUNE_TIMER` seconds before failing.
1016 background_processor.stop().unwrap();
1018 // all channels should now be pruned
1019 assert_eq!(network_graph.read_only().channels().len(), 0);
1023 fn test_invoice_payer() {
// Checks that an `Arc<InvoicePayer>` can be passed to `BackgroundProcessor::start` as the event
// handler, and that the processor then stops without error. No payment is made.
1024 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
// Random bytes from the test keys interface are passed to the router constructor below.
1025 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1026 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1028 // Initiate the background processors to watch each node.
1029 let data_dir = nodes[0].persister.get_data_dir();
1030 let persister = Arc::new(Persister::new(data_dir));
1031 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
// The payer is built with a no-op closure as its own event handler argument and
// `Retry::Attempts(2)` as its retry setting.
1032 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
// A clone of the `Arc` is handed to the processor; `invoice_payer` itself stays with the test.
1033 let event_handler = Arc::clone(&invoice_payer);
1034 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1035 assert!(bg_processor.stop().is_ok());