1 //! Objects related to [`FilesystemStore`] live here.
2 use lightning::util::persist::KVStore;
4 use std::collections::HashMap;
6 use std::io::{BufReader, Read, Write};
7 use std::path::{Path, PathBuf};
8 use std::sync::{Arc, Mutex, RwLock};
10 #[cfg(not(target_os = "windows"))]
11 use std::os::unix::io::AsRawFd;
13 #[cfg(target_os = "windows")]
14 use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
16 #[cfg(target_os = "windows")]
22 return Err(std::io::Error::last_os_error());
27 #[cfg(target_os = "windows")]
28 fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
29 path.as_ref().encode_wide().chain(Some(0)).collect()
32 /// A [`KVStore`] implementation that writes to and reads from the file system.
33 pub struct FilesystemStore {
35 locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
38 impl FilesystemStore {
39 /// Constructs a new [`FilesystemStore`].
40 pub fn new(data_dir: PathBuf) -> Self {
41 let locks = Mutex::new(HashMap::new());
42 Self { data_dir, locks }
45 /// Returns the data directory.
46 pub fn get_data_dir(&self) -> PathBuf {
51 impl KVStore for FilesystemStore {
52 type Reader = FilesystemReader;
54 fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
56 let msg = format!("Failed to read {}/{}: key may not be empty.", namespace, key);
57 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
60 let mut outer_lock = self.locks.lock().unwrap();
61 let lock_key = (namespace.to_string(), key.to_string());
62 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
64 let mut dest_file_path = self.data_dir.clone();
65 dest_file_path.push(namespace);
66 dest_file_path.push(key);
67 FilesystemReader::new(dest_file_path, inner_lock_ref)
70 fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
72 let msg = format!("Failed to write {}/{}: key may not be empty.", namespace, key);
73 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
76 let mut outer_lock = self.locks.lock().unwrap();
77 let lock_key = (namespace.to_string(), key.to_string());
78 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
79 let _guard = inner_lock_ref.write().unwrap();
81 let mut dest_file_path = self.data_dir.clone();
82 dest_file_path.push(namespace);
83 dest_file_path.push(key);
85 let parent_directory = dest_file_path
89 format!("Could not retrieve parent directory of {}.", dest_file_path.display());
90 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
93 fs::create_dir_all(&parent_directory)?;
95 // Do a crazy dance with lots of fsync()s to be overly cautious here...
96 // We never want to end up in a state where we've lost the old data, or end up using the
97 // old data on power loss after we've returned.
98 // The way to atomically write a file on Unix platforms is:
99 // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
100 let mut tmp_file_path = dest_file_path.clone();
101 tmp_file_path.set_extension("tmp");
104 let mut tmp_file = fs::File::create(&tmp_file_path)?;
105 tmp_file.write_all(&buf)?;
106 tmp_file.sync_all()?;
109 #[cfg(not(target_os = "windows"))]
111 fs::rename(&tmp_file_path, &dest_file_path)?;
112 let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
114 libc::fsync(dir_file.as_raw_fd());
119 #[cfg(target_os = "windows")]
121 if dest_file_path.exists() {
123 windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
124 path_to_windows_str(dest_file_path).as_ptr(),
125 path_to_windows_str(tmp_file_path).as_ptr(),
127 windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
128 std::ptr::null_mut() as *const core::ffi::c_void,
129 std::ptr::null_mut() as *const core::ffi::c_void,
134 windows_sys::Win32::Storage::FileSystem::MoveFileExW(
135 path_to_windows_str(tmp_file_path).as_ptr(),
136 path_to_windows_str(dest_file_path).as_ptr(),
137 windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
138 | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
145 fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
147 let msg = format!("Failed to remove {}/{}: key may not be empty.", namespace, key);
148 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
151 let mut outer_lock = self.locks.lock().unwrap();
152 let lock_key = (namespace.to_string(), key.to_string());
153 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
155 let _guard = inner_lock_ref.write().unwrap();
157 let mut dest_file_path = self.data_dir.clone();
158 dest_file_path.push(namespace);
159 dest_file_path.push(key);
161 if !dest_file_path.is_file() {
165 fs::remove_file(&dest_file_path)?;
166 #[cfg(not(target_os = "windows"))]
168 let parent_directory = dest_file_path.parent().ok_or_else(|| {
170 format!("Could not retrieve parent directory of {}.", dest_file_path.display());
171 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
173 let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
175 // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
176 // to the inode might get cached (and hence possibly lost on crash), depending on
177 // the target platform and file system.
179 // In order to assert we permanently removed the file in question we therefore
180 // call `fsync` on the parent directory on platforms that support it,
181 libc::fsync(dir_file.as_raw_fd());
185 if dest_file_path.is_file() {
186 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
189 if Arc::strong_count(&inner_lock_ref) == 2 {
190 // It's safe to remove the lock entry if we're the only one left holding a strong
191 // reference. Checking this is necessary to ensure we continue to distribute references to the
192 // same lock as long as some Readers are around. However, we still want to
193 // clean up the table when possible.
195 // Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
196 // around, but is preferable to doing nothing *or* something overly complex such as
197 // implementing yet another RAII structure just for this pupose.
198 outer_lock.remove(&lock_key);
201 // Garbage collect all lock entries that are not referenced anymore.
202 outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
207 fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
208 let mut prefixed_dest = self.data_dir.clone();
209 prefixed_dest.push(namespace);
211 let mut keys = Vec::new();
213 if !Path::new(&prefixed_dest).exists() {
214 return Ok(Vec::new());
217 for entry in fs::read_dir(&prefixed_dest)? {
219 let p = entry.path();
225 if let Some(ext) = p.extension() {
231 if let Ok(relative_path) = p.strip_prefix(&prefixed_dest) {
232 keys.push(relative_path.display().to_string())
240 /// A buffered [`Read`] implementation as returned from [`FilesystemStore::read`].
241 pub struct FilesystemReader {
242 inner: BufReader<fs::File>,
243 lock_ref: Arc<RwLock<()>>,
246 impl FilesystemReader {
247 fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
248 let f = fs::File::open(dest_file_path.clone())?;
249 let inner = BufReader::new(f);
250 Ok(Self { inner, lock_ref })
254 impl Read for FilesystemReader {
255 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
256 let _guard = self.lock_ref.read().unwrap();
264 use crate::test_utils::do_read_write_remove_list_persist;
267 fn read_write_remove_list_persist() {
268 let temp_path = std::env::temp_dir();
269 let fs_store = FilesystemStore::new(temp_path);
270 do_read_write_remove_list_persist(&fs_store);