From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Fri, 19 Feb 2021 20:42:17 +0000 (-0800) Subject: Merge pull request #801 from TheBlueMatt/2021-02-789-bindings X-Git-Tag: v0.0.13~24 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=ee68ffa5fdfcfa8ee7fe70512d9e079d621083bc;hp=68811da302f3417777fce9afd4127042c495a27a;p=rust-lightning Merge pull request #801 from TheBlueMatt/2021-02-789-bindings Bindings updates for 789 --- diff --git a/Cargo.toml b/Cargo.toml index 96f4b1d17..1ffd75a87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "lightning-block-sync", "lightning-net-tokio", "lightning-persister", + "background-processor", ] # Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it. diff --git a/background-processor/Cargo.toml b/background-processor/Cargo.toml new file mode 100644 index 000000000..71fbbbff8 --- /dev/null +++ b/background-processor/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "background-processor" +version = "0.1.0" +authors = ["Valentine Wallace "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bitcoin = "0.24" +lightning = { version = "0.0.12", path = "../lightning", features = ["allow_wallclock_use"] } +lightning-persister = { version = "0.0.1", path = "../lightning-persister" } + +[dev-dependencies] +lightning = { version = "0.0.12", path = "../lightning", features = ["_test_utils"] } diff --git a/background-processor/src/lib.rs b/background-processor/src/lib.rs new file mode 100644 index 000000000..63b9ef311 --- /dev/null +++ b/background-processor/src/lib.rs @@ -0,0 +1,294 @@ +#[macro_use] extern crate lightning; + +use lightning::chain; +use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use lightning::chain::keysinterface::{ChannelKeys, KeysInterface}; +use lightning::ln::channelmanager::ChannelManager; +use lightning::util::logger::Logger; +use lightning::util::ser::Writeable; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +/// BackgroundProcessor takes care of tasks that (1) need to happen periodically to keep +/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its +/// responsibilities are: +/// * Monitoring whether the ChannelManager needs to be re-persisted to disk, and if so, +/// writing it to disk/backups by invoking the callback given to it at startup. +/// ChannelManager persistence should be done in the background. +/// * Calling `ChannelManager::timer_chan_freshness_every_min()` every minute (can be done in the +/// background). +/// +/// Note that if ChannelManager persistence fails and the persisted manager becomes out-of-date, +/// then there is a risk of channels force-closing on startup when the manager realizes it's +/// outdated. However, as long as `ChannelMonitor` backups are sound, no funds besides those used +/// for unilateral chain closure fees are at risk. +pub struct BackgroundProcessor { + stop_thread: Arc, + /// May be used to retrieve and handle the error if `BackgroundProcessor`'s thread + /// exits due to an error while persisting. + pub thread_handle: JoinHandle>, +} + +#[cfg(not(test))] +const CHAN_FRESHNESS_TIMER: u64 = 60; +#[cfg(test)] +const CHAN_FRESHNESS_TIMER: u64 = 1; + +impl BackgroundProcessor { + /// Start a background thread that takes care of responsibilities enumerated in the top-level + /// documentation. + /// + /// If `persist_manager` returns an error, then this thread will return said error (and `start()` + /// will need to be called again to restart the `BackgroundProcessor`). Users should wait on + /// [`thread_handle`]'s `join()` method to be able to tell if and when an error is returned, or + /// implement `persist_manager` such that an error is never returned to the `BackgroundProcessor` + /// + /// `persist_manager` is responsible for writing out the `ChannelManager` to disk, and/or uploading + /// to one or more backup services. See [`ChannelManager::write`] for writing out a `ChannelManager`. + /// See [`FilesystemPersister::persist_manager`] for Rust-Lightning's provided implementation. + /// + /// [`thread_handle`]: struct.BackgroundProcessor.html#structfield.thread_handle + /// [`ChannelManager::write`]: ../lightning/ln/channelmanager/struct.ChannelManager.html#method.write + /// [`FilesystemPersister::persist_manager`]: ../lightning_persister/struct.FilesystemPersister.html#impl + pub fn start(persist_manager: PM, manager: Arc, Arc, Arc, Arc, Arc>>, logger: Arc) -> Self + where ChanSigner: 'static + ChannelKeys + Writeable, + M: 'static + chain::Watch, + T: 'static + BroadcasterInterface, + K: 'static + KeysInterface, + F: 'static + FeeEstimator, + L: 'static + Logger, + PM: 'static + Send + Fn(&ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error>, + { + let stop_thread = Arc::new(AtomicBool::new(false)); + let stop_thread_clone = stop_thread.clone(); + let handle = thread::spawn(move || -> Result<(), std::io::Error> { + let mut current_time = Instant::now(); + loop { + let updates_available = manager.wait_timeout(Duration::from_millis(100)); + if updates_available { + persist_manager(&*manager)?; + } + // Exit the loop if the background processor was requested to stop. + if stop_thread.load(Ordering::Acquire) == true { + log_trace!(logger, "Terminating background processor."); + return Ok(()) + } + if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER { + log_trace!(logger, "Calling manager's timer_chan_freshness_every_min"); + manager.timer_chan_freshness_every_min(); + current_time = Instant::now(); + } + } + }); + Self { + stop_thread: stop_thread_clone, + thread_handle: handle, + } + } + + /// Stop `BackgroundProcessor`'s thread. + pub fn stop(self) -> Result<(), std::io::Error> { + self.stop_thread.store(true, Ordering::Release); + self.thread_handle.join().unwrap() + } +} + +#[cfg(test)] +mod tests { + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::blockdata::transaction::{Transaction, TxOut}; + use bitcoin::network::constants::Network; + use lightning::chain; + use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; + use lightning::chain::chainmonitor; + use lightning::chain::keysinterface::{ChannelKeys, InMemoryChannelKeys, KeysInterface, KeysManager}; + use lightning::chain::transaction::OutPoint; + use lightning::get_event_msg; + use lightning::ln::channelmanager::{ChannelManager, SimpleArcChannelManager}; + use lightning::ln::features::InitFeatures; + use lightning::ln::msgs::ChannelMessageHandler; + use lightning::util::config::UserConfig; + use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent}; + use lightning::util::logger::Logger; + use lightning::util::ser::Writeable; + use lightning::util::test_utils; + use lightning_persister::FilesystemPersister; + use std::fs; + use std::path::PathBuf; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + use super::BackgroundProcessor; + + type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; + + struct Node { + node: SimpleArcChannelManager, + persister: Arc, + logger: Arc, + } + + impl Drop for Node { + fn drop(&mut self) { + let data_dir = self.persister.get_data_dir(); + match fs::remove_dir_all(data_dir.clone()) { + Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e), + _ => {} + } + } + } + + fn get_full_filepath(filepath: String, filename: String) -> String { + let mut path = PathBuf::from(filepath); + path.push(filename); + path.to_str().unwrap().to_string() + } + + fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec { + let mut nodes = Vec::new(); + for i in 0..num_nodes { + let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}); + let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 }); + let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); + let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); + let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); + let seed = [i as u8; 32]; + let network = Network::Testnet; + let now = Duration::from_secs(genesis_block(network).header.time as u64); + let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); + let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); + let manager = Arc::new(ChannelManager::new(Network::Testnet, fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), i)); + let node = Node { node: manager, persister, logger }; + nodes.push(node); + } + nodes + } + + macro_rules! open_channel { + ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ + $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); + $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); + $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); + let events = $node_a.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let (temporary_channel_id, tx, funding_output) = match events[0] { + Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => { + assert_eq!(*channel_value_satoshis, $channel_value); + assert_eq!(user_channel_id, 42); + + let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut { + value: *channel_value_satoshis, script_pubkey: output_script.clone(), + }]}; + let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 }; + (*temporary_channel_id, tx, funding_outpoint) + }, + _ => panic!("Unexpected event"), + }; + + $node_a.node.funding_transaction_generated(&temporary_channel_id, funding_output); + $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); + $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); + tx + }} + } + + #[test] + fn test_background_processor() { + // Test that when a new channel is created, the ChannelManager needs to be re-persisted with + // updates. Also test that when new updates are available, the manager signals that it needs + // re-persistence and is successfully re-persisted. + let nodes = create_nodes(2, "test_background_processor".to_string()); + + // Initiate the background processors to watch each node. + let data_dir = nodes[0].persister.get_data_dir(); + let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + + // Go through the channel creation process until each node should have something persisted. + let tx = open_channel!(nodes[0], nodes[1], 100000); + + macro_rules! check_persisted_data { + ($node: expr, $filepath: expr, $expected_bytes: expr) => { + match $node.write(&mut $expected_bytes) { + Ok(()) => { + loop { + match std::fs::read($filepath) { + Ok(bytes) => { + if bytes == $expected_bytes { + break + } else { + continue + } + }, + Err(_) => continue + } + } + }, + Err(e) => panic!("Unexpected error: {}", e) + } + } + } + + // Check that the initial channel manager data is persisted as expected. + let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string()); + let mut expected_bytes = Vec::new(); + check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes); + loop { + if !nodes[0].node.get_persistence_condvar_value() { break } + } + + // Force-close the channel. + nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()).unwrap(); + + // Check that the force-close updates are persisted. + let mut expected_bytes = Vec::new(); + check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes); + loop { + if !nodes[0].node.get_persistence_condvar_value() { break } + } + + assert!(bg_processor.stop().is_ok()); + } + + #[test] + fn test_chan_freshness_called() { + // Test that ChannelManager's `timer_chan_freshness_every_min` is called every + // `CHAN_FRESHNESS_TIMER`. + let nodes = create_nodes(1, "test_chan_freshness_called".to_string()); + let data_dir = nodes[0].persister.get_data_dir(); + let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone()); + loop { + let log_entries = nodes[0].logger.lines.lock().unwrap(); + let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string(); + if log_entries.get(&("background_processor".to_string(), desired_log)).is_some() { + break + } + } + + assert!(bg_processor.stop().is_ok()); + } + + #[test] + fn test_persist_error() { + // Test that if we encounter an error during manager persistence, the thread panics. + fn persist_manager(_data: &ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error> + where ChanSigner: 'static + ChannelKeys + Writeable, + M: 'static + chain::Watch, + T: 'static + BroadcasterInterface, + K: 'static + KeysInterface, + F: 'static + FeeEstimator, + L: 'static + Logger, + { + Err(std::io::Error::new(std::io::ErrorKind::Other, "test")) + } + + let nodes = create_nodes(2, "test_persist_error".to_string()); + let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone()); + open_channel!(nodes[0], nodes[1], 100000); + + let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test"); + } +} diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index 63b69a1e1..d8be01c92 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -12,6 +12,9 @@ bitcoin = "0.24" lightning = { version = "0.0.12", path = "../lightning" } libc = "0.2" +[target.'cfg(windows)'.dependencies] +winapi = { version = "0.3", features = ["winbase"] } + [dev-dependencies.bitcoin] version = "0.24" features = ["bitcoinconsensus"] diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 7307ef0d1..0257eb50e 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -1,20 +1,26 @@ +mod util; + extern crate lightning; extern crate bitcoin; extern crate libc; use bitcoin::hashes::hex::ToHex; +use crate::util::DiskWriteable; +use lightning::chain; +use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr}; use lightning::chain::channelmonitor; -use lightning::chain::keysinterface::ChannelKeys; +use lightning::chain::keysinterface::{ChannelKeys, KeysInterface}; use lightning::chain::transaction::OutPoint; +use lightning::ln::channelmanager::ChannelManager; +use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use std::fs; use std::io::Error; -use std::path::{Path, PathBuf}; +use std::sync::Arc; #[cfg(test)] use { - lightning::chain::keysinterface::KeysInterface, lightning::util::ser::ReadableArgs, bitcoin::{BlockHash, Txid}, bitcoin::hashes::hex::FromHex, @@ -22,9 +28,6 @@ use { std::io::Cursor }; -#[cfg(not(target_os = "windows"))] -use std::os::unix::io::AsRawFd; - /// FilesystemPersister persists channel data on disk, where each channel's /// data is stored in a file named after its funding outpoint. /// @@ -41,13 +44,22 @@ pub struct FilesystemPersister { path_to_channel_data: String, } -trait DiskWriteable { - fn write(&self, writer: &mut fs::File) -> Result<(), Error>; +impl DiskWriteable for ChannelMonitor { + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), Error> { + self.write(writer) + } } -impl DiskWriteable for ChannelMonitor { - fn write(&self, writer: &mut fs::File) -> Result<(), Error> { - Writeable::write(self, writer) +impl DiskWriteable for ChannelManager, Arc, Arc, Arc, Arc> +where ChanSigner: ChannelKeys + Writeable, + M: chain::Watch, + T: BroadcasterInterface, + K: KeysInterface, + F: FeeEstimator, + L: Logger, +{ + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> { + self.write(writer) } } @@ -60,39 +72,24 @@ impl FilesystemPersister { } } - fn get_full_filepath(&self, funding_txo: OutPoint) -> String { - let mut path = PathBuf::from(&self.path_to_channel_data); - path.push(format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index)); - path.to_str().unwrap().to_string() + pub fn get_data_dir(&self) -> String { + self.path_to_channel_data.clone() } - // Utility to write a file to disk. - fn write_channel_data(&self, funding_txo: OutPoint, monitor: &dyn DiskWriteable) -> std::io::Result<()> { - fs::create_dir_all(&self.path_to_channel_data)?; - // Do a crazy dance with lots of fsync()s to be overly cautious here... - // We never want to end up in a state where we've lost the old data, or end up using the - // old data on power loss after we've returned. - // The way to atomically write a file on Unix platforms is: - // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let filename = self.get_full_filepath(funding_txo); - let tmp_filename = format!("{}.tmp", filename.clone()); - - { - // Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use - // rust stdlib 1.36 or higher. - let mut f = fs::File::create(&tmp_filename)?; - monitor.write(&mut f)?; - f.sync_all()?; - } - fs::rename(&tmp_filename, &filename)?; - // Fsync the parent directory on Unix. - #[cfg(not(target_os = "windows"))] - { - let path = Path::new(&filename).parent().unwrap(); - let dir_file = fs::OpenOptions::new().read(true).open(path)?; - unsafe { libc::fsync(dir_file.as_raw_fd()); } - } - Ok(()) + /// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister` + /// initialization, within a file called "manager". + pub fn persist_manager( + data_dir: String, + manager: &ChannelManager, Arc, Arc, Arc, Arc> + ) -> Result<(), std::io::Error> + where ChanSigner: ChannelKeys + Writeable, + M: chain::Watch, + T: BroadcasterInterface, + K: KeysInterface, + F: FeeEstimator, + L: Logger + { + util::write_to_file(data_dir, "manager".to_string(), manager) } #[cfg(test)] @@ -132,28 +129,18 @@ impl FilesystemPersister { impl channelmonitor::Persist for FilesystemPersister { fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { - self.write_channel_data(funding_txo, monitor) + let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); + util::write_to_file(self.path_to_channel_data.clone(), filename, monitor) .map_err(|_| ChannelMonitorUpdateErr::PermanentFailure) } fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { - self.write_channel_data(funding_txo, monitor) + let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); + util::write_to_file(self.path_to_channel_data.clone(), filename, monitor) .map_err(|_| ChannelMonitorUpdateErr::PermanentFailure) } } -#[cfg(test)] -impl Drop for FilesystemPersister { - fn drop(&mut self) { - // We test for invalid directory names, so it's OK if directory removal - // fails. - match fs::remove_dir_all(&self.path_to_channel_data) { - Err(e) => println!("Failed to remove test persister directory: {}", e), - _ => {} - } - } -} - #[cfg(test)] mod tests { extern crate lightning; @@ -162,8 +149,6 @@ mod tests { use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hashes::hex::FromHex; use bitcoin::Txid; - use DiskWriteable; - use Error; use lightning::chain::channelmonitor::{Persist, ChannelMonitorUpdateErr}; use lightning::chain::transaction::OutPoint; use lightning::{check_closed_broadcast, check_added_monitors}; @@ -171,20 +156,22 @@ mod tests { use lightning::ln::functional_test_utils::*; use lightning::ln::msgs::ErrorAction; use lightning::util::events::{MessageSendEventsProvider, MessageSendEvent}; - use lightning::util::ser::Writer; use lightning::util::test_utils; use std::fs; - use std::io; #[cfg(target_os = "windows")] use { lightning::get_event_msg, lightning::ln::msgs::ChannelMessageHandler, }; - struct TestWriteable{} - impl DiskWriteable for TestWriteable { - fn write(&self, writer: &mut fs::File) -> Result<(), Error> { - writer.write_all(&[42; 1]) + impl Drop for FilesystemPersister { + fn drop(&mut self) { + // We test for invalid directory names, so it's OK if directory removal + // fails. + match fs::remove_dir_all(&self.path_to_channel_data) { + Err(e) => println!("Failed to remove test persister directory: {}", e), + _ => {} + } } } @@ -256,88 +243,6 @@ mod tests { check_persisted_data!(11); } - // Test that if the persister's path to channel data is read-only, writing - // data to it fails. Windows ignores the read-only flag for folders, so this - // test is Unix-only. - #[cfg(not(target_os = "windows"))] - #[test] - fn test_readonly_dir() { - let persister = FilesystemPersister::new("test_readonly_dir_persister".to_string()); - let test_writeable = TestWriteable{}; - let test_txo = OutPoint { - txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), - index: 0 - }; - // Create the persister's directory and set it to read-only. - let path = &persister.path_to_channel_data; - fs::create_dir_all(path).unwrap(); - let mut perms = fs::metadata(path).unwrap().permissions(); - perms.set_readonly(true); - fs::set_permissions(path, perms).unwrap(); - match persister.write_channel_data(test_txo, &test_writeable) { - Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied), - _ => panic!("Unexpected error message") - } - } - - // Test failure to rename in the process of atomically creating a channel - // monitor's file. We induce this failure by making the `tmp` file a - // directory. - // Explanation: given "from" = the file being renamed, "to" = the - // renamee that already exists: Windows should fail because it'll fail - // whenever "to" is a directory, and Unix should fail because if "from" is a - // file, then "to" is also required to be a file. - #[test] - fn test_rename_failure() { - let persister = FilesystemPersister::new("test_rename_failure".to_string()); - let test_writeable = TestWriteable{}; - let txid_hex = "8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be"; - let outp_idx = 0; - let test_txo = OutPoint { - txid: Txid::from_hex(txid_hex).unwrap(), - index: outp_idx, - }; - // Create the channel data file and make it a directory. - let path = &persister.path_to_channel_data; - fs::create_dir_all(format!("{}/{}_{}", path, txid_hex, outp_idx)).unwrap(); - match persister.write_channel_data(test_txo, &test_writeable) { - Err(e) => { - #[cfg(not(target_os = "windows"))] - assert_eq!(e.kind(), io::ErrorKind::Other); - #[cfg(target_os = "windows")] - assert_eq!(e.kind(), io::ErrorKind::PermissionDenied); - } - _ => panic!("Unexpected error message") - } - } - - // Test failure to create the temporary file in the persistence process. - // We induce this failure by having the temp file already exist and be a - // directory. - #[test] - fn test_tmp_file_creation_failure() { - let persister = FilesystemPersister::new("test_tmp_file_creation_failure".to_string()); - let test_writeable = TestWriteable{}; - let txid_hex = "8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be"; - let outp_idx = 0; - let test_txo = OutPoint { - txid: Txid::from_hex(txid_hex).unwrap(), - index: outp_idx, - }; - // Create the tmp file and make it a directory. - let path = &persister.path_to_channel_data; - fs::create_dir_all(format!("{}/{}_{}.tmp", path, txid_hex, outp_idx)).unwrap(); - match persister.write_channel_data(test_txo, &test_writeable) { - Err(e) => { - #[cfg(not(target_os = "windows"))] - assert_eq!(e.kind(), io::ErrorKind::Other); - #[cfg(target_os = "windows")] - assert_eq!(e.kind(), io::ErrorKind::PermissionDenied); - } - _ => panic!("Unexpected error message") - } - } - // Test that if the persister's path to channel data is read-only, writing a // monitor to it results in the persister returning a PermanentFailure. // Windows ignores the read-only flag for folders, so this test is Unix-only. diff --git a/lightning-persister/src/util.rs b/lightning-persister/src/util.rs new file mode 100644 index 000000000..daacb00f0 --- /dev/null +++ b/lightning-persister/src/util.rs @@ -0,0 +1,188 @@ +#[cfg(target_os = "windows")] +extern crate winapi; + +use std::fs; +use std::path::{Path, PathBuf}; + +#[cfg(not(target_os = "windows"))] +use std::os::unix::io::AsRawFd; + +#[cfg(target_os = "windows")] +use { + std::ffi::OsStr, + std::os::windows::ffi::OsStrExt +}; + +pub(crate) trait DiskWriteable { + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error>; +} + +pub(crate) fn get_full_filepath(filepath: String, filename: String) -> String { + let mut path = PathBuf::from(filepath); + path.push(filename); + path.to_str().unwrap().to_string() +} + +#[cfg(target_os = "windows")] +macro_rules! call { + ($e: expr) => ( + if $e != 0 { + return Ok(()) + } else { + return Err(std::io::Error::last_os_error()) + } + ) +} + +#[cfg(target_os = "windows")] +fn path_to_windows_str>(path: T) -> Vec { + path.as_ref().encode_wide().chain(Some(0)).collect() +} + +#[allow(bare_trait_objects)] +pub(crate) fn write_to_file(path: String, filename: String, data: &D) -> std::io::Result<()> { + fs::create_dir_all(path.clone())?; + // Do a crazy dance with lots of fsync()s to be overly cautious here... + // We never want to end up in a state where we've lost the old data, or end up using the + // old data on power loss after we've returned. + // The way to atomically write a file on Unix platforms is: + // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) + let filename_with_path = get_full_filepath(path, filename); + let tmp_filename = format!("{}.tmp", filename_with_path.clone()); + + { + // Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use + // rust stdlib 1.36 or higher. + let mut f = fs::File::create(&tmp_filename)?; + data.write_to_file(&mut f)?; + f.sync_all()?; + } + // Fsync the parent directory on Unix. + #[cfg(not(target_os = "windows"))] + { + fs::rename(&tmp_filename, &filename_with_path)?; + let path = Path::new(&filename_with_path).parent().unwrap(); + let dir_file = fs::OpenOptions::new().read(true).open(path)?; + unsafe { libc::fsync(dir_file.as_raw_fd()); } + } + #[cfg(target_os = "windows")] + { + let src = PathBuf::from(tmp_filename.clone()); + let dst = PathBuf::from(filename_with_path.clone()); + if Path::new(&filename_with_path.clone()).exists() { + unsafe {winapi::um::winbase::ReplaceFileW( + path_to_windows_str(dst).as_ptr(), path_to_windows_str(src).as_ptr(), std::ptr::null(), + winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS, + std::ptr::null_mut() as *mut winapi::ctypes::c_void, + std::ptr::null_mut() as *mut winapi::ctypes::c_void + )}; + } else { + call!(unsafe {winapi::um::winbase::MoveFileExW( + path_to_windows_str(src).as_ptr(), path_to_windows_str(dst).as_ptr(), + winapi::um::winbase::MOVEFILE_WRITE_THROUGH | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING + )}); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::{DiskWriteable, get_full_filepath, write_to_file}; + use std::fs; + use std::io; + use std::io::Write; + + struct TestWriteable{} + impl DiskWriteable for TestWriteable { + fn write_to_file(&self, writer: &mut fs::File) -> Result<(), io::Error> { + writer.write_all(&[42; 1]) + } + } + + // Test that if the persister's path to channel data is read-only, writing + // data to it fails. Windows ignores the read-only flag for folders, so this + // test is Unix-only. + #[cfg(not(target_os = "windows"))] + #[test] + fn test_readonly_dir() { + let test_writeable = TestWriteable{}; + let filename = "test_readonly_dir_persister_filename".to_string(); + let path = "test_readonly_dir_persister_dir"; + fs::create_dir_all(path.to_string()).unwrap(); + let mut perms = fs::metadata(path.to_string()).unwrap().permissions(); + perms.set_readonly(true); + fs::set_permissions(path.to_string(), perms).unwrap(); + match write_to_file(path.to_string(), filename, &test_writeable) { + Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied), + _ => panic!("Unexpected error message") + } + } + + // Test failure to rename in the process of atomically creating a channel + // monitor's file. We induce this failure by making the `tmp` file a + // directory. + // Explanation: given "from" = the file being renamed, "to" = the destination + // file that already exists: Unix should fail because if "from" is a file, + // then "to" is also required to be a file. + // TODO: ideally try to make this work on Windows again + #[cfg(not(target_os = "windows"))] + #[test] + fn test_rename_failure() { + let test_writeable = TestWriteable{}; + let filename = "test_rename_failure_filename"; + let path = "test_rename_failure_dir"; + // Create the channel data file and make it a directory. + fs::create_dir_all(get_full_filepath(path.to_string(), filename.to_string())).unwrap(); + match write_to_file(path.to_string(), filename.to_string(), &test_writeable) { + Err(e) => assert_eq!(e.kind(), io::ErrorKind::Other), + _ => panic!("Unexpected Ok(())") + } + fs::remove_dir_all(path).unwrap(); + } + + #[test] + fn test_diskwriteable_failure() { + struct FailingWriteable {} + impl DiskWriteable for FailingWriteable { + fn write_to_file(&self, _writer: &mut fs::File) -> Result<(), std::io::Error> { + Err(std::io::Error::new(std::io::ErrorKind::Other, "expected failure")) + } + } + + let filename = "test_diskwriteable_failure"; + let path = "test_diskwriteable_failure_dir"; + let test_writeable = FailingWriteable{}; + match write_to_file(path.to_string(), filename.to_string(), &test_writeable) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::Other); + assert_eq!(e.get_ref().unwrap().to_string(), "expected failure"); + }, + _ => panic!("unexpected result") + } + fs::remove_dir_all(path).unwrap(); + } + + // Test failure to create the temporary file in the persistence process. + // We induce this failure by having the temp file already exist and be a + // directory. + #[test] + fn test_tmp_file_creation_failure() { + let test_writeable = TestWriteable{}; + let filename = "test_tmp_file_creation_failure_filename".to_string(); + let path = "test_tmp_file_creation_failure_dir".to_string(); + + // Create the tmp file and make it a directory. + let tmp_path = get_full_filepath(path.clone(), format!("{}.tmp", filename.clone())); + fs::create_dir_all(tmp_path).unwrap(); + match write_to_file(path, filename, &test_writeable) { + Err(e) => { + #[cfg(not(target_os = "windows"))] + assert_eq!(e.kind(), io::ErrorKind::Other); + #[cfg(target_os = "windows")] + assert_eq!(e.kind(), io::ErrorKind::PermissionDenied); + } + _ => panic!("Unexpected error message") + } + } +} diff --git a/lightning/Cargo.toml b/lightning/Cargo.toml index 883aee4a1..0b51a3992 100644 --- a/lightning/Cargo.toml +++ b/lightning/Cargo.toml @@ -11,6 +11,7 @@ Still missing tons of error-handling. See GitHub issues for suggested projects i """ [features] +allow_wallclock_use = [] fuzztarget = ["bitcoin/fuzztarget", "regex"] # Internal test utilities exposed to other repo crates _test_utils = ["hex", "regex"] @@ -38,3 +39,6 @@ features = ["bitcoinconsensus"] [dev-dependencies] hex = "0.3" regex = "0.1.80" + +[package.metadata.docs.rs] +features = ["allow_wallclock_use"] # When https://github.com/rust-lang/rust/issues/43781 complies with our MSVR, we can add nice banners in the docs for the methods behind this feature-gate. diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index e700b1f3b..d3a372e47 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -42,6 +42,8 @@ use std; use std::default::Default; use std::{cmp,mem,fmt}; use std::ops::Deref; +#[cfg(any(test, feature = "fuzztarget"))] +use std::sync::Mutex; use bitcoin::hashes::hex::ToHex; #[cfg(test)] @@ -258,6 +260,27 @@ enum UpdateStatus { DisabledStaged, } +/// An enum indicating whether the local or remote side offered a given HTLC. +enum HTLCInitiator { + LocalOffered, + RemoteOffered, +} + +/// Used when calculating whether we or the remote can afford an additional HTLC. +struct HTLCCandidate { + amount_msat: u64, + origin: HTLCInitiator, +} + +impl HTLCCandidate { + fn new(amount_msat: u64, origin: HTLCInitiator) -> Self { + Self { + amount_msat, + origin, + } + } +} + // TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking // has been completed, and then turn into a Channel to get compiler-time enforcement of things like // calling channel_id() before we're set up or things like get_outbound_funding_signed on an @@ -386,6 +409,24 @@ pub(super) struct Channel { commitment_secrets: CounterpartyCommitmentSecrets, network_sync: UpdateStatus, + + // We save these values so we can make sure `next_local_commit_tx_fee_msat` and + // `next_remote_commit_tx_fee_msat` properly predict what the next commitment transaction fee will + // be, by comparing the cached values to the fee of the tranaction generated by + // `build_commitment_transaction`. + #[cfg(any(test, feature = "fuzztarget"))] + next_local_commitment_tx_fee_info_cached: Mutex>, + #[cfg(any(test, feature = "fuzztarget"))] + next_remote_commitment_tx_fee_info_cached: Mutex>, +} + +#[cfg(any(test, feature = "fuzztarget"))] +struct CommitmentTxInfoCached { + fee: u64, + total_pending_htlcs: usize, + next_holder_htlc_id: u64, + next_counterparty_htlc_id: u64, + feerate: u32, } pub const OUR_MAX_HTLCS: u16 = 50; //TODO @@ -557,6 +598,11 @@ impl Channel { commitment_secrets: CounterpartyCommitmentSecrets::new(), network_sync: UpdateStatus::Fresh, + + #[cfg(any(test, feature = "fuzztarget"))] + next_local_commitment_tx_fee_info_cached: Mutex::new(None), + #[cfg(any(test, feature = "fuzztarget"))] + next_remote_commitment_tx_fee_info_cached: Mutex::new(None), }) } @@ -790,6 +836,11 @@ impl Channel { commitment_secrets: CounterpartyCommitmentSecrets::new(), network_sync: UpdateStatus::Fresh, + + #[cfg(any(test, feature = "fuzztarget"))] + next_local_commitment_tx_fee_info_cached: Mutex::new(None), + #[cfg(any(test, feature = "fuzztarget"))] + next_remote_commitment_tx_fee_info_cached: Mutex::new(None), }; Ok(chan) @@ -1684,63 +1735,172 @@ impl Channel { (COMMITMENT_TX_BASE_WEIGHT + num_htlcs as u64 * COMMITMENT_TX_WEIGHT_PER_HTLC) * self.feerate_per_kw as u64 / 1000 * 1000 } - // Get the commitment tx fee for the local (i.e our) next commitment transaction - // based on the number of pending HTLCs that are on track to be in our next - // commitment tx. `addl_htcs` is an optional parameter allowing the caller - // to add a number of additional HTLCs to the calculation. Note that dust - // HTLCs are excluded. - fn next_local_commit_tx_fee_msat(&self, addl_htlcs: usize) -> u64 { + // Get the commitment tx fee for the local's (i.e. our) next commitment transaction based on the + // number of pending HTLCs that are on track to be in our next commitment tx, plus an additional + // HTLC if `fee_spike_buffer_htlc` is Some, plus a new HTLC given by `new_htlc_amount`. Dust HTLCs + // are excluded. + fn next_local_commit_tx_fee_msat(&self, htlc: HTLCCandidate, fee_spike_buffer_htlc: Option<()>) -> u64 { assert!(self.is_outbound()); - let mut their_acked_htlcs = self.pending_inbound_htlcs.len(); + let real_dust_limit_success_sat = (self.feerate_per_kw as u64 * HTLC_SUCCESS_TX_WEIGHT / 1000) + self.holder_dust_limit_satoshis; + let real_dust_limit_timeout_sat = (self.feerate_per_kw as u64 * HTLC_TIMEOUT_TX_WEIGHT / 1000) + self.holder_dust_limit_satoshis; + + let mut addl_htlcs = 0; + if fee_spike_buffer_htlc.is_some() { addl_htlcs += 1; } + match htlc.origin { + HTLCInitiator::LocalOffered => { + if htlc.amount_msat / 1000 >= real_dust_limit_timeout_sat { + addl_htlcs += 1; + } + }, + HTLCInitiator::RemoteOffered => { + if htlc.amount_msat / 1000 >= real_dust_limit_success_sat { + addl_htlcs += 1; + } + } + } + + let mut included_htlcs = 0; + for ref htlc in self.pending_inbound_htlcs.iter() { + if htlc.amount_msat / 1000 < real_dust_limit_success_sat { + continue + } + // We include LocalRemoved HTLCs here because we may still need to broadcast a commitment + // transaction including this HTLC if it times out before they RAA. + included_htlcs += 1; + } + for ref htlc in self.pending_outbound_htlcs.iter() { - if htlc.amount_msat / 1000 <= self.holder_dust_limit_satoshis { + if htlc.amount_msat / 1000 < real_dust_limit_timeout_sat { continue } match htlc.state { - OutboundHTLCState::Committed => their_acked_htlcs += 1, - OutboundHTLCState::RemoteRemoved {..} => their_acked_htlcs += 1, - OutboundHTLCState::LocalAnnounced {..} => their_acked_htlcs += 1, + OutboundHTLCState::LocalAnnounced {..} => included_htlcs += 1, + OutboundHTLCState::Committed => included_htlcs += 1, + OutboundHTLCState::RemoteRemoved {..} => included_htlcs += 1, + // We don't include AwaitingRemoteRevokeToRemove HTLCs because our next commitment + // transaction won't be generated until they send us their next RAA, which will mean + // dropping any HTLCs in this state. _ => {}, } } for htlc in self.holding_cell_htlc_updates.iter() { match htlc { - &HTLCUpdateAwaitingACK::AddHTLC { .. } => their_acked_htlcs += 1, - _ => {}, + &HTLCUpdateAwaitingACK::AddHTLC { amount_msat, .. } => { + if amount_msat / 1000 < real_dust_limit_timeout_sat { + continue + } + included_htlcs += 1 + }, + _ => {}, // Don't include claims/fails that are awaiting ack, because once we get the + // ack we're guaranteed to never include them in commitment txs anymore. } } - self.commit_tx_fee_msat(their_acked_htlcs + addl_htlcs) + let num_htlcs = included_htlcs + addl_htlcs; + let res = self.commit_tx_fee_msat(num_htlcs); + #[cfg(any(test, feature = "fuzztarget"))] + { + let mut fee = res; + if fee_spike_buffer_htlc.is_some() { + fee = self.commit_tx_fee_msat(num_htlcs - 1); + } + let total_pending_htlcs = self.pending_inbound_htlcs.len() + self.pending_outbound_htlcs.len() + + self.holding_cell_htlc_updates.len(); + let commitment_tx_info = CommitmentTxInfoCached { + fee, + total_pending_htlcs, + next_holder_htlc_id: match htlc.origin { + HTLCInitiator::LocalOffered => self.next_holder_htlc_id + 1, + HTLCInitiator::RemoteOffered => self.next_holder_htlc_id, + }, + next_counterparty_htlc_id: match htlc.origin { + HTLCInitiator::LocalOffered => self.next_counterparty_htlc_id, + HTLCInitiator::RemoteOffered => self.next_counterparty_htlc_id + 1, + }, + feerate: self.feerate_per_kw, + }; + *self.next_local_commitment_tx_fee_info_cached.lock().unwrap() = Some(commitment_tx_info); + } + res } - // Get the commitment tx fee for the remote's next commitment transaction - // based on the number of pending HTLCs that are on track to be in their - // next commitment tx. `addl_htcs` is an optional parameter allowing the caller - // to add a number of additional HTLCs to the calculation. Note that dust HTLCs - // are excluded. - fn next_remote_commit_tx_fee_msat(&self, addl_htlcs: usize) -> u64 { + // Get the commitment tx fee for the remote's next commitment transaction based on the number of + // pending HTLCs that are on track to be in their next commitment tx, plus an additional HTLC if + // `fee_spike_buffer_htlc` is Some, plus a new HTLC given by `new_htlc_amount`. Dust HTLCs are + // excluded. + fn next_remote_commit_tx_fee_msat(&self, htlc: HTLCCandidate, fee_spike_buffer_htlc: Option<()>) -> u64 { assert!(!self.is_outbound()); - // When calculating the set of HTLCs which will be included in their next - // commitment_signed, all inbound HTLCs are included (as all states imply it will be - // included) and only committed outbound HTLCs, see below. - let mut their_acked_htlcs = self.pending_inbound_htlcs.len(); + let real_dust_limit_success_sat = (self.feerate_per_kw as u64 * HTLC_SUCCESS_TX_WEIGHT / 1000) + self.counterparty_dust_limit_satoshis; + let real_dust_limit_timeout_sat = (self.feerate_per_kw as u64 * HTLC_TIMEOUT_TX_WEIGHT / 1000) + self.counterparty_dust_limit_satoshis; + + let mut addl_htlcs = 0; + if fee_spike_buffer_htlc.is_some() { addl_htlcs += 1; } + match htlc.origin { + HTLCInitiator::LocalOffered => { + if htlc.amount_msat / 1000 >= real_dust_limit_success_sat { + addl_htlcs += 1; + } + }, + HTLCInitiator::RemoteOffered => { + if htlc.amount_msat / 1000 >= real_dust_limit_timeout_sat { + addl_htlcs += 1; + } + } + } + + // When calculating the set of HTLCs which will be included in their next commitment_signed, all + // non-dust inbound HTLCs are included (as all states imply it will be included) and only + // committed outbound HTLCs, see below. + let mut included_htlcs = 0; + for ref htlc in self.pending_inbound_htlcs.iter() { + if htlc.amount_msat / 1000 <= real_dust_limit_timeout_sat { + continue + } + included_htlcs += 1; + } + for ref htlc in self.pending_outbound_htlcs.iter() { - if htlc.amount_msat / 1000 <= self.counterparty_dust_limit_satoshis { + if htlc.amount_msat / 1000 <= real_dust_limit_success_sat { continue } - // We only include outbound HTLCs if it will not be included in their next - // commitment_signed, i.e. if they've responded to us with an RAA after announcement. + // We only include outbound HTLCs if it will not be included in their next commitment_signed, + // i.e. if they've responded to us with an RAA after announcement. match htlc.state { - OutboundHTLCState::Committed => their_acked_htlcs += 1, - OutboundHTLCState::RemoteRemoved {..} => their_acked_htlcs += 1, + OutboundHTLCState::Committed => included_htlcs += 1, + OutboundHTLCState::RemoteRemoved {..} => included_htlcs += 1, + OutboundHTLCState::LocalAnnounced { .. } => included_htlcs += 1, _ => {}, } } - self.commit_tx_fee_msat(their_acked_htlcs + addl_htlcs) + let num_htlcs = included_htlcs + addl_htlcs; + let res = self.commit_tx_fee_msat(num_htlcs); + #[cfg(any(test, feature = "fuzztarget"))] + { + let mut fee = res; + if fee_spike_buffer_htlc.is_some() { + fee = self.commit_tx_fee_msat(num_htlcs - 1); + } + let total_pending_htlcs = self.pending_inbound_htlcs.len() + self.pending_outbound_htlcs.len(); + let commitment_tx_info = CommitmentTxInfoCached { + fee, + total_pending_htlcs, + next_holder_htlc_id: match htlc.origin { + HTLCInitiator::LocalOffered => self.next_holder_htlc_id + 1, + HTLCInitiator::RemoteOffered => self.next_holder_htlc_id, + }, + next_counterparty_htlc_id: match htlc.origin { + HTLCInitiator::LocalOffered => self.next_counterparty_htlc_id, + HTLCInitiator::RemoteOffered => self.next_counterparty_htlc_id + 1, + }, + feerate: self.feerate_per_kw, + }; + *self.next_remote_commitment_tx_fee_info_cached.lock().unwrap() = Some(commitment_tx_info); + } + res } pub fn update_add_htlc(&mut self, msg: &msgs::UpdateAddHTLC, mut pending_forward_status: PendingHTLCStatus, create_pending_htlc_status: F, logger: &L) -> Result<(), ChannelError> @@ -1808,8 +1968,8 @@ impl Channel { // Check that the remote can afford to pay for this HTLC on-chain at the current // feerate_per_kw, while maintaining their channel reserve (as required by the spec). let remote_commit_tx_fee_msat = if self.is_outbound() { 0 } else { - // +1 for this HTLC. - self.next_remote_commit_tx_fee_msat(1) + let htlc_candidate = HTLCCandidate::new(msg.amount_msat, HTLCInitiator::RemoteOffered); + self.next_remote_commit_tx_fee_msat(htlc_candidate, None) // Don't include the extra fee spike buffer HTLC in calculations }; if pending_remote_value_msat - msg.amount_msat < remote_commit_tx_fee_msat { return Err(ChannelError::Close("Remote HTLC add would not leave enough to pay for fees".to_owned())); @@ -1822,14 +1982,16 @@ impl Channel { } if !self.is_outbound() { - // `+1` for this HTLC, `2 *` and `+1` fee spike buffer we keep for the remote. This deviates from the - // spec because in the spec, the fee spike buffer requirement doesn't exist on the receiver's side, - // only on the sender's. - // Note that when we eventually remove support for fee updates and switch to anchor output fees, - // we will drop the `2 *`, since we no longer be as sensitive to fee spikes. But, keep the extra +1 - // as we should still be able to afford adding this HTLC plus one more future HTLC, regardless of - // being sensitive to fee spikes. - let remote_fee_cost_incl_stuck_buffer_msat = 2 * self.next_remote_commit_tx_fee_msat(1 + 1); + // `2 *` and `Some(())` is for the fee spike buffer we keep for the remote. This deviates from + // the spec because in the spec, the fee spike buffer requirement doesn't exist on the + // receiver's side, only on the sender's. + // Note that when we eventually remove support for fee updates and switch to anchor output + // fees, we will drop the `2 *`, since we no longer be as sensitive to fee spikes. But, keep + // the extra htlc when calculating the next remote commitment transaction fee as we should + // still be able to afford adding this HTLC plus one more future HTLC, regardless of being + // sensitive to fee spikes. + let htlc_candidate = HTLCCandidate::new(msg.amount_msat, HTLCInitiator::RemoteOffered); + let remote_fee_cost_incl_stuck_buffer_msat = 2 * self.next_remote_commit_tx_fee_msat(htlc_candidate, Some(())); if pending_remote_value_msat - msg.amount_msat - chan_reserve_msat < remote_fee_cost_incl_stuck_buffer_msat { // Note that if the pending_forward_status is not updated here, then it's because we're already failing // the HTLC, i.e. its status is already set to failing. @@ -1838,9 +2000,8 @@ impl Channel { } } else { // Check that they won't violate our local required channel reserve by adding this HTLC. - - // +1 for this HTLC. - let local_commit_tx_fee_msat = self.next_local_commit_tx_fee_msat(1); + let htlc_candidate = HTLCCandidate::new(msg.amount_msat, HTLCInitiator::RemoteOffered); + let local_commit_tx_fee_msat = self.next_local_commit_tx_fee_msat(htlc_candidate, None); if self.value_to_self_msat < self.counterparty_selected_channel_reserve_satoshis * 1000 + local_commit_tx_fee_msat { return Err(ChannelError::Close("Cannot accept HTLC that would put our balance under counterparty-announced channel reserve value".to_owned())); } @@ -1976,15 +2137,31 @@ impl Channel { (commitment_tx.1, htlcs_cloned, commitment_tx.0, commitment_txid) }; + let total_fee = feerate_per_kw as u64 * (COMMITMENT_TX_BASE_WEIGHT + (num_htlcs as u64) * COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000; //If channel fee was updated by funder confirm funder can afford the new fee rate when applied to the current local commitment transaction if update_fee { - let total_fee = feerate_per_kw as u64 * (COMMITMENT_TX_BASE_WEIGHT + (num_htlcs as u64) * COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000; - let counterparty_reserve_we_require = Channel::::get_holder_selected_channel_reserve_satoshis(self.channel_value_satoshis); if self.channel_value_satoshis - self.value_to_self_msat / 1000 < total_fee + counterparty_reserve_we_require { return Err((None, ChannelError::Close("Funding remote cannot afford proposed new fee".to_owned()))); } } + #[cfg(any(test, feature = "fuzztarget"))] + { + if self.is_outbound() { + let projected_commit_tx_info = self.next_local_commitment_tx_fee_info_cached.lock().unwrap().take(); + *self.next_remote_commitment_tx_fee_info_cached.lock().unwrap() = None; + if let Some(info) = projected_commit_tx_info { + let total_pending_htlcs = self.pending_inbound_htlcs.len() + self.pending_outbound_htlcs.len() + + self.holding_cell_htlc_updates.len(); + if info.total_pending_htlcs == total_pending_htlcs + && info.next_holder_htlc_id == self.next_holder_htlc_id + && info.next_counterparty_htlc_id == self.next_counterparty_htlc_id + && info.feerate == self.feerate_per_kw { + assert_eq!(total_fee, info.fee / 1000); + } + } + } + } if msg.htlc_signatures.len() != num_htlcs { return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), num_htlcs)))); @@ -2250,6 +2427,12 @@ impl Channel { return Err(ChannelError::Close("Received an unexpected revoke_and_ack".to_owned())); } + #[cfg(any(test, feature = "fuzztarget"))] + { + *self.next_local_commitment_tx_fee_info_cached.lock().unwrap() = None; + *self.next_remote_commitment_tx_fee_info_cached.lock().unwrap() = None; + } + self.commitment_secrets.provide_secret(self.cur_counterparty_commitment_transaction_number + 1, msg.per_commitment_secret) .map_err(|_| ChannelError::Close("Previous secrets did not match new one".to_owned()))?; self.latest_monitor_update_id += 1; @@ -3720,11 +3903,10 @@ impl Channel { if !self.is_outbound() { // Check that we won't violate the remote channel reserve by adding this HTLC. - let counterparty_balance_msat = self.channel_value_satoshis * 1000 - self.value_to_self_msat; let holder_selected_chan_reserve_msat = Channel::::get_holder_selected_channel_reserve_satoshis(self.channel_value_satoshis); - // 1 additional HTLC corresponding to this HTLC. - let counterparty_commit_tx_fee_msat = self.next_remote_commit_tx_fee_msat(1); + let htlc_candidate = HTLCCandidate::new(amount_msat, HTLCInitiator::LocalOffered); + let counterparty_commit_tx_fee_msat = self.next_remote_commit_tx_fee_msat(htlc_candidate, None); if counterparty_balance_msat < holder_selected_chan_reserve_msat + counterparty_commit_tx_fee_msat { return Err(ChannelError::Ignore("Cannot send value that would put counterparty balance under holder-announced channel reserve value".to_owned())); } @@ -3735,10 +3917,10 @@ impl Channel { return Err(ChannelError::Ignore(format!("Cannot send value that would overdraw remaining funds. Amount: {}, pending value to self {}", amount_msat, pending_value_to_self_msat))); } - // The `+1` is for the HTLC currently being added to the commitment tx and - // the `2 *` and `+1` are for the fee spike buffer. + // `2 *` and extra HTLC are for the fee spike buffer. let commit_tx_fee_msat = if self.is_outbound() { - 2 * self.next_local_commit_tx_fee_msat(1 + 1) + let htlc_candidate = HTLCCandidate::new(amount_msat, HTLCInitiator::LocalOffered); + 2 * self.next_local_commit_tx_fee_msat(htlc_candidate, Some(())) } else { 0 }; if pending_value_to_self_msat - amount_msat < commit_tx_fee_msat { return Err(ChannelError::Ignore(format!("Cannot send value that would not leave enough to pay for fees. Pending value to self: {}. local_commit_tx_fee {}", pending_value_to_self_msat, commit_tx_fee_msat))); @@ -3881,6 +4063,24 @@ impl Channel { let counterparty_commitment_txid = counterparty_commitment_tx.0.trust().txid(); let (signature, htlc_signatures); + #[cfg(any(test, feature = "fuzztarget"))] + { + if !self.is_outbound() { + let projected_commit_tx_info = self.next_remote_commitment_tx_fee_info_cached.lock().unwrap().take(); + *self.next_local_commitment_tx_fee_info_cached.lock().unwrap() = None; + if let Some(info) = projected_commit_tx_info { + let total_pending_htlcs = self.pending_inbound_htlcs.len() + self.pending_outbound_htlcs.len(); + if info.total_pending_htlcs == total_pending_htlcs + && info.next_holder_htlc_id == self.next_holder_htlc_id + && info.next_counterparty_htlc_id == self.next_counterparty_htlc_id + && info.feerate == self.feerate_per_kw { + let actual_fee = self.commit_tx_fee_msat(counterparty_commitment_tx.1); + assert_eq!(actual_fee, info.fee); + } + } + } + } + { let mut htlcs = Vec::with_capacity(counterparty_commitment_tx.2.len()); for &(ref htlc, _) in counterparty_commitment_tx.2.iter() { @@ -4471,6 +4671,11 @@ impl<'a, ChanSigner: ChannelKeys, K: Deref> ReadableArgs<&'a K> for Channel::new_outbound(&&feeest, &&keys_provider, node_b_node_id, 10000000, 100000, 42, &config).unwrap(); + + // Create Node B's channel by receiving Node A's open_channel message + // Make sure A's dust limit is as we expect. + let open_channel_msg = node_a_chan.get_open_channel(genesis_block(network).header.block_hash()); + assert_eq!(open_channel_msg.dust_limit_satoshis, 1560); + let node_b_node_id = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[7; 32]).unwrap()); + let node_b_chan = Channel::::new_from_req(&&feeest, &&keys_provider, node_b_node_id, InitFeatures::known(), &open_channel_msg, 7, &config).unwrap(); + + // Node B --> Node A: accept channel, explicitly setting B's dust limit. + let mut accept_channel_msg = node_b_chan.get_accept_channel(); + accept_channel_msg.dust_limit_satoshis = 546; + node_a_chan.accept_channel(&accept_channel_msg, &config, InitFeatures::known()).unwrap(); + + // Put some inbound and outbound HTLCs in A's channel. + let htlc_amount_msat = 11_092_000; // put an amount below A's effective dust limit but above B's. + node_a_chan.pending_inbound_htlcs.push(InboundHTLCOutput { + htlc_id: 0, + amount_msat: htlc_amount_msat, + payment_hash: PaymentHash(Sha256::hash(&[42; 32]).into_inner()), + cltv_expiry: 300000000, + state: InboundHTLCState::Committed, + }); + + node_a_chan.pending_outbound_htlcs.push(OutboundHTLCOutput { + htlc_id: 1, + amount_msat: htlc_amount_msat, // put an amount below A's dust amount but above B's. + payment_hash: PaymentHash(Sha256::hash(&[43; 32]).into_inner()), + cltv_expiry: 200000000, + state: OutboundHTLCState::Committed, + source: HTLCSource::OutboundRoute { + path: Vec::new(), + session_priv: SecretKey::from_slice(&hex::decode("0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap()[..]).unwrap(), + first_hop_htlc_msat: 548, + } + }); + + // Make sure when Node A calculates their local commitment transaction, none of the HTLCs pass + // the dust limit check. + let htlc_candidate = HTLCCandidate::new(htlc_amount_msat, HTLCInitiator::LocalOffered); + let local_commit_tx_fee = node_a_chan.next_local_commit_tx_fee_msat(htlc_candidate, None); + let local_commit_fee_0_htlcs = node_a_chan.commit_tx_fee_msat(0); + assert_eq!(local_commit_tx_fee, local_commit_fee_0_htlcs); + + // Finally, make sure that when Node A calculates the remote's commitment transaction fees, all + // of the HTLCs are seen to be above the dust limit. + node_a_chan.channel_transaction_parameters.is_outbound_from_holder = false; + let remote_commit_fee_3_htlcs = node_a_chan.commit_tx_fee_msat(3); + let htlc_candidate = HTLCCandidate::new(htlc_amount_msat, HTLCInitiator::LocalOffered); + let remote_commit_tx_fee = node_a_chan.next_remote_commit_tx_fee_msat(htlc_candidate, None); + assert_eq!(remote_commit_tx_fee, remote_commit_fee_3_htlcs); + } + + #[test] + fn test_timeout_vs_success_htlc_dust_limit() { + // Make sure that when `next_remote_commit_tx_fee_msat` and `next_local_commit_tx_fee_msat` + // calculate the real dust limits for HTLCs (i.e. the dust limit given by the counterparty + // *plus* the fees paid for the HTLC) they don't swap `HTLC_SUCCESS_TX_WEIGHT` for + // `HTLC_TIMEOUT_TX_WEIGHT`, and vice versa. + let fee_est = TestFeeEstimator{fee_est: 253 }; + let secp_ctx = Secp256k1::new(); + let seed = [42; 32]; + let network = Network::Testnet; + let keys_provider = test_utils::TestKeysInterface::new(&seed, network); + + let node_id = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let config = UserConfig::default(); + let mut chan = Channel::::new_outbound(&&fee_est, &&keys_provider, node_id, 10000000, 100000, 42, &config).unwrap(); + + let commitment_tx_fee_0_htlcs = chan.commit_tx_fee_msat(0); + let commitment_tx_fee_1_htlc = chan.commit_tx_fee_msat(1); + + // If HTLC_SUCCESS_TX_WEIGHT and HTLC_TIMEOUT_TX_WEIGHT were swapped: then this HTLC would be + // counted as dust when it shouldn't be. + let htlc_amt_above_timeout = ((253 * HTLC_TIMEOUT_TX_WEIGHT / 1000) + chan.holder_dust_limit_satoshis + 1) * 1000; + let htlc_candidate = HTLCCandidate::new(htlc_amt_above_timeout, HTLCInitiator::LocalOffered); + let commitment_tx_fee = chan.next_local_commit_tx_fee_msat(htlc_candidate, None); + assert_eq!(commitment_tx_fee, commitment_tx_fee_1_htlc); + + // If swapped: this HTLC would be counted as non-dust when it shouldn't be. + let dust_htlc_amt_below_success = ((253 * HTLC_SUCCESS_TX_WEIGHT / 1000) + chan.holder_dust_limit_satoshis - 1) * 1000; + let htlc_candidate = HTLCCandidate::new(dust_htlc_amt_below_success, HTLCInitiator::RemoteOffered); + let commitment_tx_fee = chan.next_local_commit_tx_fee_msat(htlc_candidate, None); + assert_eq!(commitment_tx_fee, commitment_tx_fee_0_htlcs); + + chan.channel_transaction_parameters.is_outbound_from_holder = false; + + // If swapped: this HTLC would be counted as non-dust when it shouldn't be. + let dust_htlc_amt_above_timeout = ((253 * HTLC_TIMEOUT_TX_WEIGHT / 1000) + chan.counterparty_dust_limit_satoshis + 1) * 1000; + let htlc_candidate = HTLCCandidate::new(dust_htlc_amt_above_timeout, HTLCInitiator::LocalOffered); + let commitment_tx_fee = chan.next_remote_commit_tx_fee_msat(htlc_candidate, None); + assert_eq!(commitment_tx_fee, commitment_tx_fee_0_htlcs); + + // If swapped: this HTLC would be counted as dust when it shouldn't be. + let htlc_amt_below_success = ((253 * HTLC_SUCCESS_TX_WEIGHT / 1000) + chan.counterparty_dust_limit_satoshis - 1) * 1000; + let htlc_candidate = HTLCCandidate::new(htlc_amt_below_success, HTLCInitiator::RemoteOffered); + let commitment_tx_fee = chan.next_remote_commit_tx_fee_msat(htlc_candidate, None); + assert_eq!(commitment_tx_fee, commitment_tx_fee_1_htlc); + } + #[test] fn channel_reestablish_no_updates() { let feeest = TestFeeEstimator{fee_est: 15000}; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 3dcc74f90..84dc486c9 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -58,9 +58,11 @@ use util::errors::APIError; use std::{cmp, mem}; use std::collections::{HashMap, hash_map, HashSet}; use std::io::{Cursor, Read}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; +#[cfg(any(test, feature = "allow_wallclock_use"))] +use std::time::Instant; use std::marker::{Sync, Send}; use std::ops::Deref; use bitcoin::hashes::hex::ToHex; @@ -437,13 +439,46 @@ pub struct ChannelManager, + persistence_notifier: PersistenceNotifier, + keys_manager: K, logger: L, } +/// Whenever we release the `ChannelManager`'s `total_consistency_lock`, from read mode, it is +/// desirable to notify any listeners on `wait_timeout`/`wait` that new updates are available for +/// persistence. Therefore, this struct is responsible for locking the total consistency lock and, +/// upon going out of scope, sending the aforementioned notification (since the lock being released +/// indicates that the updates are ready for persistence). +struct PersistenceNotifierGuard<'a> { + persistence_notifier: &'a PersistenceNotifier, + // We hold onto this result so the lock doesn't get released immediately. + _read_guard: RwLockReadGuard<'a, ()>, +} + +impl<'a> PersistenceNotifierGuard<'a> { + fn new(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> Self { + let read_guard = lock.read().unwrap(); + + Self { + persistence_notifier: notifier, + _read_guard: read_guard, + } + } +} + +impl<'a> Drop for PersistenceNotifierGuard<'a> { + fn drop(&mut self) { + self.persistence_notifier.notify(); + } +} + /// The amount of time we require our counterparty wait to claim their money (ie time between when /// we, or our watchtower, must check for them having broadcast a theft transaction). pub(crate) const BREAKDOWN_TIMEOUT: u16 = 6 * 24; @@ -759,6 +794,7 @@ impl pending_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), + persistence_notifier: PersistenceNotifier::new(), keys_manager, @@ -787,7 +823,10 @@ impl let channel = Channel::new_outbound(&self.fee_estimator, &self.keys_manager, their_network_key, channel_value_satoshis, push_msat, user_id, config)?; let res = channel.get_open_channel(self.genesis_hash.clone()); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); + // We want to make sure the lock is actually acquired by PersistenceNotifierGuard. + debug_assert!(&self.total_consistency_lock.try_write().is_err()); + let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.entry(channel.channel_id()) { hash_map::Entry::Occupied(_) => { @@ -859,7 +898,7 @@ impl /// /// May generate a SendShutdown message event on success, which should be relayed. pub fn close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let (mut failed_htlcs, chan_option) = { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -951,7 +990,7 @@ impl /// Force closes a channel, immediately broadcasting the latest local commitment transaction to /// the chain and rejecting new HTLCs on the given channel. Fails if channel_id is unknown to the manager. pub fn force_close_channel(&self, channel_id: &[u8; 32]) -> Result<(), APIError> { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); self.force_close_channel_with_peer(channel_id, None) } @@ -1279,7 +1318,7 @@ impl } let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let err: Result<(), _> = loop { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -1447,7 +1486,7 @@ impl /// May panic if the funding_txo is duplicative with some other channel (note that this should /// be trivially prevented by using unique funding transaction keys per-channel). pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_txo: OutPoint) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let (chan, msg) = { let (res, chan) = match self.channel_state.lock().unwrap().by_id.remove(temporary_channel_id) { @@ -1530,7 +1569,7 @@ impl /// /// Panics if addresses is absurdly large (more than 500). pub fn broadcast_node_announcement(&self, rgb: [u8; 3], alias: [u8; 32], addresses: Vec) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); if addresses.len() > 500 { panic!("More than half the message size was taken up by public addresses!"); @@ -1560,7 +1599,7 @@ impl /// Should only really ever be called in response to a PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut new_events = Vec::new(); let mut failed_forwards = Vec::new(); @@ -1820,7 +1859,7 @@ impl /// /// This method handles all the details, and must be called roughly once per minute. pub fn timer_chan_freshness_every_min(&self) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state_lock = self.channel_state.lock().unwrap(); let channel_state = &mut *channel_state_lock; for (_, chan) in channel_state.by_id.iter_mut() { @@ -1845,7 +1884,7 @@ impl /// Returns false if no payment was found to fail backwards, true if the process of failing the /// HTLC backwards has been started. pub fn fail_htlc_backwards(&self, payment_hash: &PaymentHash, payment_secret: &Option) -> bool { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(*payment_hash, *payment_secret)); @@ -2024,7 +2063,7 @@ impl pub fn claim_funds(&self, payment_preimage: PaymentPreimage, payment_secret: &Option, expected_amount: u64) -> bool { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut channel_state = Some(self.channel_state.lock().unwrap()); let removed_source = channel_state.as_mut().unwrap().claimable_htlcs.remove(&(payment_hash, *payment_secret)); @@ -2220,7 +2259,7 @@ impl /// 4) once all remote copies are updated, you call this function with the update_id that /// completed, and once it is the latest the Channel will be re-enabled. pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut close_results = Vec::new(); let mut htlc_forwards = Vec::new(); @@ -2971,7 +3010,7 @@ impl /// (C-not exported) Cause its doc(hidden) anyway #[doc(hidden)] pub fn update_fee(&self, channel_id: [u8;32], feerate_per_kw: u32) -> Result<(), APIError> { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let counterparty_node_id; let err: Result<(), _> = loop { let mut channel_state_lock = self.channel_state.lock().unwrap(); @@ -3111,7 +3150,7 @@ impl pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { let header_hash = header.block_hash(); log_trace!(self.logger, "Block {} at height {} connected", header_hash, height); - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); let mut timed_out_htlcs = Vec::new(); { @@ -3224,7 +3263,7 @@ impl /// If necessary, the channel may be force-closed without letting the counterparty participate /// in the shutdown. pub fn block_disconnected(&self, header: &BlockHeader) { - let _consistency_lock = self.total_consistency_lock.read().unwrap(); + let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_channels = Vec::new(); { let mut channel_lock = self.channel_state.lock().unwrap(); @@ -3254,6 +3293,29 @@ impl self.latest_block_height.fetch_sub(1, Ordering::AcqRel); *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.block_hash(); } + + /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool + /// indicating whether persistence is necessary. Only one listener on `wait_timeout` is + /// guaranteed to be woken up. + /// Note that the feature `allow_wallclock_use` must be enabled to use this function. + #[cfg(any(test, feature = "allow_wallclock_use"))] + pub fn wait_timeout(&self, max_wait: Duration) -> bool { + self.persistence_notifier.wait_timeout(max_wait) + } + + /// Blocks until ChannelManager needs to be persisted. Only one listener on `wait` is + /// guaranteed to be woken up. + pub fn wait(&self) { + self.persistence_notifier.wait() + } + + #[cfg(any(test, feature = "_test_utils"))] + pub fn get_persistence_condvar_value(&self) -> bool { + let mutcond = &self.persistence_notifier.persistence_lock; + let &(ref mtx, _) = mutcond; + let guard = mtx.lock().unwrap(); + *guard + } } impl @@ -3265,87 +3327,87 @@ impl, Condvar), +} + +impl PersistenceNotifier { + fn new() -> Self { + Self { + persistence_lock: (Mutex::new(false), Condvar::new()), + } + } + + fn wait(&self) { + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + guard = cvar.wait(guard).unwrap(); + let result = *guard; + if result { + *guard = false; + return + } + } + } + + #[cfg(any(test, feature = "allow_wallclock_use"))] + fn wait_timeout(&self, max_wait: Duration) -> bool { + let current_time = Instant::now(); + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + guard = cvar.wait_timeout(guard, max_wait).unwrap().0; + // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the + // desired wait time has actually passed, and if not then restart the loop with a reduced wait + // time. Note that this logic can be highly simplified through the use of + // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to + // 1.42.0. + let elapsed = current_time.elapsed(); + let result = *guard; + if result || elapsed >= max_wait { + *guard = false; + return result; + } + match max_wait.checked_sub(elapsed) { + None => return result, + Some(_) => continue + } + } + } + + // Signal to the ChannelManager persister that there are updates necessitating persisting to disk. + fn notify(&self) { + let &(ref persist_mtx, ref cnd) = &self.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + mem::drop(persistence_lock); + cnd.notify_all(); + } +} + const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; @@ -4011,6 +4136,8 @@ impl<'a, ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Der pending_events: Mutex::new(pending_events_read), total_consistency_lock: RwLock::new(()), + persistence_notifier: PersistenceNotifier::new(), + keys_manager: args.keys_manager, logger: args.logger, default_configuration: args.default_config, @@ -4026,3 +4153,54 @@ impl<'a, ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Der Ok((last_block_hash.clone(), channel_manager)) } } + +#[cfg(test)] +mod tests { + use ln::channelmanager::PersistenceNotifier; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + use std::time::Duration; + + #[test] + fn test_wait_timeout() { + let persistence_notifier = Arc::new(PersistenceNotifier::new()); + let thread_notifier = Arc::clone(&persistence_notifier); + + let exit_thread = Arc::new(AtomicBool::new(false)); + let exit_thread_clone = exit_thread.clone(); + thread::spawn(move || { + loop { + let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + cnd.notify_all(); + + if exit_thread_clone.load(Ordering::SeqCst) { + break + } + } + }); + + // Check that we can block indefinitely until updates are available. + let _ = persistence_notifier.wait(); + + // Check that the PersistenceNotifier will return after the given duration if updates are + // available. + loop { + if persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + + exit_thread.store(true, Ordering::SeqCst); + + // Check that the PersistenceNotifier will return after the given duration even if no updates + // are available. + loop { + if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + } +} diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 2060b952d..f49112083 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -1701,8 +1701,9 @@ fn test_fee_spike_violation_fails_htlc() { fn test_chan_reserve_violation_outbound_htlc_inbound_chan() { let mut chanmon_cfgs = create_chanmon_cfgs(2); // Set the fee rate for the channel very high, to the point where the fundee - // sending any amount would result in a channel reserve violation. In this test - // we check that we would be prevented from sending an HTLC in this situation. + // sending any above-dust amount would result in a channel reserve violation. + // In this test we check that we would be prevented from sending an HTLC in + // this situation. chanmon_cfgs[0].fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 6000 }; chanmon_cfgs[1].fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 6000 }; let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); @@ -1720,7 +1721,7 @@ fn test_chan_reserve_violation_outbound_htlc_inbound_chan() { }} } - let (route, our_payment_hash, _) = get_route_and_payment_hash!(1000); + let (route, our_payment_hash, _) = get_route_and_payment_hash!(4843000); unwrap_send_err!(nodes[1].node.send_payment(&route, our_payment_hash, &None), true, APIError::ChannelUnavailable { ref err }, assert_eq!(err, "Cannot send value that would put counterparty balance under holder-announced channel reserve value")); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); @@ -1778,6 +1779,57 @@ fn test_chan_reserve_violation_inbound_htlc_outbound_channel() { check_added_monitors!(nodes[0], 1); } +#[test] +fn test_chan_reserve_dust_inbound_htlcs_outbound_chan() { + // Test that if we receive many dust HTLCs over an outbound channel, they don't count when + // calculating our commitment transaction fee (this was previously broken). + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None, None]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + // Set nodes[0]'s balance such that they will consider any above-dust received HTLC to be a + // channel reserve violation (so their balance is channel reserve (1000 sats) + commitment + // transaction fee with 0 HTLCs (183 sats)). + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 98817000, InitFeatures::known(), InitFeatures::known()); + + let dust_amt = 546000; // Dust amount + // In the previous code, routing this dust payment would cause nodes[0] to perceive a channel + // reserve violation even though it's a dust HTLC and therefore shouldn't count towards the + // commitment transaction fee. + let (_, _) = route_payment(&nodes[1], &[&nodes[0]], dust_amt); +} + +#[test] +fn test_chan_reserve_dust_inbound_htlcs_inbound_chan() { + // Test that if we receive many dust HTLCs over an inbound channel, they don't count when + // calculating our counterparty's commitment transaction fee (this was previously broken). + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None, None]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100000, 98000000, InitFeatures::known(), InitFeatures::known()); + + let payment_amt = 46000; // Dust amount + // In the previous code, these first four payments would succeed. + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + + // Then these next 5 would be interpreted by nodes[1] as violating the fee spike buffer. + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); + + // And this last payment previously resulted in nodes[1] closing on its inbound-channel + // counterparty, because it counted all the previous dust HTLCs against nodes[0]'s commitment + // transaction fee and therefore perceived this next payment as a channel reserve violation. + let (_, _) = route_payment(&nodes[0], &[&nodes[1]], payment_amt); +} + #[test] fn test_chan_reserve_violation_inbound_htlc_inbound_chan() { let chanmon_cfgs = create_chanmon_cfgs(3); @@ -2089,23 +2141,6 @@ fn test_channel_reserve_holding_cell_htlcs() { let commit_tx_fee_0_htlcs = 2*commit_tx_fee_msat(feerate, 1); let recv_value_3 = commit_tx_fee_2_htlcs - commit_tx_fee_0_htlcs - total_fee_msat; - { - let (route, our_payment_hash, _) = get_route_and_payment_hash!(recv_value_3 + 1); - let err = nodes[0].node.send_payment(&route, our_payment_hash, &None).err().unwrap(); - match err { - PaymentSendFailure::AllFailedRetrySafe(ref fails) => { - match &fails[0] { - &APIError::ChannelUnavailable{ref err} => - assert!(regex::Regex::new(r"Cannot send value that would put our balance under counterparty-announced channel reserve value \(\d+\)").unwrap().is_match(err)), - _ => panic!("Unexpected error variant"), - } - }, - _ => panic!("Unexpected error variant"), - } - assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); - nodes[0].logger.assert_log_contains("lightning::ln::channelmanager".to_string(), "Cannot send value that would put our balance under counterparty-announced channel reserve value".to_string(), 3); - } - send_payment(&nodes[0], &vec![&nodes[1], &nodes[2]][..], recv_value_3, recv_value_3); let commit_tx_fee_1_htlc = 2*commit_tx_fee_msat(feerate, 1 + 1); diff --git a/lightning/src/util/macro_logger.rs b/lightning/src/util/macro_logger.rs index 2065f4045..ab2b6ceea 100644 --- a/lightning/src/util/macro_logger.rs +++ b/lightning/src/util/macro_logger.rs @@ -155,12 +155,17 @@ macro_rules! log_spendable { } } +/// Create a new Record and log it. You probably don't want to use this macro directly, +/// but it needs to be exported so `log_trace` etc can use it in external crates. +#[macro_export] macro_rules! log_internal { ($logger: expr, $lvl:expr, $($arg:tt)+) => ( - $logger.log(&::util::logger::Record::new($lvl, format_args!($($arg)+), module_path!(), file!(), line!())); + $logger.log(&$crate::util::logger::Record::new($lvl, format_args!($($arg)+), module_path!(), file!(), line!())); ); } +/// Log an error. +#[macro_export] macro_rules! log_error { ($logger: expr, $($arg:tt)*) => ( #[cfg(not(any(feature = "max_level_off")))] @@ -189,6 +194,8 @@ macro_rules! log_debug { ) } +/// Log a trace log. +#[macro_export] macro_rules! log_trace { ($logger: expr, $($arg:tt)*) => ( #[cfg(not(any(feature = "max_level_off", feature = "max_level_error", feature = "max_level_warn", feature = "max_level_info", feature = "max_level_debug")))] diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index 57b5f2d74..b8028ea97 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -25,8 +25,10 @@ pub(crate) mod transaction_utils; #[macro_use] pub(crate) mod ser_macros; + +/// Logging macro utilities. #[macro_use] -pub(crate) mod macro_logger; +pub mod macro_logger; // These have to come after macro_logger to build pub mod logger;