1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 /// writing it to disk/backups by invoking the callback given to it at startup.
40 /// [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 /// at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`P2PGossipSync`] is provided to
44 /// [`BackgroundProcessor::start`]).
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
60 stop_thread: Arc<AtomicBool>,
61 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
65 const FRESHNESS_TIMER: u64 = 60;
67 const FRESHNESS_TIMER: u64 = 1;
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
77 const PING_TIMER: u64 = 1;
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
82 #[cfg(all(not(test), debug_assertions))]
83 const SCORER_PERSIST_TIMER: u64 = 30;
85 const SCORER_PERSIST_TIMER: u64 = 1;
88 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
90 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
93 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
94 struct DecoratingEventHandler<
96 P: Deref<Target = P2PGossipSync<G, A, L>>,
97 G: Deref<Target = NetworkGraph<L>>,
101 where A::Target: chain::Access, L::Target: Logger {
103 p2p_gossip_sync: Option<P>,
108 P: Deref<Target = P2PGossipSync<G, A, L>>,
109 G: Deref<Target = NetworkGraph<L>>,
112 > EventHandler for DecoratingEventHandler<E, P, G, A, L>
113 where A::Target: chain::Access, L::Target: Logger {
114 fn handle_event(&self, event: &Event) {
115 if let Some(gossip_sync) = &self.p2p_gossip_sync {
116 gossip_sync.network_graph().handle_event(event);
118 self.event_handler.handle_event(event);
122 impl BackgroundProcessor {
123 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
126 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
127 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
128 /// either [`join`] or [`stop`].
130 /// # Data Persistence
132 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
133 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
134 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
135 /// provided implementation.
137 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
138 /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
139 /// for LDK's provided implementation.
141 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
142 /// error or call [`join`] and handle any error that may arise. For the latter case,
143 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
147 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
148 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
149 /// functionality implemented by other handlers.
150 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
152 /// # Rapid Gossip Sync
154 /// If rapid gossip sync is meant to run at startup, pass an optional [`RapidGossipSync`]
155 /// to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`] not to prune the
156 /// [`NetworkGraph`] instance until the [`RapidGossipSync`] instance completes its first sync.
158 /// [top-level documentation]: BackgroundProcessor
159 /// [`join`]: Self::join
160 /// [`stop`]: Self::stop
161 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
162 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
163 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
164 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
165 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
166 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
169 Signer: 'static + Sign,
170 CA: 'static + Deref + Send + Sync,
171 CF: 'static + Deref + Send + Sync,
172 CW: 'static + Deref + Send + Sync,
173 T: 'static + Deref + Send + Sync,
174 K: 'static + Deref + Send + Sync,
175 F: 'static + Deref + Send + Sync,
176 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
177 L: 'static + Deref + Send + Sync,
178 P: 'static + Deref + Send + Sync,
179 Descriptor: 'static + SocketDescriptor + Send + Sync,
180 CMH: 'static + Deref + Send + Sync,
181 RMH: 'static + Deref + Send + Sync,
182 EH: 'static + EventHandler + Send,
183 PS: 'static + Deref + Send,
184 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
185 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
186 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
187 UMH: 'static + Deref + Send + Sync,
188 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
189 S: 'static + Deref<Target = SC> + Send + Sync,
190 SC: WriteableScore<'a>,
191 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send
193 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
194 p2p_gossip_sync: Option<PGS>, peer_manager: PM, logger: L, scorer: Option<S>,
195 rapid_gossip_sync: Option<RGS>
198 CA::Target: 'static + chain::Access,
199 CF::Target: 'static + chain::Filter,
200 CW::Target: 'static + chain::Watch<Signer>,
201 T::Target: 'static + BroadcasterInterface,
202 K::Target: 'static + KeysInterface<Signer = Signer>,
203 F::Target: 'static + FeeEstimator,
204 L::Target: 'static + Logger,
205 P::Target: 'static + Persist<Signer>,
206 CMH::Target: 'static + ChannelMessageHandler,
207 RMH::Target: 'static + RoutingMessageHandler,
208 UMH::Target: 'static + CustomMessageHandler,
209 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
211 let stop_thread = Arc::new(AtomicBool::new(false));
212 let stop_thread_clone = stop_thread.clone();
213 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
214 let event_handler = DecoratingEventHandler {
216 p2p_gossip_sync: p2p_gossip_sync.as_ref().map(|t| t.deref()),
219 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
220 channel_manager.timer_tick_occurred();
222 let mut last_freshness_call = Instant::now();
223 let mut last_ping_call = Instant::now();
224 let mut last_prune_call = Instant::now();
225 let mut last_scorer_persist_call = Instant::now();
226 let mut have_pruned = false;
229 channel_manager.process_pending_events(&event_handler);
230 chain_monitor.process_pending_events(&event_handler);
232 // Note that the PeerManager::process_events may block on ChannelManager's locks,
233 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
234 // we want to ensure we get into `persist_manager` as quickly as we can, especially
235 // without running the normal event processing above and handing events to users.
237 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
238 // processing a message effectively at any point during this loop. In order to
239 // minimize the time between such processing completing and persisting the updated
240 // ChannelManager, we want to minimize methods blocking on a ChannelManager
241 // generally, and as a fallback place such blocking only immediately before
243 peer_manager.process_events();
245 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
246 // see `await_start`'s use below.
247 let await_start = Instant::now();
248 let updates_available =
249 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
250 let await_time = await_start.elapsed();
252 if updates_available {
253 log_trace!(logger, "Persisting ChannelManager...");
254 persister.persist_manager(&*channel_manager)?;
255 log_trace!(logger, "Done persisting ChannelManager.");
257 // Exit the loop if the background processor was requested to stop.
258 if stop_thread.load(Ordering::Acquire) == true {
259 log_trace!(logger, "Terminating background processor.");
262 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
263 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
264 channel_manager.timer_tick_occurred();
265 last_freshness_call = Instant::now();
267 if await_time > Duration::from_secs(1) {
268 // On various platforms, we may be starved of CPU cycles for several reasons.
269 // E.g. on iOS, if we've been in the background, we will be entirely paused.
270 // Similarly, if we're on a desktop platform and the device has been asleep, we
271 // may not get any cycles.
272 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
273 // full second, at which point we assume sockets may have been killed (they
274 // appear to be at least on some platforms, even if it has only been a second).
275 // Note that we have to take care to not get here just because user event
276 // processing was slow at the top of the loop. For example, the sample client
277 // may call Bitcoin Core RPCs during event handling, which very often takes
278 // more than a handful of seconds to complete, and shouldn't disconnect all our
280 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
281 peer_manager.disconnect_all_peers();
282 last_ping_call = Instant::now();
283 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
284 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
285 peer_manager.timer_tick_occurred();
286 last_ping_call = Instant::now();
289 // Note that we want to run a graph prune once not long after startup before
290 // falling back to our usual hourly prunes. This avoids short-lived clients never
291 // pruning their network graph. We run once 60 seconds after startup before
292 // continuing our normal cadence.
293 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
294 // The network graph must not be pruned while rapid sync completion is pending
295 log_trace!(logger, "Assessing prunability of network graph");
296 let graph_to_prune = match rapid_gossip_sync.as_ref() {
297 Some(rapid_sync) => {
298 if rapid_sync.is_initial_sync_complete() {
299 Some(rapid_sync.network_graph())
304 None => p2p_gossip_sync.as_ref().map(|sync| sync.network_graph())
307 if let Some(network_graph_reference) = graph_to_prune {
308 network_graph_reference.remove_stale_channels();
310 if let Err(e) = persister.persist_graph(network_graph_reference) {
311 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
314 last_prune_call = Instant::now();
317 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
321 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
322 if let Some(ref scorer) = scorer {
323 log_trace!(logger, "Persisting scorer");
324 if let Err(e) = persister.persist_scorer(&scorer) {
325 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
328 last_scorer_persist_call = Instant::now();
332 // After we exit, ensure we persist the ChannelManager one final time - this avoids
333 // some races where users quit while channel updates were in-flight, with
334 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
335 persister.persist_manager(&*channel_manager)?;
337 // Persist Scorer on exit
338 if let Some(ref scorer) = scorer {
339 persister.persist_scorer(&scorer)?;
342 // Persist NetworkGraph on exit
343 if let Some(ref gossip_sync) = p2p_gossip_sync {
344 persister.persist_graph(gossip_sync.network_graph())?;
349 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
352 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
353 /// [`ChannelManager`].
357 /// This function panics if the background thread has panicked such as while persisting or
360 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
361 pub fn join(mut self) -> Result<(), std::io::Error> {
362 assert!(self.thread_handle.is_some());
366 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
367 /// [`ChannelManager`].
371 /// This function panics if the background thread has panicked such as while persisting or
374 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
375 pub fn stop(mut self) -> Result<(), std::io::Error> {
376 assert!(self.thread_handle.is_some());
377 self.stop_and_join_thread()
380 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
381 self.stop_thread.store(true, Ordering::Release);
385 fn join_thread(&mut self) -> Result<(), std::io::Error> {
386 match self.thread_handle.take() {
387 Some(handle) => handle.join().unwrap(),
393 impl Drop for BackgroundProcessor {
395 self.stop_and_join_thread().unwrap();
401 use bitcoin::blockdata::block::BlockHeader;
402 use bitcoin::blockdata::constants::genesis_block;
403 use bitcoin::blockdata::transaction::{Transaction, TxOut};
404 use bitcoin::network::constants::Network;
405 use lightning::chain::{BestBlock, Confirm, chainmonitor};
406 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
407 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
408 use lightning::chain::transaction::OutPoint;
409 use lightning::get_event_msg;
410 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
411 use lightning::ln::features::{ChannelFeatures, InitFeatures};
412 use lightning::ln::msgs::{ChannelMessageHandler, Init};
413 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
414 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
415 use lightning::util::config::UserConfig;
416 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
417 use lightning::util::ser::Writeable;
418 use lightning::util::test_utils;
419 use lightning::util::persist::KVStorePersister;
420 use lightning_invoice::payment::{InvoicePayer, Retry};
421 use lightning_invoice::utils::DefaultRouter;
422 use lightning_persister::FilesystemPersister;
424 use std::path::PathBuf;
425 use std::sync::{Arc, Mutex};
426 use std::sync::mpsc::SyncSender;
427 use std::time::Duration;
428 use lightning::routing::scoring::{FixedPenaltyScorer};
429 use lightning_rapid_gossip_sync::RapidGossipSync;
430 use super::{BackgroundProcessor, FRESHNESS_TIMER};
432 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
434 #[derive(Clone, Eq, Hash, PartialEq)]
435 struct TestDescriptor{}
436 impl SocketDescriptor for TestDescriptor {
437 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
441 fn disconnect_socket(&mut self) {}
444 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
447 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
448 p2p_gossip_sync: Option<Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
449 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
450 chain_monitor: Arc<ChainMonitor>,
451 persister: Arc<FilesystemPersister>,
452 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
453 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
454 logger: Arc<test_utils::TestLogger>,
455 best_block: BestBlock,
456 scorer: Arc<Mutex<FixedPenaltyScorer>>,
457 rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
462 let data_dir = self.persister.get_data_dir();
463 match fs::remove_dir_all(data_dir.clone()) {
464 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
471 graph_error: Option<(std::io::ErrorKind, &'static str)>,
472 graph_persistence_notifier: Option<SyncSender<()>>,
473 manager_error: Option<(std::io::ErrorKind, &'static str)>,
474 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
475 filesystem_persister: FilesystemPersister,
479 fn new(data_dir: String) -> Self {
480 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
481 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
484 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
485 Self { graph_error: Some((error, message)), ..self }
488 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
489 Self { graph_persistence_notifier: Some(sender), ..self }
492 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
493 Self { manager_error: Some((error, message)), ..self }
496 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
497 Self { scorer_error: Some((error, message)), ..self }
501 impl KVStorePersister for Persister {
502 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
503 if key == "manager" {
504 if let Some((error, message)) = self.manager_error {
505 return Err(std::io::Error::new(error, message))
509 if key == "network_graph" {
510 if let Some(sender) = &self.graph_persistence_notifier {
511 sender.send(()).unwrap();
514 if let Some((error, message)) = self.graph_error {
515 return Err(std::io::Error::new(error, message))
520 if let Some((error, message)) = self.scorer_error {
521 return Err(std::io::Error::new(error, message))
525 self.filesystem_persister.persist(key, object)
529 fn get_full_filepath(filepath: String, filename: String) -> String {
530 let mut path = PathBuf::from(filepath);
532 path.to_str().unwrap().to_string()
535 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
536 let mut nodes = Vec::new();
537 for i in 0..num_nodes {
538 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
539 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
540 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
541 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
542 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
543 let seed = [i as u8; 32];
544 let network = Network::Testnet;
545 let genesis_block = genesis_block(network);
546 let now = Duration::from_secs(genesis_block.header.time as u64);
547 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
548 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
549 let best_block = BestBlock::from_genesis(network);
550 let params = ChainParameters { network, best_block };
551 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
552 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
553 let p2p_gossip_sync = Some(Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
554 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
555 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
556 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
557 let rapid_gossip_sync = None;
558 let node = Node { node: manager, p2p_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
562 for i in 0..num_nodes {
563 for j in (i+1)..num_nodes {
564 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
565 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
572 macro_rules! open_channel {
573 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
574 begin_open_channel!($node_a, $node_b, $channel_value);
575 let events = $node_a.node.get_and_clear_pending_events();
576 assert_eq!(events.len(), 1);
577 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
578 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
583 macro_rules! begin_open_channel {
584 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
585 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
586 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
587 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
591 macro_rules! handle_funding_generation_ready {
592 ($event: expr, $channel_value: expr) => {{
594 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
595 assert_eq!(channel_value_satoshis, $channel_value);
596 assert_eq!(user_channel_id, 42);
598 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
599 value: channel_value_satoshis, script_pubkey: output_script.clone(),
601 (temporary_channel_id, tx)
603 _ => panic!("Unexpected event"),
608 macro_rules! end_open_channel {
609 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
610 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
611 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
612 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
616 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
618 let prev_blockhash = node.best_block.block_hash();
619 let height = node.best_block.height() + 1;
620 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
621 let txdata = vec![(0, tx)];
622 node.best_block = BestBlock::new(header.block_hash(), height);
625 node.node.transactions_confirmed(&header, &txdata, height);
626 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
629 node.node.best_block_updated(&header, height);
630 node.chain_monitor.best_block_updated(&header, height);
636 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
637 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
641 fn test_background_processor() {
642 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
643 // updates. Also test that when new updates are available, the manager signals that it needs
644 // re-persistence and is successfully re-persisted.
645 let nodes = create_nodes(2, "test_background_processor".to_string());
647 // Go through the channel creation process so that each node has something to persist. Since
648 // open_channel consumes events, it must complete before starting BackgroundProcessor to
649 // avoid a race with processing events.
650 let tx = open_channel!(nodes[0], nodes[1], 100000);
652 // Initiate the background processors to watch each node.
653 let data_dir = nodes[0].persister.get_data_dir();
654 let persister = Arc::new(Persister::new(data_dir));
655 let event_handler = |_: &_| {};
656 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
658 macro_rules! check_persisted_data {
659 ($node: expr, $filepath: expr) => {
660 let mut expected_bytes = Vec::new();
662 expected_bytes.clear();
663 match $node.write(&mut expected_bytes) {
665 match std::fs::read($filepath) {
667 if bytes == expected_bytes {
676 Err(e) => panic!("Unexpected error: {}", e)
682 // Check that the initial channel manager data is persisted as expected.
683 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
684 check_persisted_data!(nodes[0].node, filepath.clone());
687 if !nodes[0].node.get_persistence_condvar_value() { break }
690 // Force-close the channel.
691 nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
693 // Check that the force-close updates are persisted.
694 check_persisted_data!(nodes[0].node, filepath.clone());
696 if !nodes[0].node.get_persistence_condvar_value() { break }
699 // Check network graph is persisted
700 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
701 if let Some(ref handler) = nodes[0].p2p_gossip_sync {
702 let network_graph = handler.network_graph();
703 check_persisted_data!(network_graph, filepath.clone());
706 // Check scorer is persisted
707 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
708 check_persisted_data!(nodes[0].scorer, filepath.clone());
710 assert!(bg_processor.stop().is_ok());
714 fn test_timer_tick_called() {
715 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
716 // `FRESHNESS_TIMER`.
717 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
718 let data_dir = nodes[0].persister.get_data_dir();
719 let persister = Arc::new(Persister::new(data_dir));
720 let event_handler = |_: &_| {};
721 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
723 let log_entries = nodes[0].logger.lines.lock().unwrap();
724 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
725 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
726 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
727 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
732 assert!(bg_processor.stop().is_ok());
736 fn test_channel_manager_persist_error() {
737 // Test that if we encounter an error during manager persistence, the thread panics.
738 let nodes = create_nodes(2, "test_persist_error".to_string());
739 open_channel!(nodes[0], nodes[1], 100000);
741 let data_dir = nodes[0].persister.get_data_dir();
742 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
743 let event_handler = |_: &_| {};
744 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
745 match bg_processor.join() {
746 Ok(_) => panic!("Expected error persisting manager"),
748 assert_eq!(e.kind(), std::io::ErrorKind::Other);
749 assert_eq!(e.get_ref().unwrap().to_string(), "test");
755 fn test_network_graph_persist_error() {
756 // Test that if we encounter an error during network graph persistence, an error gets returned.
757 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
758 let data_dir = nodes[0].persister.get_data_dir();
759 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
760 let event_handler = |_: &_| {};
761 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
763 match bg_processor.stop() {
764 Ok(_) => panic!("Expected error persisting network graph"),
766 assert_eq!(e.kind(), std::io::ErrorKind::Other);
767 assert_eq!(e.get_ref().unwrap().to_string(), "test");
773 fn test_scorer_persist_error() {
774 // Test that if we encounter an error during scorer persistence, an error gets returned.
775 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
776 let data_dir = nodes[0].persister.get_data_dir();
777 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
778 let event_handler = |_: &_| {};
779 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
781 match bg_processor.stop() {
782 Ok(_) => panic!("Expected error persisting scorer"),
784 assert_eq!(e.kind(), std::io::ErrorKind::Other);
785 assert_eq!(e.get_ref().unwrap().to_string(), "test");
791 fn test_background_event_handling() {
792 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
793 let channel_value = 100000;
794 let data_dir = nodes[0].persister.get_data_dir();
795 let persister = Arc::new(Persister::new(data_dir.clone()));
797 // Set up a background event handler for FundingGenerationReady events.
798 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
799 let event_handler = move |event: &Event| {
800 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
802 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
804 // Open a channel and check that the FundingGenerationReady event was handled.
805 begin_open_channel!(nodes[0], nodes[1], channel_value);
806 let (temporary_channel_id, funding_tx) = receiver
807 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
808 .expect("FundingGenerationReady not handled within deadline");
809 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
811 // Confirm the funding transaction.
812 confirm_transaction(&mut nodes[0], &funding_tx);
813 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
814 confirm_transaction(&mut nodes[1], &funding_tx);
815 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
816 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
817 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
818 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
819 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
821 assert!(bg_processor.stop().is_ok());
823 // Set up a background event handler for SpendableOutputs events.
824 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
825 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
826 let persister = Arc::new(Persister::new(data_dir));
827 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
829 // Force close the channel and check that the SpendableOutputs event was handled.
830 nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
831 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
832 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
834 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
835 .expect("SpendableOutputs not handled within deadline");
837 Event::SpendableOutputs { .. } => {},
838 Event::ChannelClosed { .. } => {},
839 _ => panic!("Unexpected event: {:?}", event),
842 assert!(bg_processor.stop().is_ok());
846 fn test_scorer_persistence() {
847 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
848 let data_dir = nodes[0].persister.get_data_dir();
849 let persister = Arc::new(Persister::new(data_dir));
850 let event_handler = |_: &_| {};
851 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
854 let log_entries = nodes[0].logger.lines.lock().unwrap();
855 let expected_log = "Persisting scorer".to_string();
856 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
861 assert!(bg_processor.stop().is_ok());
865 fn test_not_pruning_network_graph_until_graph_sync_completion() {
866 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
867 let data_dir = nodes[0].persister.get_data_dir();
868 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
869 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
870 let network_graph = nodes[0].network_graph.clone();
871 let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
872 let features = ChannelFeatures::empty();
873 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
874 .expect("Failed to update channel from partial announcement");
875 let original_graph_description = network_graph.to_string();
876 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
877 assert_eq!(network_graph.read_only().channels().len(), 1);
879 let event_handler = |_: &_| {};
880 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));
883 let log_entries = nodes[0].logger.lines.lock().unwrap();
884 let expected_log_a = "Assessing prunability of network graph".to_string();
885 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
886 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
887 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
892 let initialization_input = vec![
893 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
894 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
895 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
896 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
897 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
898 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
899 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
900 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
901 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
902 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
903 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
904 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
905 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
907 rapid_sync.update_network_graph(&initialization_input[..]).unwrap();
909 // this should have added two channels
910 assert_eq!(network_graph.read_only().channels().len(), 3);
913 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
914 .expect("Network graph not pruned within deadline");
916 background_processor.stop().unwrap();
918 // all channels should now be pruned
919 assert_eq!(network_graph.read_only().channels().len(), 0);
923 fn test_invoice_payer() {
924 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
925 let random_seed_bytes = keys_manager.get_secure_random_bytes();
926 let nodes = create_nodes(2, "test_invoice_payer".to_string());
928 // Initiate the background processors to watch each node.
929 let data_dir = nodes[0].persister.get_data_dir();
930 let persister = Arc::new(Persister::new(data_dir));
931 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
932 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
933 let event_handler = Arc::clone(&invoice_payer);
934 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
935 assert!(bg_processor.stop().is_ok());