From: Elias Rohrer Date: Tue, 1 Aug 2023 08:46:51 +0000 (+0200) Subject: Add `FilesystemStore` X-Git-Tag: v0.0.117-alpha1~24^2~5 X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=f22d1b63902404a15ca808c977f8364cbf90d277;p=rust-lightning Add `FilesystemStore` We upstream the `FilesystemStore` implementation, which is backwards compatible with `lightning-persister::FilesystemPersister`. --- diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index a6775a18c..271f3b882 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -20,6 +20,7 @@ libc = "0.2" [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } +windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] } [target.'cfg(ldk_bench)'.dependencies] criterion = { version = "0.4", optional = true, default-features = false } diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs new file mode 100644 index 000000000..f74806d7d --- /dev/null +++ b/lightning-persister/src/fs_store.rs @@ -0,0 +1,379 @@ +//! Objects related to [`FilesystemStore`] live here. +use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str}; + +use lightning::util::persist::KVStore; +use lightning::util::string::PrintableString; + +use std::collections::HashMap; +use std::fs; +use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; + +#[cfg(target_os = "windows")] +use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; + +#[cfg(target_os = "windows")] +macro_rules! call { + ($e: expr) => { + if $e != 0 { + Ok(()) + } else { + Err(std::io::Error::last_os_error()) + } + }; +} + +#[cfg(target_os = "windows")] +fn path_to_windows_str>(path: T) -> Vec { + path.as_ref().encode_wide().chain(Some(0)).collect() +} + +// The number of read/write/remove/list operations after which we clean up our `locks` HashMap. +const GC_LOCK_INTERVAL: usize = 25; + +/// A [`KVStore`] implementation that writes to and reads from the file system. +pub struct FilesystemStore { + data_dir: PathBuf, + tmp_file_counter: AtomicUsize, + gc_counter: AtomicUsize, + locks: Mutex>>>, +} + +impl FilesystemStore { + /// Constructs a new [`FilesystemStore`]. + pub fn new(data_dir: PathBuf) -> Self { + let locks = Mutex::new(HashMap::new()); + let tmp_file_counter = AtomicUsize::new(0); + let gc_counter = AtomicUsize::new(1); + Self { data_dir, tmp_file_counter, gc_counter, locks } + } + + /// Returns the data directory. + pub fn get_data_dir(&self) -> PathBuf { + self.data_dir.clone() + } + + fn garbage_collect_locks(&self) { + let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel); + + if gc_counter % GC_LOCK_INTERVAL == 0 { + // Take outer lock for the cleanup. + let mut outer_lock = self.locks.lock().unwrap(); + + // Garbage collect all lock entries that are not referenced anymore. + outer_lock.retain(|_, v| Arc::strong_count(&v) > 1); + } + } + + fn get_dest_dir_path(&self, namespace: &str, sub_namespace: &str) -> std::io::Result { + let mut dest_dir_path = { + #[cfg(target_os = "windows")] + { + let data_dir = self.data_dir.clone(); + fs::create_dir_all(data_dir.clone())?; + fs::canonicalize(data_dir)? + } + #[cfg(not(target_os = "windows"))] + { + self.data_dir.clone() + } + }; + + dest_dir_path.push(namespace); + if !sub_namespace.is_empty() { + dest_dir_path.push(sub_namespace); + } + + Ok(dest_dir_path) + } +} + +impl KVStore for FilesystemStore { + fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result> { + check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?; + + let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?; + dest_file_path.push(key); + + let mut buf = Vec::new(); + { + let inner_lock_ref = { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) + }; + let _guard = inner_lock_ref.read().unwrap(); + + let mut f = fs::File::open(dest_file_path)?; + f.read_to_end(&mut buf)?; + } + + self.garbage_collect_locks(); + + Ok(buf) + } + + fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { + check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?; + + let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?; + dest_file_path.push(key); + + let parent_directory = dest_file_path + .parent() + .ok_or_else(|| { + let msg = + format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + fs::create_dir_all(&parent_directory)?; + + // Do a crazy dance with lots of fsync()s to be overly cautious here... + // We never want to end up in a state where we've lost the old data, or end up using the + // old data on power loss after we've returned. + // The way to atomically write a file on Unix platforms is: + // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) + let mut tmp_file_path = dest_file_path.clone(); + let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); + tmp_file_path.set_extension(tmp_file_ext); + + { + let mut tmp_file = fs::File::create(&tmp_file_path)?; + tmp_file.write_all(&buf)?; + tmp_file.sync_all()?; + } + + let res = { + let inner_lock_ref = { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) + }; + let _guard = inner_lock_ref.write().unwrap(); + + #[cfg(not(target_os = "windows"))] + { + fs::rename(&tmp_file_path, &dest_file_path)?; + let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?; + dir_file.sync_all()?; + Ok(()) + } + + #[cfg(target_os = "windows")] + { + let res = if dest_file_path.exists() { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::ReplaceFileW( + path_to_windows_str(dest_file_path.clone()).as_ptr(), + path_to_windows_str(tmp_file_path).as_ptr(), + std::ptr::null(), + windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS, + std::ptr::null_mut() as *const core::ffi::c_void, + std::ptr::null_mut() as *const core::ffi::c_void, + ) + }) + } else { + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( + path_to_windows_str(tmp_file_path).as_ptr(), + path_to_windows_str(dest_file_path.clone()).as_ptr(), + windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH + | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, + ) + }) + }; + + match res { + Ok(()) => { + // We fsync the dest file in hopes this will also flush the metadata to disk. + let dest_file = fs::OpenOptions::new().read(true).write(true) + .open(&dest_file_path)?; + dest_file.sync_all()?; + Ok(()) + } + Err(e) => Err(e), + } + } + }; + + self.garbage_collect_locks(); + + res + } + + fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> { + check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?; + + let mut dest_file_path = self.get_dest_dir_path(namespace, sub_namespace)?; + dest_file_path.push(key); + + if !dest_file_path.is_file() { + return Ok(()); + } + + { + let inner_lock_ref = { + let mut outer_lock = self.locks.lock().unwrap(); + Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) + }; + let _guard = inner_lock_ref.write().unwrap(); + + if lazy { + // If we're lazy we just call remove and be done with it. + fs::remove_file(&dest_file_path)?; + } else { + // If we're not lazy we try our best to persist the updated metadata to ensure + // atomicity of this call. + #[cfg(not(target_os = "windows"))] + { + fs::remove_file(&dest_file_path)?; + + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = + format!("Could not retrieve parent directory of {}.", dest_file_path.display()); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; + // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes + // to the inode might get cached (and hence possibly lost on crash), depending on + // the target platform and file system. + // + // In order to assert we permanently removed the file in question we therefore + // call `fsync` on the parent directory on platforms that support it. + dir_file.sync_all()?; + } + + #[cfg(target_os = "windows")] + { + // Since Windows `DeleteFile` API is not persisted until the last open file handle + // is dropped, and there seemingly is no reliable way to flush the directory + // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the + // file to be deleted to a temporary trash file and remove the latter file + // afterwards. + // + // This should be marginally better, as, according to the documentation, + // `MoveFileExW` APIs should offer stronger persistence guarantees, + // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set. + // However, all this is partially based on assumptions and local experiments, as + // Windows API is horribly underdocumented. + let mut trash_file_path = dest_file_path.clone(); + let trash_file_ext = format!("{}.trash", + self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); + trash_file_path.set_extension(trash_file_ext); + + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( + path_to_windows_str(dest_file_path).as_ptr(), + path_to_windows_str(trash_file_path.clone()).as_ptr(), + windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH + | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, + ) + })?; + + { + // We fsync the trash file in hopes this will also flush the original's file + // metadata to disk. + let trash_file = fs::OpenOptions::new().read(true).write(true) + .open(&trash_file_path.clone())?; + trash_file.sync_all()?; + } + + // We're fine if this remove would fail as the trash file will be cleaned up in + // list eventually. + fs::remove_file(trash_file_path).ok(); + } + } + } + + self.garbage_collect_locks(); + + Ok(()) + } + + fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result> { + check_namespace_key_validity(namespace, sub_namespace, None, "list")?; + + let prefixed_dest = self.get_dest_dir_path(namespace, sub_namespace)?; + let mut keys = Vec::new(); + + if !Path::new(&prefixed_dest).exists() { + return Ok(Vec::new()); + } + + for entry in fs::read_dir(&prefixed_dest)? { + let entry = entry?; + let p = entry.path(); + + if let Some(ext) = p.extension() { + #[cfg(target_os = "windows")] + { + // Clean up any trash files lying around. + if ext == "trash" { + fs::remove_file(p).ok(); + continue; + } + } + if ext == "tmp" { + continue; + } + } + + let metadata = p.metadata()?; + + // We allow the presence of directories in the empty namespace and just skip them. + if metadata.is_dir() { + continue; + } + + // If we otherwise don't find a file at the given path something went wrong. + if !metadata.is_file() { + debug_assert!(false, "Failed to list keys of {}/{}: file couldn't be accessed.", + PrintableString(namespace), PrintableString(sub_namespace)); + let msg = format!("Failed to list keys of {}/{}: file couldn't be accessed.", + PrintableString(namespace), PrintableString(sub_namespace)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + match p.strip_prefix(&prefixed_dest) { + Ok(stripped_path) => { + if let Some(relative_path) = stripped_path.to_str() { + if is_valid_kvstore_str(relative_path) { + keys.push(relative_path.to_string()) + } + } else { + debug_assert!(false, "Failed to list keys of {}/{}: file path is not valid UTF-8", + PrintableString(namespace), PrintableString(sub_namespace)); + let msg = format!("Failed to list keys of {}/{}: file path is not valid UTF-8", + PrintableString(namespace), PrintableString(sub_namespace)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } + Err(e) => { + debug_assert!(false, "Failed to list keys of {}/{}: {}", + PrintableString(namespace), PrintableString(sub_namespace), e); + let msg = format!("Failed to list keys of {}/{}: {}", + PrintableString(namespace), PrintableString(sub_namespace), e); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } + } + + self.garbage_collect_locks(); + + Ok(keys) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::do_read_write_remove_list_persist; + + #[test] + fn read_write_remove_list_persist() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist"); + let fs_store = FilesystemStore::new(temp_path); + do_read_write_remove_list_persist(&fs_store); + } +} diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 0a667e421..932e4f41a 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -10,6 +10,10 @@ #[cfg(ldk_bench)] extern crate criterion; +pub mod fs_store; + +mod utils; + #[cfg(test)] mod test_utils; diff --git a/lightning-persister/src/utils.rs b/lightning-persister/src/utils.rs new file mode 100644 index 000000000..54ec230de --- /dev/null +++ b/lightning-persister/src/utils.rs @@ -0,0 +1,59 @@ +use lightning::util::persist::{KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN}; +use lightning::util::string::PrintableString; + + +pub(crate) fn is_valid_kvstore_str(key: &str) -> bool { + key.len() <= KVSTORE_NAMESPACE_KEY_MAX_LEN && key.chars().all(|c| KVSTORE_NAMESPACE_KEY_ALPHABET.contains(c)) +} + +pub(crate) fn check_namespace_key_validity(namespace: &str, sub_namespace: &str, key: Option<&str>, operation: &str) -> Result<(), std::io::Error> { + if let Some(key) = key { + if key.is_empty() { + debug_assert!(false, "Failed to {} {}/{}/{}: key may not be empty.", operation, + PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + let msg = format!("Failed to {} {}/{}/{}: key may not be empty.", operation, + PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + if namespace.is_empty() && !sub_namespace.is_empty() { + debug_assert!(false, + "Failed to {} {}/{}/{}: namespace may not be empty if a non-empty sub-namespace is given.", + operation, + PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + let msg = format!( + "Failed to {} {}/{}/{}: namespace may not be empty if a non-empty sub-namespace is given.", operation, + PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + + if !is_valid_kvstore_str(namespace) || !is_valid_kvstore_str(sub_namespace) || !is_valid_kvstore_str(key) { + debug_assert!(false, "Failed to {} {}/{}/{}: namespace, sub-namespace, and key must be valid.", + operation, + PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + let msg = format!("Failed to {} {}/{}/{}: namespace, sub-namespace, and key must be valid.", + operation, + PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } else { + if namespace.is_empty() && !sub_namespace.is_empty() { + debug_assert!(false, + "Failed to {} {}/{}: namespace may not be empty if a non-empty sub-namespace is given.", + operation, PrintableString(namespace), PrintableString(sub_namespace)); + let msg = format!( + "Failed to {} {}/{}: namespace may not be empty if a non-empty sub-namespace is given.", + operation, PrintableString(namespace), PrintableString(sub_namespace)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + if !is_valid_kvstore_str(namespace) || !is_valid_kvstore_str(sub_namespace) { + debug_assert!(false, "Failed to {} {}/{}: namespace and sub-namespace must be valid.", + operation, PrintableString(namespace), PrintableString(sub_namespace)); + let msg = format!("Failed to {} {}/{}: namespace and sub-namespace must be valid.", + operation, PrintableString(namespace), PrintableString(sub_namespace)); + return Err(std::io::Error::new(std::io::ErrorKind::Other, msg)); + } + } + + Ok(()) +}