From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Fri, 19 Feb 2021 20:37:24 +0000 (-0800) Subject: Merge pull request #752 from valentinewallace/chanman-persistence X-Git-Tag: v0.0.13~25 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=041d7aaa9da9b4e5cc0fb3276edf71301014fe41;hp=13e990e6ae0f130534aceadc008e964822b39b14;p=rust-lightning Merge pull request #752 from valentinewallace/chanman-persistence ChannelManager persistence --- diff --git a/Cargo.toml b/Cargo.toml index 96f4b1d1..1ffd75a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "lightning-block-sync", "lightning-net-tokio", "lightning-persister", + "background-processor", ] # Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it. diff --git a/background-processor/Cargo.toml b/background-processor/Cargo.toml new file mode 100644 index 00000000..71fbbbff --- /dev/null +++ b/background-processor/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "background-processor" +version = "0.1.0" +authors = ["Valentine Wallace "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bitcoin = "0.24" +lightning = { version = "0.0.12", path = "../lightning", features = ["allow_wallclock_use"] } +lightning-persister = { version = "0.0.1", path = "../lightning-persister" } + +[dev-dependencies] +lightning = { version = "0.0.12", path = "../lightning", features = ["_test_utils"] } diff --git a/background-processor/src/lib.rs b/background-processor/src/lib.rs new file mode 100644 index 00000000..63b9ef31 --- /dev/null +++ b/background-processor/src/lib.rs @@ -0,0 +1,294 @@ +#[macro_use] extern crate lightning; + +use lightning::chain; +use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use lightning::chain::keysinterface::{ChannelKeys, KeysInterface}; +use lightning::ln::channelmanager::ChannelManager; +use lightning::util::logger::Logger; +use lightning::util::ser::Writeable; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +/// BackgroundProcessor takes care of tasks that (1) need to happen periodically to keep +/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its +/// responsibilities are: +/// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so, +/// writing it to disk/backups by invoking the callback given to it at startup. +/// ChannelManager persistence should be done in the background. +/// * Calling `ChannelManager::timer_chan_freshness_every_min()` every minute (can be done in the +/// background). +/// +/// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date, +/// then there is a risk of channels force-closing on startup when the manager realizes it's +/// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used +/// for unilateral chain closure fees are at risk. +pub struct BackgroundProcessor { + stop_thread: Arc, + /// May be used to retrieve and handle the error if `BackgroundProcessor`'s thread + /// exits due to an error while persisting. + pub thread_handle: JoinHandle>, +} + +#[cfg(not(test))] +const CHAN_FRESHNESS_TIMER: u64 = 60; +#[cfg(test)] +const CHAN_FRESHNESS_TIMER: u64 = 1; + +impl BackgroundProcessor { + /// Start a background thread that takes care of responsibilities enumerated in the top-level + /// documentation. + /// + /// If `persist_manager` returns an error, then this thread will return said error (and `start()` + /// will need to be called again to restart the `BackgroundProcessor`). Users should wait on + /// [`thread_handle`]'s `join()` method to be able to tell if and when an error is returned, or + /// implement `persist_manager` such that an error is never returned to the `BackgroundProcessor` + /// + /// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading + /// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`. + /// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation. + /// + /// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle + /// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write + /// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl + pub fn start(persist_manager: PM, manager: Arc, Arc, Arc, Arc, Arc>>, logger: Arc) -> Self + where ChanSigner: 'static + ChannelKeys + Writeable, + M: 'static + chain::Watch, + T: 'static + BroadcasterInterface, + K: 'static + KeysInterface, + F: 'static + FeeEstimator, + L: 'static + Logger, + PM: 'static + Send + Fn(&ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error>, + { + let stop_thread = Arc::new(AtomicBool::new(false)); + let stop_thread_clone = stop_thread.clone(); + let handle = thread::spawn(move || -> Result<(), std::io::Error> { + let mut current_time = Instant::now(); + loop { + let updates_available = manager.wait_timeout(Duration::from_millis(100)); + if updates_available { + persist_manager(&*manager)?; + } + // Exit the loop if the background processor was requested to stop. + if stop_thread.load(Ordering::Acquire) == true { + log_trace!(logger, "Terminating background processor."); + return Ok(()) + } + if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER { + log_trace!(logger, "Calling manager's timer_chan_freshness_every_min"); + manager.timer_chan_freshness_every_min(); + current_time = Instant::now(); + } + } + }); + Self { + stop_thread: stop_thread_clone, + thread_handle: handle, + } + } + + /// Stop `BackgroundProcessor`'s thread. + pub fn stop(self) -> Result<(), std::io::Error> { + self.stop_thread.store(true, Ordering::Release); + self.thread_handle.join().unwrap() + } +} + +#[cfg(test)] +mod tests { + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::blockdata::transaction::{Transaction, TxOut}; + use bitcoin::network::constants::Network; + use lightning::chain; + use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; + use lightning::chain::chainmonitor; + use lightning::chain::keysinterface::{ChannelKeys, InMemoryChannelKeys, KeysInterface, KeysManager}; + use lightning::chain::transaction::OutPoint; + use lightning::get_event_msg; + use lightning::ln::channelmanager::{ChannelManager, SimpleArcChannelManager}; + use lightning::ln::features::InitFeatures; + use lightning::ln::msgs::ChannelMessageHandler; + use lightning::util::config::UserConfig; + use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent}; + use lightning::util::logger::Logger; + use lightning::util::ser::Writeable; + use lightning::util::test_utils; + use lightning_persister::FilesystemPersister; + use std::fs; + use std::path::PathBuf; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + use super::BackgroundProcessor; + + type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; + + struct Node { + node: SimpleArcChannelManager, + persister: Arc, + logger: Arc, + } + + impl Drop for Node { + fn drop(&mut self) { + let data_dir = self.persister.get_data_dir(); + match fs::remove_dir_all(data_dir.clone()) { + Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e), + _ => {} + } + } + } + + fn get_full_filepath(filepath: String, filename: String) -> String { + let mut path = PathBuf::from(filepath); + path.push(filename); + path.to_str().unwrap().to_string() + } + + fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec { + let mut nodes = Vec::new(); + for i in 0..num_nodes { + let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}); + let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }); + let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); + let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); + let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); + let seed = [i as u8; 32]; + let network = Network::Testnet; + let now = Duration::from_secs(genesis_block(network).header.time as u64); + let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); + let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); + let manager = Arc::new(ChannelManager::new(Network::Testnet, fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), i)); + let node = Node { node: manager, persister, logger }; + nodes.push(node); + } + nodes + } + + macro_rules! open_channel { + ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ + $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); + $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); + $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); + let events = $node_a.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let (temporary_channel_id, tx, funding_output) = match events[0] { + Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => { + assert_eq!(*channel_value_satoshis, $channel_value); + assert_eq!(user_channel_id, 42); + + let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut { + value: *channel_value_satoshis, script_pubkey: output_script.clone(), + }]}; + let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 }; + (*temporary_channel_id, tx, funding_outpoint) + }, + _ => panic!("Unexpected event"), + }; + + $node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output); + $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); + $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); + tx + }} + } + + #[test] + fn test_background_processor() { + // Test that when a new channel is created, the ChannelManager needs to be re-persisted with + // updates. Also test that when new updates are available, the manager signals that it needs + // re-persistence and is successfully re-persisted. + let nodes = create_nodes(2, "test_background_processor".to_string()); + + // Initiate the background processors to watch each node. + let data_dir = nodes[0].persister.get_data_dir(); + let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + + // Go through the channel creation process until each node should have something persisted. + let tx = open_channel!(nodes[0], nodes[1], 100000); + + macro_rules! check_persisted_data { + ($node: expr, $filepath: expr, $expected_bytes: expr) => { + match $node.write(&mut $expected_bytes) { + Ok(()) => { + loop { + match std::fs::read($filepath) { + Ok(bytes) => { + if bytes == $expected_bytes { + break + } else { + continue + } + }, + Err(_) => continue + } + } + }, + Err(e) => panic!("Unexpected error: {}", e) + } + } + } + + // Check that the initial channel manager data is persisted as expected. + let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string()); + let mut expected_bytes = Vec::new(); + check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes); + loop { + if !nodes[0].node.get_persistence_condvar_value() { break } + } + + // Force-close the channel. + nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()).unwrap(); + + // Check that the force-close updates are persisted. + let mut expected_bytes = Vec::new(); + check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes); + loop { + if !nodes[0].node.get_persistence_condvar_value() { break } + } + + assert!(bg_processor.stop().is_ok()); + } + + #[test] + fn test_chan_freshness_called() { + // Test that ChannelManager's `timer_chan_freshness_every_min` is called every + // `CHAN_FRESHNESS_TIMER`. + let nodes = create_nodes(1, "test_chan_freshness_called".to_string()); + let data_dir = nodes[0].persister.get_data_dir(); + let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + loop { + let log_entries = nodes[0].logger.lines.lock().unwrap(); + let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string(); + if log_entries.get(&("background_processor".to_string(), desired_log)).is_some() { + break + } + } + + assert!(bg_processor.stop().is_ok()); + } + + #[test] + fn test_persist_error() { + // Test that if we encounter an error during manager persistence, the thread panics. + fn persist_manager(_data: &ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error> + where ChanSigner: 'static + ChannelKeys + Writeable, + M: 'static + chain::Watch, + T: 'static + BroadcasterInterface, + K: 'static + KeysInterface, + F: 'static + FeeEstimator, + L: 'static + Logger, + { + Err(std::io::Error::new(std::io::ErrorKind::Other, "test")) + } + + let nodes = create_nodes(2, "test_persist_error".to_string()); + let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone()); + open_channel!(nodes[0], nodes[1], 100000); + + let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test"); + } +} diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index 63b69a1e..d8be01c9 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -12,6 +12,9 @@ bitcoin = "0.24" lightning = { version = "0.0.12", path = "../lightning" } libc = "0.2" +[target.'cfg(windows)'.dependencies] +winapi = { version = "0.3", features = ["winbase"] } + [dev-dependencies.bitcoin] version = "0.24" features = ["bitcoinconsensus"] diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 7307ef0d..0257eb50 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -1,20 +1,26 @@ +mod util; + extern crate lightning; extern crate bitcoin; extern crate libc; use bitcoin::hashes::hex::ToHex; +use crate::util::DiskWriteable; +use lightning::chain; +use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr}; use lightning::chain::channelmonitor; -use lightning::chain::keysinterface::ChannelKeys; +use lightning::chain::keysinterface::{ChannelKeys, KeysInterface}; use lightning::chain::transaction::OutPoint; +use lightning::ln::channelmanager::ChannelManager; +use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use std::fs; use std::io::Error; -use std::path::{Path, PathBuf}; +use std::sync::Arc; #[cfg(test)] use { - lightning::chain::keysinterface::KeysInterface, lightning::util::ser::ReadableArgs, bitcoin::{BlockHash, Txid}, bitcoin::hashes::hex::FromHex, @@ -22,9 +28,6 @@ use { std::io::Cursor }; -#[cfg(not(target_os = "windows"))] -use std::os::unix::io::AsRawFd; - /// FilesystemPersister persists channel data on disk, where each channel's /// data is stored in a file named after its funding outpoint. /// @@ -41,13 +44,22 @@ pub struct FilesystemPersister { path_to_channel_data: String, } -trait DiskWriteable { - fn write(&self, writer: &mut fs::File) -> Result<(), Error>; +impl DiskWriteable for ChannelMonitor { + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), Error> { + self.write(writer) + } } -impl DiskWriteable for ChannelMonitor { - fn write(&self, writer: &mut fs::File) -> Result<(), Error> { - Writeable::write(self, writer) +impl DiskWriteable for ChannelManager, Arc, Arc, Arc, Arc> +where ChanSigner: ChannelKeys + Writeable, + M: chain::Watch, + T: BroadcasterInterface, + K: KeysInterface, + F: FeeEstimator, + L: Logger, +{ + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> { + self.write(writer) } } @@ -60,39 +72,24 @@ impl FilesystemPersister { } } - fn get_full_filepath(&self, funding_txo: OutPoint) -> String { - let mut path = PathBuf::from(&self.path_to_channel_data); - path.push(format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index)); - path.to_str().unwrap().to_string() + pub fn get_data_dir(&self) -> String { + self.path_to_channel_data.clone() } - // Utility to write a file to disk. - fn write_channel_data(&self, funding_txo: OutPoint, monitor: &dyn DiskWriteable) -> std::io::Result<()> { - fs::create_dir_all(&self.path_to_channel_data)?; - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. - // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let filename = self.get_full_filepath(funding_txo); - let tmp_filename = format!("{}.tmp", filename.clone()); - - { - // Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use - // rust stdlib 1.36 or higher. - let mut f = fs::File::create(&tmp_filename)?; - monitor.write(&mut f)?; - f.sync_all()?; - } - fs::rename(&tmp_filename, &filename)?; - // Fsync the parent directory on Unix. - #[cfg(not(target_os = "windows"))] - { - let path = Path::new(&filename).parent().unwrap(); - let dir_file = fs::OpenOptions::new().read(true).open(path)?; - unsafe { libc::fsync(dir_file.as_raw_fd()); } - } - Ok(()) + /// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister` + /// initialization, within a file called "manager". + pub fn persist_manager( + data_dir: String, + manager: &ChannelManager, Arc, Arc, Arc, Arc> + ) -> Result<(), std::io::Error> + where ChanSigner: ChannelKeys + Writeable, + M: chain::Watch, + T: BroadcasterInterface, + K: KeysInterface, + F: FeeEstimator, + L: Logger + { + util::write_to_file(data_dir, "manager".to_string(), manager) } #[cfg(test)] @@ -132,28 +129,18 @@ impl FilesystemPersister { impl channelmonitor::Persist for FilesystemPersister { fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { - self.write_channel_data(funding_txo, monitor) + let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); + util::write_to_file(self.path_to_channel_data.clone(), filename, monitor) .map_err(|_| ChannelMonitorUpdateErr::PermanentFailure) } fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { - self.write_channel_data(funding_txo, monitor) + let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); + util::write_to_file(self.path_to_channel_data.clone(), filename, monitor) .map_err(|_| ChannelMonitorUpdateErr::PermanentFailure) } } -#[cfg(test)] -impl Drop for FilesystemPersister { - fn drop(&mut self) { - // We test for invalid directory names, so it's OK if directory removal - // fails. - match fs::remove_dir_all(&self.path_to_channel_data) { - Err(e) => println!("Failed to remove test persister directory: {}", e), - _ => {} - } - } -} - #[cfg(test)] mod tests { extern crate lightning; @@ -162,8 +149,6 @@ mod tests { use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hashes::hex::FromHex; use bitcoin::Txid; - use DiskWriteable; - use Error; use lightning::chain::channelmonitor::{Persist, ChannelMonitorUpdateErr}; use lightning::chain::transaction::OutPoint; use lightning::{check_closed_broadcast, check_added_monitors}; @@ -171,20 +156,22 @@ mod tests { use lightning::ln::functional_test_utils::*; use lightning::ln::msgs::ErrorAction; use lightning::util::events::{MessageSendEventsProvider, MessageSendEvent}; - use lightning::util::ser::Writer; use lightning::util::test_utils; use std::fs; - use std::io; #[cfg(target_os = "windows")] use { lightning::get_event_msg, lightning::ln::msgs::ChannelMessageHandler, }; - struct TestWriteable{} - impl DiskWriteable for TestWriteable { - fn write(&self, writer: &mut fs::File) -> Result<(), Error> { - writer.write_all(&[42; 1]) + impl Drop for FilesystemPersister { + fn drop(&mut self) { + // We test for invalid directory names, so it's OK if directory removal + // fails. + match fs::remove_dir_all(&self.path_to_channel_data) { + Err(e) => println!("Failed to remove test persister directory: {}", e), + _ => {} + } } } @@ -256,88 +243,6 @@ mod tests { check_persisted_data!(11); } - // Test that if the persister's path to channel data is read-only, writing - // data to it fails. Windows ignores the read-only flag for folders, so this - // test is Unix-only. - #[cfg(not(target_os = "windows"))] - #[test] - fn test_readonly_dir() { - let persister = FilesystemPersister::new("test_readonly_dir_persister".to_string()); - let test_writeable = TestWriteable{}; - let test_txo = OutPoint { - txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), - index: 0 - }; - // Create the persister's directory and set it to read-only. - let path = &persister.path_to_channel_data; - fs::create_dir_all(path).unwrap(); - let mut perms = fs::metadata(path).unwrap().permissions(); - perms.set_readonly(true); - fs::set_permissions(path, perms).unwrap(); - match persister.write_channel_data(test_txo, &test_writeable) { - Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied), - _ => panic!("Unexpected error message") - } - } - - // Test failure to rename in the process of atomically creating a channel - // monitor's file. We induce this failure by making the `tmp` file a - // directory. - // Explanation: given "from" = the file being renamed, "to" = the - // renamee that already exists: Windows should fail because it'll fail - // whenever "to" is a directory, and Unix should fail because if "from" is a - // file, then "to" is also required to be a file. - #[test] - fn test_rename_failure() { - let persister = FilesystemPersister::new("test_rename_failure".to_string()); - let test_writeable = TestWriteable{}; - let txid_hex = "8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be"; - let outp_idx = 0; - let test_txo = OutPoint { - txid: Txid::from_hex(txid_hex).unwrap(), - index: outp_idx, - }; - // Create the channel data file and make it a directory. - let path = &persister.path_to_channel_data; - fs::create_dir_all(format!("{}/{}_{}", path, txid_hex, outp_idx)).unwrap(); - match persister.write_channel_data(test_txo, &test_writeable) { - Err(e) => { - #[cfg(not(target_os = "windows"))] - assert_eq!(e.kind(), io::ErrorKind::Other); - #[cfg(target_os = "windows")] - assert_eq!(e.kind(), io::ErrorKind::PermissionDenied); - } - _ => panic!("Unexpected error message") - } - } - - // Test failure to create the temporary file in the persistence process. - // We induce this failure by having the temp file already exist and be a - // directory. - #[test] - fn test_tmp_file_creation_failure() { - let persister = FilesystemPersister::new("test_tmp_file_creation_failure".to_string()); - let test_writeable = TestWriteable{}; - let txid_hex = "8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be"; - let outp_idx = 0; - let test_txo = OutPoint { - txid: Txid::from_hex(txid_hex).unwrap(), - index: outp_idx, - }; - // Create the tmp file and make it a directory. - let path = &persister.path_to_channel_data; - fs::create_dir_all(format!("{}/{}_{}.tmp", path, txid_hex, outp_idx)).unwrap(); - match persister.write_channel_data(test_txo, &test_writeable) { - Err(e) => { - #[cfg(not(target_os = "windows"))] - assert_eq!(e.kind(), io::ErrorKind::Other); - #[cfg(target_os = "windows")] - assert_eq!(e.kind(), io::ErrorKind::PermissionDenied); - } - _ => panic!("Unexpected error message") - } - } - // Test that if the persister's path to channel data is read-only, writing a // monitor to it results in the persister returning a PermanentFailure. // Windows ignores the read-only flag for folders, so this test is Unix-only. diff --git a/lightning-persister/src/util.rs b/lightning-persister/src/util.rs new file mode 100644 index 00000000..daacb00f --- /dev/null +++ b/lightning-persister/src/util.rs @@ -0,0 +1,188 @@ +#[cfg(target_os = "windows")] +extern crate winapi; + +use std::fs; +use std::path::{Path, PathBuf}; + +#[cfg(not(target_os = "windows"))] +use std::os::unix::io::AsRawFd; + +#[cfg(target_os = "windows")] +use { + std::ffi::OsStr, + std::os::windows::ffi::OsStrExt +}; + +pub(crate) trait DiskWriteable { + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error>; +} + +pub(crate) fn get_full_filepath(filepath: String, filename: String) -> String { + let mut path = PathBuf::from(filepath); + path.push(filename); + path.to_str().unwrap().to_string() +} + +#[cfg(target_os = "windows")] +macro_rules! call { + ($e: expr) => ( + if $e != 0 { + return Ok(()) + } else { + return Err(std::io::Error::last_os_error()) + } + ) +} + +#[cfg(target_os = "windows")] +fn path_to_windows_str>(path: T) -> Vec { + path.as_ref().encode_wide().chain(Some(0)).collect() +} + +#[allow(bare_trait_objects)] +pub(crate) fn write_to_file(path: String, filename: String, data: &D) -> std::io::Result<()> { + fs::create_dir_all(path.clone())?; + // Do a crazy dance with lots of fsync()s to be overly cautious here... + // We never want to end up in a state where we've lost the old data, or end up using the + // old data on power loss after we've returned. + // The way to atomically write a file on Unix platforms is: + // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) + let filename_with_path = get_full_filepath(path, filename); + let tmp_filename = format!("{}.tmp", filename_with_path.clone()); + + { + // Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use + // rust stdlib 1.36 or higher. + let mut f = fs::File::create(&tmp_filename)?; + data.write_to_file(&mut f)?; + f.sync_all()?; + } + // Fsync the parent directory on Unix. + #[cfg(not(target_os = "windows"))] + { + fs::rename(&tmp_filename, &filename_with_path)?; + let path = Path::new(&filename_with_path).parent().unwrap(); + let dir_file = fs::OpenOptions::new().read(true).open(path)?; + unsafe { libc::fsync(dir_file.as_raw_fd()); } + } + #[cfg(target_os = "windows")] + { + let src = PathBuf::from(tmp_filename.clone()); + let dst = PathBuf::from(filename_with_path.clone()); + if Path::new(&filename_with_path.clone()).exists() { + unsafe {winapi::um::winbase::ReplaceFileW( + path_to_windows_str(dst).as_ptr(), path_to_windows_str(src).as_ptr(), std::ptr::null(), + winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS, + std::ptr::null_mut() as *mut winapi::ctypes::c_void, + std::ptr::null_mut() as *mut winapi::ctypes::c_void + )}; + } else { + call!(unsafe {winapi::um::winbase::MoveFileExW( + path_to_windows_str(src).as_ptr(), path_to_windows_str(dst).as_ptr(), + winapi::um::winbase::MOVEFILE_WRITE_THROUGH | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING + )}); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::{DiskWriteable, get_full_filepath, write_to_file}; + use std::fs; + use std::io; + use std::io::Write; + + struct TestWriteable{} + impl DiskWriteable for TestWriteable { + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), io::Error> { + writer.write_all(&[42; 1]) + } + } + + // Test that if the persister's path to channel data is read-only, writing + // data to it fails. Windows ignores the read-only flag for folders, so this + // test is Unix-only. + #[cfg(not(target_os = "windows"))] + #[test] + fn test_readonly_dir() { + let test_writeable = TestWriteable{}; + let filename = "test_readonly_dir_persister_filename".to_string(); + let path = "test_readonly_dir_persister_dir"; + fs::create_dir_all(path.to_string()).unwrap(); + let mut perms = fs::metadata(path.to_string()).unwrap().permissions(); + perms.set_readonly(true); + fs::set_permissions(path.to_string(), perms).unwrap(); + match write_to_file(path.to_string(), filename, &test_writeable) { + Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied), + _ => panic!("Unexpected error message") + } + } + + // Test failure to rename in the process of atomically creating a channel + // monitor's file. We induce this failure by making the `tmp` file a + // directory. + // Explanation: given "from" = the file being renamed, "to" = the destination + // file that already exists: Unix should fail because if "from" is a file, + // then "to" is also required to be a file. + // TODO: ideally try to make this work on Windows again + #[cfg(not(target_os = "windows"))] + #[test] + fn test_rename_failure() { + let test_writeable = TestWriteable{}; + let filename = "test_rename_failure_filename"; + let path = "test_rename_failure_dir"; + // Create the channel data file and make it a directory. + fs::create_dir_all(get_full_filepath(path.to_string(), filename.to_string())).unwrap(); + match write_to_file(path.to_string(), filename.to_string(), &test_writeable) { + Err(e) => assert_eq!(e.kind(), io::ErrorKind::Other), + _ => panic!("Unexpected Ok(())") + } + fs::remove_dir_all(path).unwrap(); + } + + #[test] + fn test_diskwriteable_failure() { + struct FailingWriteable {} + impl DiskWriteable for FailingWriteable { + fn write_to_file(&self, _writer: &mut fs::File) -> Result<(), std::io::Error> { + Err(std::io::Error::new(std::io::ErrorKind::Other, "expected failure")) + } + } + + let filename = "test_diskwriteable_failure"; + let path = "test_diskwriteable_failure_dir"; + let test_writeable = FailingWriteable{}; + match write_to_file(path.to_string(), filename.to_string(), &test_writeable) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::Other); + assert_eq!(e.get_ref().unwrap().to_string(), "expected failure"); + }, + _ => panic!("unexpected result") + } + fs::remove_dir_all(path).unwrap(); + } + + // Test failure to create the temporary file in the persistence process. + // We induce this failure by having the temp file already exist and be a + // directory. + #[test] + fn test_tmp_file_creation_failure() { + let test_writeable = TestWriteable{}; + let filename = "test_tmp_file_creation_failure_filename".to_string(); + let path = "test_tmp_file_creation_failure_dir".to_string(); + + // Create the tmp file and make it a directory. + let tmp_path = get_full_filepath(path.clone(), format!("{}.tmp", filename.clone())); + fs::create_dir_all(tmp_path).unwrap(); + match write_to_file(path, filename, &test_writeable) { + Err(e) => { + #[cfg(not(target_os = "windows"))] + assert_eq!(e.kind(), io::ErrorKind::Other); + #[cfg(target_os = "windows")] + assert_eq!(e.kind(), io::ErrorKind::PermissionDenied); + } + _ => panic!("Unexpected error message") + } + } +} diff --git a/lightning/Cargo.toml b/lightning/Cargo.toml index 883aee4a..0b51a399 100644 --- a/lightning/Cargo.toml +++ b/lightning/Cargo.toml @@ -11,6 +11,7 @@ Still missing tons of error-handling. See GitHub issues for suggested projects i """ [features] +allow_wallclock_use = [] fuzztarget = ["bitcoin/fuzztarget", "regex"] # Internal test utilities exposed to other repo crates _test_utils = ["hex", "regex"] @@ -38,3 +39,6 @@ features = ["bitcoinconsensus"] [dev-dependencies] hex = "0.3" regex = "0.1.80" + +[package.metadata.docs.rs] +features = ["allow_wallclock_use"] # When https://github.com/rust-lang/rust/issues/43781 complies with our MSVR, we can add nice banners in the docs for the methods behind this feature-gate. diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 3dcc74f9..84dc486c 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -58,9 +58,11 @@ use util::errors::APIError; use std::{cmp, mem}; use std::collections::{HashMap, hash_map, HashSet}; use std::io::{Cursor, Read}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; +#[cfg(any(test, feature = "allow_wallclock_use"))] +use std::time::Instant; use std::marker::{Sync, Send}; use std::ops::Deref; use bitcoin::hashes::hex::ToHex; @@ -437,13 +439,46 @@ pub struct ChannelManager, + persistence_notifier: PersistenceNotifier, + keys_manager: K, logger: L, } +/// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is +/// desirable to notify any listeners on `wait_timeout`/`wait` that new updates are available for +/// persistence. Therefore, this struct is responsible for locking the total consistency lock and, +/// upon going out of scope, sending the aforementioned notification (since the lock being released +/// indicates that the updates are ready for persistence). +struct PersistenceNotifierGuard<'a> { + persistence_notifier: &'a PersistenceNotifier, + // We hold onto this result so the lock doesn't get released immediately. + _read_guard: RwLockReadGuard<'a, ()>, +} + +impl<'a> PersistenceNotifierGuard<'a> { + fn new(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> Self { + let read_guard = lock.read().unwrap(); + + Self { + persistence_notifier: notifier, + _read_guard: read_guard, + } + } +} + +impl<'a> Drop for PersistenceNotifierGuard<'a> { + fn drop(&mut self) { + self.persistence_notifier.notify(); + } +} + /// The amount of time we require our counterparty wait to claim their money (ie time between when /// we, or our watchtower, must check for them having broadcast a theft transaction). pub(crate) const BREAKDOWN_TIMEOUT: u16 = 6 * 24; @@ -759,6 +794,7 @@ impl pending_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), + persistence_notifier: PersistenceNotifier::new(), keys_manager, @@ -787,7 +823,10 @@ impl let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, config)?; let res = channel.get_open_channel(self.genesis_hash.clone()); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + // We want to make sure the lock is actually acquired by PersistenceNotifierGuard. + debug_assert!(&self.total_consistency_lock.try_write().is_err()); + let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(channel.channel_id()) { hash_map::Entry::Occupied(_) => { @@ -859,7 +898,7 @@ impl /// /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -951,7 +990,7 @@ impl /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager. pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); self.force_close_channel_with_peer(channel_id, None) } @@ -1279,7 +1318,7 @@ impl } let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let err: Result<(), _> = loop { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -1447,7 +1486,7 @@ impl /// May panic if the funding_txo is duplicative with some other channel (note that this should /// be trivially prevented by using unique funding transaction keys per-channel). pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let (chan, msg) = { let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { @@ -1530,7 +1569,7 @@ impl /// /// Panics if addresses is absurdly large (more than 500). pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); if addresses.len() > 500 { panic!("More than half the message size was taken up by public addresses!"); @@ -1560,7 +1599,7 @@ impl /// Should only really ever be called in response to a PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut new_events = Vec::new(); let mut failed_forwards = Vec::new(); @@ -1820,7 +1859,7 @@ impl /// /// This method handles all the details, and must be called roughly once per minute. pub fn timer_chan_freshness_every_min(&self) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; for (_, chan) in channel_state.by_id.iter_mut() { @@ -1845,7 +1884,7 @@ impl /// Returns false if no payment was found to fail backwards, true if the process of failing the /// HTLC backwards has been started. pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash, payment_secret: &Option) -> bool { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(*payment_hash, *payment_secret)); @@ -2024,7 +2063,7 @@ impl pub fn claim_funds(&self, payment_preimage: PaymentPreimage, payment_secret: &Option, expected_amount: u64) -> bool { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret)); @@ -2220,7 +2259,7 @@ impl /// 4) once all remote copies are updated, you call this function with the update_id that /// completed, and once it is the latest the Channel will be re-enabled. pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); @@ -2971,7 +3010,7 @@ impl /// (C-not exported) Cause its doc(hidden) anyway #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u32) -> Result<(), APIError> { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let counterparty_node_id; let err: Result<(), _> = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -3111,7 +3150,7 @@ impl pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { let header_hash = header.block_hash(); log_trace!(self.logger, "Block {} at height {} connected", header_hash, height); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); let mut timed_out_htlcs = Vec::new(); { @@ -3224,7 +3263,7 @@ impl /// If necessary, the channel may be force-closed without letting the counterparty participate /// in the shutdown. pub fn block_disconnected(&self, header: &BlockHeader) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -3254,6 +3293,29 @@ impl self.latest_block_height.fetch_sub(1, Ordering::AcqRel); *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.block_hash(); } + + /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool + /// indicating whether persistence is necessary. Only one listener on `wait_timeout` is + /// guaranteed to be woken up. + /// Note that the feature `allow_wallclock_use` must be enabled to use this function. + #[cfg(any(test, feature = "allow_wallclock_use"))] + pub fn wait_timeout(&self, max_wait: Duration) -> bool { + self.persistence_notifier.wait_timeout(max_wait) + } + + /// Blocks until ChannelManager needs to be persisted. Only one listener on `wait` is + /// guaranteed to be woken up. + pub fn wait(&self) { + self.persistence_notifier.wait() + } + + #[cfg(any(test, feature = "_test_utils"))] + pub fn get_persistence_condvar_value(&self) -> bool { + let mutcond = &self.persistence_notifier.persistence_lock; + let &(ref mtx, _) = mutcond; + let guard = mtx.lock().unwrap(); + *guard + } } impl @@ -3265,87 +3327,87 @@ impl, Condvar), +} + +impl PersistenceNotifier { + fn new() -> Self { + Self { + persistence_lock: (Mutex::new(false), Condvar::new()), + } + } + + fn wait(&self) { + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + guard = cvar.wait(guard).unwrap(); + let result = *guard; + if result { + *guard = false; + return + } + } + } + + #[cfg(any(test, feature = "allow_wallclock_use"))] + fn wait_timeout(&self, max_wait: Duration) -> bool { + let current_time = Instant::now(); + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + guard = cvar.wait_timeout(guard, max_wait).unwrap().0; + // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the + // desired wait time has actually passed, and if not then restart the loop with a reduced wait + // time. Note that this logic can be highly simplified through the use of + // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to + // 1.42.0. + let elapsed = current_time.elapsed(); + let result = *guard; + if result || elapsed >= max_wait { + *guard = false; + return result; + } + match max_wait.checked_sub(elapsed) { + None => return result, + Some(_) => continue + } + } + } + + // Signal to the ChannelManager persister that there are updates necessitating persisting to disk. + fn notify(&self) { + let &(ref persist_mtx, ref cnd) = &self.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + mem::drop(persistence_lock); + cnd.notify_all(); + } +} + const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; @@ -4011,6 +4136,8 @@ impl<'a, ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Der pending_events: Mutex::new(pending_events_read), total_consistency_lock: RwLock::new(()), + persistence_notifier: PersistenceNotifier::new(), + keys_manager: args.keys_manager, logger: args.logger, default_configuration: args.default_config, @@ -4026,3 +4153,54 @@ impl<'a, ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Der Ok((last_block_hash.clone(), channel_manager)) } } + +#[cfg(test)] +mod tests { + use ln::channelmanager::PersistenceNotifier; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + use std::time::Duration; + + #[test] + fn test_wait_timeout() { + let persistence_notifier = Arc::new(PersistenceNotifier::new()); + let thread_notifier = Arc::clone(&persistence_notifier); + + let exit_thread = Arc::new(AtomicBool::new(false)); + let exit_thread_clone = exit_thread.clone(); + thread::spawn(move || { + loop { + let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + cnd.notify_all(); + + if exit_thread_clone.load(Ordering::SeqCst) { + break + } + } + }); + + // Check that we can block indefinitely until updates are available. + let _ = persistence_notifier.wait(); + + // Check that the PersistenceNotifier will return after the given duration if updates are + // available. + loop { + if persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + + exit_thread.store(true, Ordering::SeqCst); + + // Check that the PersistenceNotifier will return after the given duration even if no updates + // are available. + loop { + if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + } +} diff --git a/lightning/src/util/macro_logger.rs b/lightning/src/util/macro_logger.rs index 2065f404..ab2b6cee 100644 --- a/lightning/src/util/macro_logger.rs +++ b/lightning/src/util/macro_logger.rs @@ -155,12 +155,17 @@ macro_rules! log_spendable { } } +/// Create a new Record and log it. You probably don't want to use this macro directly, +/// but it needs to be exported so `log_trace` etc can use it in external crates. +#[macro_export] macro_rules! log_internal { ($logger: expr, $lvl:expr, $($arg:tt)+) => ( - $logger.log(&::util::logger::Record::new($lvl, format_args!($($arg)+), module_path!(), file!(), line!())); + $logger.log(&$crate::util::logger::Record::new($lvl, format_args!($($arg)+), module_path!(), file!(), line!())); ); } +/// Log an error. +#[macro_export] macro_rules! log_error { ($logger: expr, $($arg:tt)*) => ( #[cfg(not(any(feature = "max_level_off")))] @@ -189,6 +194,8 @@ macro_rules! log_debug { ) } +/// Log a trace log. +#[macro_export] macro_rules! log_trace { ($logger: expr, $($arg:tt)*) => ( #[cfg(not(any(feature = "max_level_off", feature = "max_level_error", feature = "max_level_warn", feature = "max_level_info", feature = "max_level_debug")))] diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index 57b5f2d7..b8028ea9 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -25,8 +25,10 @@ pub(crate) mod transaction_utils; #[macro_use] pub(crate) mod ser_macros; + +/// Logging macro utilities. #[macro_use] -pub(crate) mod macro_logger; +pub mod macro_logger; // These have to come after macro_logger to build pub mod logger;