1 //! Objects related to [`FilesystemStore`] live here.
2 use lightning::util::persist::KVStore;
3 use lightning::util::string::PrintableString;
5 use std::collections::HashMap;
7 use std::io::{BufReader, Read, Write};
8 use std::path::{Path, PathBuf};
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::{Arc, Mutex, RwLock};
12 #[cfg(not(target_os = "windows"))]
13 use std::os::unix::io::AsRawFd;
15 #[cfg(target_os = "windows")]
16 use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
18 #[cfg(target_os = "windows")]
24 return Err(std::io::Error::last_os_error());
29 #[cfg(target_os = "windows")]
30 fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
31 path.as_ref().encode_wide().chain(Some(0)).collect()
34 /// A [`KVStore`] implementation that writes to and reads from the file system.
35 pub struct FilesystemStore {
37 tmp_file_counter: AtomicUsize,
38 locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
41 impl FilesystemStore {
42 /// Constructs a new [`FilesystemStore`].
43 pub fn new(data_dir: PathBuf) -> Self {
44 let locks = Mutex::new(HashMap::new());
45 let tmp_file_counter = AtomicUsize::new(0);
46 Self { data_dir, tmp_file_counter, locks }
49 /// Returns the data directory.
50 pub fn get_data_dir(&self) -> PathBuf {
55 impl KVStore for FilesystemStore {
56 type Reader = FilesystemReader;
58 fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
60 let msg = format!("Failed to read {}/{}: key may not be empty.",
61 PrintableString(namespace), PrintableString(key));
62 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
65 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
66 key.chars().any(|c| !c.is_ascii() || c.is_control()) {
67 debug_assert!(false, "Failed to read {}/{}: namespace and key must be valid ASCII
68 strings.", PrintableString(namespace), PrintableString(key));
69 let msg = format!("Failed to read {}/{}: namespace and key must be valid ASCII strings.",
70 PrintableString(namespace), PrintableString(key));
71 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
74 let mut outer_lock = self.locks.lock().unwrap();
75 let lock_key = (namespace.to_string(), key.to_string());
76 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
78 let mut dest_file_path = self.data_dir.clone();
79 dest_file_path.push(namespace);
80 dest_file_path.push(key);
81 FilesystemReader::new(dest_file_path, inner_lock_ref)
84 fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
86 let msg = format!("Failed to write {}/{}: key may not be empty.",
87 PrintableString(namespace), PrintableString(key));
88 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
91 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
92 key.chars().any(|c| !c.is_ascii() || c.is_control()) {
93 debug_assert!(false, "Failed to write {}/{}: namespace and key must be valid ASCII
94 strings.", PrintableString(namespace), PrintableString(key));
95 let msg = format!("Failed to write {}/{}: namespace and key must be valid ASCII strings.",
96 PrintableString(namespace), PrintableString(key));
97 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
100 let mut outer_lock = self.locks.lock().unwrap();
101 let lock_key = (namespace.to_string(), key.to_string());
102 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
103 let _guard = inner_lock_ref.write().unwrap();
105 let mut dest_file_path = self.data_dir.clone();
106 dest_file_path.push(namespace);
107 dest_file_path.push(key);
109 let parent_directory = dest_file_path
113 format!("Could not retrieve parent directory of {}.", dest_file_path.display());
114 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
117 fs::create_dir_all(&parent_directory)?;
119 // Do a crazy dance with lots of fsync()s to be overly cautious here...
120 // We never want to end up in a state where we've lost the old data, or end up using the
121 // old data on power loss after we've returned.
122 // The way to atomically write a file on Unix platforms is:
123 // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
124 let mut tmp_file_path = dest_file_path.clone();
125 let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
126 tmp_file_path.set_extension(tmp_file_ext);
129 let mut tmp_file = fs::File::create(&tmp_file_path)?;
130 tmp_file.write_all(&buf)?;
131 tmp_file.sync_all()?;
134 #[cfg(not(target_os = "windows"))]
136 fs::rename(&tmp_file_path, &dest_file_path)?;
137 let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
139 libc::fsync(dir_file.as_raw_fd());
144 #[cfg(target_os = "windows")]
146 if dest_file_path.exists() {
148 windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
149 path_to_windows_str(dest_file_path).as_ptr(),
150 path_to_windows_str(tmp_file_path).as_ptr(),
152 windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
153 std::ptr::null_mut() as *const core::ffi::c_void,
154 std::ptr::null_mut() as *const core::ffi::c_void,
159 windows_sys::Win32::Storage::FileSystem::MoveFileExW(
160 path_to_windows_str(tmp_file_path).as_ptr(),
161 path_to_windows_str(dest_file_path).as_ptr(),
162 windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
163 | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
170 fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
172 let msg = format!("Failed to remove {}/{}: key may not be empty.",
173 PrintableString(namespace), PrintableString(key));
174 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
177 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
178 key.chars().any(|c| !c.is_ascii() || c.is_control()) {
179 debug_assert!(false, "Failed to remove {}/{}: namespace and key must be valid ASCII
180 strings.", PrintableString(namespace), PrintableString(key));
181 let msg = format!("Failed to remove {}/{}: namespace and key must be valid ASCII strings.",
182 PrintableString(namespace), PrintableString(key));
183 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
186 let mut outer_lock = self.locks.lock().unwrap();
187 let lock_key = (namespace.to_string(), key.to_string());
188 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
190 let _guard = inner_lock_ref.write().unwrap();
192 let mut dest_file_path = self.data_dir.clone();
193 dest_file_path.push(namespace);
194 dest_file_path.push(key);
196 if !dest_file_path.is_file() {
200 fs::remove_file(&dest_file_path)?;
201 #[cfg(not(target_os = "windows"))]
203 let parent_directory = dest_file_path.parent().ok_or_else(|| {
205 format!("Could not retrieve parent directory of {}.", dest_file_path.display());
206 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
208 let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
210 // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
211 // to the inode might get cached (and hence possibly lost on crash), depending on
212 // the target platform and file system.
214 // In order to assert we permanently removed the file in question we therefore
215 // call `fsync` on the parent directory on platforms that support it,
216 libc::fsync(dir_file.as_raw_fd());
220 if dest_file_path.is_file() {
221 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
224 if Arc::strong_count(&inner_lock_ref) == 2 {
225 // It's safe to remove the lock entry if we're the only one left holding a strong
226 // reference. Checking this is necessary to ensure we continue to distribute references to the
227 // same lock as long as some Readers are around. However, we still want to
228 // clean up the table when possible.
230 // Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
231 // around, but is preferable to doing nothing *or* something overly complex such as
232 // implementing yet another RAII structure just for this pupose.
233 outer_lock.remove(&lock_key);
236 // Garbage collect all lock entries that are not referenced anymore.
237 outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
242 fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
243 let mut prefixed_dest = self.data_dir.clone();
244 prefixed_dest.push(namespace);
246 let mut keys = Vec::new();
248 if !Path::new(&prefixed_dest).exists() {
249 return Ok(Vec::new());
252 for entry in fs::read_dir(&prefixed_dest)? {
254 let p = entry.path();
260 if let Some(ext) = p.extension() {
266 if let Some(relative_path) = p.strip_prefix(&prefixed_dest).ok()
267 .and_then(|p| p.to_str()) {
268 if relative_path.chars().all(|c| c.is_ascii() && !c.is_control()) {
269 keys.push(relative_path.to_string())
278 /// A buffered [`Read`] implementation as returned from [`FilesystemStore::read`].
279 pub struct FilesystemReader {
280 inner: BufReader<fs::File>,
281 lock_ref: Arc<RwLock<()>>,
284 impl FilesystemReader {
285 fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
286 let f = fs::File::open(dest_file_path.clone())?;
287 let inner = BufReader::new(f);
288 Ok(Self { inner, lock_ref })
292 impl Read for FilesystemReader {
293 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
294 let _guard = self.lock_ref.read().unwrap();
302 use crate::test_utils::do_read_write_remove_list_persist;
305 fn read_write_remove_list_persist() {
306 let temp_path = std::env::temp_dir();
307 let fs_store = FilesystemStore::new(temp_path);
308 do_read_write_remove_list_persist(&fs_store);