//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background. See docs for
//! [`BackgroundProcessor`] for more details on the nitty-gritty.
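//!
//! A minimal sketch of typical usage, assuming handles such as `persister`, `event_handler`,
//! `chain_monitor`, `channel_manager`, `peer_manager`, `logger`, and `scorer` were built during
//! node startup (their concrete types are up to the application):
//!
//! ```ignore
//! let bg_processor = BackgroundProcessor::start(
//!     persister, event_handler, chain_monitor, channel_manager,
//!     GossipSync::None, peer_manager, logger, Some(scorer),
//! );
//!
//! // ... run the node ...
//!
//! // On shutdown, stop the thread and surface any persistence error.
//! bg_processor.stop().expect("Failed to persist on shutdown");
//! ```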

#![deny(broken_intra_doc_links)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
///   is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon as doing so may result in high latency.
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations. Increasing the ping
/// timer allows for this, but slower devices will be disconnected if the timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Run the first network graph prune one minute after startup, then fall back to the normal
/// hourly cadence.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
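///
/// An illustrative construction, assuming `Arc`-wrapped `network_graph`, `chain_source`, and
/// `logger` handles like those used in this crate's tests:
///
/// ```ignore
/// let gossip_sync = GossipSync::P2P(Arc::new(P2PGossipSync::new(
///     network_graph.clone(), Some(chain_source.clone()), logger.clone(),
/// )));
/// ```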
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}

impl<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
		}
	}

	fn prunable_network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				// Only expose the graph for pruning once the initial rapid sync has
				// completed; pruning beforehand could remove channels the sync would
				// otherwise populate.
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
				} else {
					None
				}
			},
			GossipSync::None => None,
		}
	}
}

/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	event_handler: E,
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}

impl<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn handle_event(&self, event: &Event) {
		// Let the network graph learn from payment failures before handing the event to the
		// user's handler.
		if let Some(network_graph) = self.gossip_sync.network_graph() {
			network_graph.handle_event(event);
		}
		self.event_handler.handle_event(event);
	}
}

impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is
	/// retrieved by calling either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to
	/// disk, and/or uploading to one or more backup services. See [`ChannelManager::write`] for
	/// writing out a [`ChannelManager`]. See the `lightning-persister` crate for LDK's provided
	/// implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error,
	/// as shown in the sketch below.
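	///
	/// A hypothetical restart loop along these lines (construction arguments elided):
	///
	/// ```ignore
	/// loop {
	///     let bg_processor = BackgroundProcessor::start(/* ... */);
	///     // `join` blocks until the thread exits, i.e. after a persistence error or once
	///     // another shutdown path has set the stop flag.
	///     match bg_processor.join() {
	///         Ok(()) => break,
	///         Err(e) => eprintln!("Persistence error: {}; restarting", e),
	///     }
	/// }
	/// ```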
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass a [`RapidGossipSync`] to `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
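	///
	/// An illustrative sketch, where `snapshot` is a hypothetical variable holding bytes fetched
	/// from a rapid gossip sync server and all other handles are assumed to exist:
	///
	/// ```ignore
	/// let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
	/// rapid_sync.update_network_graph(&snapshot[..]).unwrap();
	/// let bg_processor = BackgroundProcessor::start(
	///     persister, event_handler, chain_monitor, channel_manager,
	///     GossipSync::Rapid(rapid_sync), peer_manager, logger, Some(scorer),
	/// );
	/// ```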
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			let event_handler = DecoratingEventHandler {
				event_handler,
				gossip_sync: &gossip_sync,
			};

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.timer_tick_occurred();

			let mut last_freshness_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut last_scorer_persist_call = Instant::now();
			let mut have_pruned = false;

			loop {
				channel_manager.process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);

				// Note that the PeerManager::process_events may block on ChannelManager's locks,
				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
				// we want to ensure we get into `persist_manager` as quickly as we can, especially
				// without running the normal event processing above and handing events to users.
				//
				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
				// processing a message effectively at any point during this loop. In order to
				// minimize the time between such processing completing and persisting the updated
				// ChannelManager, we want to minimize methods blocking on a ChannelManager
				// generally, and as a fallback place such blocking only immediately before
				// persisting the ChannelManager.
				peer_manager.process_events();

				// We wait up to 100ms, but track how long it takes to detect being put to sleep,
				// see `await_start`'s use below.
				let await_start = Instant::now();
				let updates_available =
					channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
				let await_time = await_start.elapsed();

				if updates_available {
					log_trace!(logger, "Persisting ChannelManager...");
					persister.persist_manager(&*channel_manager)?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}
				// Exit the loop if the background processor was requested to stop.
				if stop_thread.load(Ordering::Acquire) == true {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				if await_time > Duration::from_secs(1) {
					// On various platforms, we may be starved of CPU cycles for several reasons.
					// E.g. on iOS, if we've been in the background, we will be entirely paused.
					// Similarly, if we're on a desktop platform and the device has been asleep, we
					// may not get any cycles.
					// We detect this by checking if our max-100ms-sleep, above, ran longer than a
					// full second, at which point we assume sockets may have been killed (they
					// appear to be at least on some platforms, even if it has only been a second).
					// Note that we have to take care to not get here just because user event
					// processing was slow at the top of the loop. For example, the sample client
					// may call Bitcoin Core RPCs during event handling, which very often takes
					// more than a handful of seconds to complete, and shouldn't disconnect all our
					// peers.
					log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
					peer_manager.disconnect_all_peers();
					last_ping_call = Instant::now();
				} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.timer_tick_occurred();
					last_ping_call = Instant::now();
				}

				// Note that we want to run a graph prune once not long after startup before
				// falling back to our usual hourly prunes. This avoids short-lived clients never
				// pruning their network graph. We run once 60 seconds after startup before
				// continuing our normal cadence.
				if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
					// The network graph must not be pruned while rapid sync completion is pending
					log_trace!(logger, "Assessing prunability of network graph");
					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
						network_graph.remove_stale_channels();

						if let Err(e) = persister.persist_graph(network_graph) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
						}

						last_prune_call = Instant::now();
						have_pruned = true;
					} else {
						log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
					}
				}

				if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
					if let Some(ref scorer) = scorer {
						log_trace!(logger, "Persisting scorer");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
					last_scorer_persist_call = Instant::now();
				}
			}

			// After we exit, ensure we persist the ChannelManager one final time - this avoids
			// some races where users quit while channel updates were in-flight, with
			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
			persister.persist_manager(&*channel_manager)?;

			// Persist Scorer on exit
			if let Some(ref scorer) = scorer {
				persister.persist_scorer(&scorer)?;
			}

			// Persist NetworkGraph on exit
			if let Some(network_graph) = gossip_sync.network_graph() {
				persister.persist_graph(network_graph)?;
			}

			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}

impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}

#[cfg(test)]
mod tests {
	use bitcoin::blockdata::block::BlockHeader;
	use bitcoin::blockdata::constants::genesis_block;
	use bitcoin::blockdata::transaction::{Transaction, TxOut};
	use bitcoin::network::constants::Network;
	use lightning::chain::{BestBlock, Confirm, chainmonitor};
	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
	use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
	use lightning::chain::transaction::OutPoint;
	use lightning::get_event_msg;
	use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
	use lightning::ln::features::{ChannelFeatures, InitFeatures};
	use lightning::ln::msgs::{ChannelMessageHandler, Init};
	use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
	use lightning::util::config::UserConfig;
	use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
	use lightning::util::ser::Writeable;
	use lightning::util::test_utils;
	use lightning::util::persist::KVStorePersister;
	use lightning_invoice::payment::{InvoicePayer, Retry};
	use lightning_invoice::utils::DefaultRouter;
	use lightning_persister::FilesystemPersister;
	use std::fs;
	use std::path::PathBuf;
	use std::sync::{Arc, Mutex};
	use std::sync::mpsc::SyncSender;
	use std::time::Duration;
	use lightning::routing::scoring::{FixedPenaltyScorer};
	use lightning_rapid_gossip_sync::RapidGossipSync;
	use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};

	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;

	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}

	impl Node {
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}

	impl Drop for Node {
		fn drop(&mut self) {
			let data_dir = self.persister.get_data_dir();
			match fs::remove_dir_all(data_dir.clone()) {
				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
				_ => {}
			}
		}
	}

	struct Persister {
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}

	impl Persister {
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
		}

		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }
		}

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }
		}

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }
		}

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
		}
	}

	impl KVStorePersister for Persister {
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();
				};

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "scorer" {
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			self.filesystem_persister.persist(key, object)
		}
	}

	fn get_full_filepath(filepath: String, filename: String) -> String {
		let mut path = PathBuf::from(filepath);
		path.push(filename);
		path.to_str().unwrap().to_string()
	}

	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()) };
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}

	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}

	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}

	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}

	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}

	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}

	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}

	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` are called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the error is returned
		// when joining the background thread.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_scorer_persistence() {
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// This should have added two channels.
		assert_eq!(network_graph.read_only().channels().len(), 3);

		receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// All channels should now be pruned.
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}

	#[test]
	fn test_invoice_payer() {
		let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
		let random_seed_bytes = keys_manager.get_secure_random_bytes();
		let nodes = create_nodes(2, "test_invoice_payer".to_string());

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
		let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
		let event_handler = Arc::clone(&invoice_payer);
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		assert!(bg_processor.stop().is_ok());
	}
}