3c55e5f824d9dfe449a13e0e1679a2705262f170
[rust-lightning] / lightning-persister / src / fs_store.rs
1 //! Objects related to [`FilesystemStore`] live here.
2 use lightning::util::persist::KVStore;
3
4 use std::collections::HashMap;
5 use std::fs;
6 use std::io::{BufReader, Read, Write};
7 use std::path::{Path, PathBuf};
8 use std::sync::{Arc, Mutex, RwLock};
9
10 #[cfg(not(target_os = "windows"))]
11 use std::os::unix::io::AsRawFd;
12
13 #[cfg(target_os = "windows")]
14 use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
15
16 #[cfg(target_os = "windows")]
// Converts the status code of a Win32-style call into an `std::io::Result<()>` and
// returns it from the *enclosing* function: any non-zero status yields `Ok(())`, while
// zero yields the error reported by `std::io::Error::last_os_error()`.
//
// Because the expansion is a `return`, nothing placed after an invocation is executed.
macro_rules! call {
    ($e: expr) => {
        return match $e {
            0 => Err(std::io::Error::last_os_error()),
            _ => Ok(()),
        }
    };
}
26
/// Encodes `path` as a wide (UTF-16 code unit) string followed by a single terminating
/// `0`, which is the form the Win32 calls in this file receive via `as_ptr()`.
#[cfg(target_os = "windows")]
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
    let mut wide: Vec<u16> = path.as_ref().encode_wide().collect();
    wide.push(0);
    wide
}
31
/// A [`KVStore`] implementation that writes to and reads from the file system.
///
/// Every `(namespace, key)` pair is stored as its own file below `data_dir`.
pub struct FilesystemStore {
    /// Root directory below which all namespaces and keys are stored.
    data_dir: PathBuf,
    /// Table of per-`(namespace, key)` locks, itself guarded by a mutex.
    locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
}

impl FilesystemStore {
    /// Constructs a new [`FilesystemStore`] rooted at `data_dir`, starting out with an
    /// empty lock table.
    pub fn new(data_dir: PathBuf) -> Self {
        Self { data_dir, locks: Mutex::new(HashMap::new()) }
    }

    /// Returns an owned copy of the data directory this store was constructed with.
    pub fn get_data_dir(&self) -> PathBuf {
        self.data_dir.to_path_buf()
    }
}
50
51 impl KVStore for FilesystemStore {
52         type Reader = FilesystemReader;
53
54         fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
55                 let mut outer_lock = self.locks.lock().unwrap();
56                 let lock_key = (namespace.to_string(), key.to_string());
57                 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
58
59                 if key.is_empty() {
60                         let msg = format!("Failed to read {}/{}: key may not be empty.", namespace, key);
61                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
62                 }
63
64                 let mut dest_file_path = self.data_dir.clone();
65                 dest_file_path.push(namespace);
66                 dest_file_path.push(key);
67                 FilesystemReader::new(dest_file_path, inner_lock_ref)
68         }
69
70         fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
71                 let mut outer_lock = self.locks.lock().unwrap();
72                 let lock_key = (namespace.to_string(), key.to_string());
73                 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
74                 let _guard = inner_lock_ref.write().unwrap();
75
76                 if key.is_empty() {
77                         let msg = format!("Failed to write {}/{}: key may not be empty.", namespace, key);
78                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
79                 }
80
81                 let mut dest_file_path = self.data_dir.clone();
82                 dest_file_path.push(namespace);
83                 dest_file_path.push(key);
84
85                 let parent_directory = dest_file_path
86                         .parent()
87                         .ok_or_else(|| {
88                                 let msg =
89                                         format!("Could not retrieve parent directory of {}.", dest_file_path.display());
90                                 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
91                         })?
92                         .to_path_buf();
93                 fs::create_dir_all(&parent_directory)?;
94
95                 // Do a crazy dance with lots of fsync()s to be overly cautious here...
96                 // We never want to end up in a state where we've lost the old data, or end up using the
97                 // old data on power loss after we've returned.
98                 // The way to atomically write a file on Unix platforms is:
99                 // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
100                 let mut tmp_file_path = dest_file_path.clone();
101                 tmp_file_path.set_extension("tmp");
102
103                 {
104                         let mut tmp_file = fs::File::create(&tmp_file_path)?;
105                         tmp_file.write_all(&buf)?;
106                         tmp_file.sync_all()?;
107                 }
108
109                 #[cfg(not(target_os = "windows"))]
110                 {
111                         fs::rename(&tmp_file_path, &dest_file_path)?;
112                         let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
113                         unsafe {
114                                 libc::fsync(dir_file.as_raw_fd());
115                         }
116                         Ok(())
117                 }
118
119                 #[cfg(target_os = "windows")]
120                 {
121                         if dest_file_path.exists() {
122                                 call!(unsafe {
123                                         windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
124                                                 path_to_windows_str(dest_file_path).as_ptr(),
125                                                 path_to_windows_str(tmp_file_path).as_ptr(),
126                                                 std::ptr::null(),
127                                                 windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
128                                                 std::ptr::null_mut() as *const core::ffi::c_void,
129                                                 std::ptr::null_mut() as *const core::ffi::c_void,
130                                         )
131                                 });
132                         } else {
133                                 call!(unsafe {
134                                         windows_sys::Win32::Storage::FileSystem::MoveFileExW(
135                                                 path_to_windows_str(tmp_file_path).as_ptr(),
136                                                 path_to_windows_str(dest_file_path).as_ptr(),
137                                                 windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
138                                                         | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
139                                         )
140                                 });
141                         }
142                 }
143         }
144
145         fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
146                 let mut outer_lock = self.locks.lock().unwrap();
147                 let lock_key = (namespace.to_string(), key.to_string());
148                 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
149
150                 let _guard = inner_lock_ref.write().unwrap();
151
152                 if key.is_empty() {
153                         let msg = format!("Failed to remove {}/{}: key may not be empty.", namespace, key);
154                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
155                 }
156
157                 let mut dest_file_path = self.data_dir.clone();
158                 dest_file_path.push(namespace);
159                 dest_file_path.push(key);
160
161                 if !dest_file_path.is_file() {
162                         return Ok(());
163                 }
164
165                 fs::remove_file(&dest_file_path)?;
166                 #[cfg(not(target_os = "windows"))]
167                 {
168                         let parent_directory = dest_file_path.parent().ok_or_else(|| {
169                                 let msg =
170                                         format!("Could not retrieve parent directory of {}.", dest_file_path.display());
171                                 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
172                         })?;
173                         let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
174                         unsafe {
175                                 // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
176                                 // to the inode might get cached (and hence possibly lost on crash), depending on
177                                 // the target platform and file system.
178                                 //
179                                 // In order to assert we permanently removed the file in question we therefore
180                                 // call `fsync` on the parent directory on platforms that support it,
181                                 libc::fsync(dir_file.as_raw_fd());
182                         }
183                 }
184
185                 if dest_file_path.is_file() {
186                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
187                 }
188
189                 if Arc::strong_count(&inner_lock_ref) == 2 {
190                         // It's safe to remove the lock entry if we're the only one left holding a strong
191                         // reference. Checking this is necessary to ensure we continue to distribute references to the
192                         // same lock as long as some Readers are around. However, we still want to
193                         // clean up the table when possible.
194                         //
195                         // Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
196                         // around, but is preferable to doing nothing *or* something overly complex such as
197                         // implementing yet another RAII structure just for this pupose.
198                         outer_lock.remove(&lock_key);
199                 }
200
201                 // Garbage collect all lock entries that are not referenced anymore.
202                 outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
203
204                 Ok(())
205         }
206
207         fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
208                 let mut prefixed_dest = self.data_dir.clone();
209                 prefixed_dest.push(namespace);
210
211                 let mut keys = Vec::new();
212
213                 if !Path::new(&prefixed_dest).exists() {
214                         return Ok(Vec::new());
215                 }
216
217                 for entry in fs::read_dir(&prefixed_dest)? {
218                         let entry = entry?;
219                         let p = entry.path();
220
221                         if !p.is_file() {
222                                 continue;
223                         }
224
225                         if let Some(ext) = p.extension() {
226                                 if ext == "tmp" {
227                                         continue;
228                                 }
229                         }
230
231                         if let Ok(relative_path) = p.strip_prefix(&prefixed_dest) {
232                                 keys.push(relative_path.display().to_string())
233                         }
234                 }
235
236                 Ok(keys)
237         }
238 }
239
/// A buffered [`Read`] implementation as returned from [`FilesystemStore::read`].
pub struct FilesystemReader {
    /// Buffered handle to the opened file.
    inner: BufReader<fs::File>,
    /// The lock shared with other users of the same key; read-locked during each `read`.
    lock_ref: Arc<RwLock<()>>,
}

impl FilesystemReader {
    /// Opens `dest_file_path` for buffered reading, keeping `lock_ref` alive for as long
    /// as the reader exists.
    ///
    /// Fails with the underlying I/O error if the file cannot be opened.
    fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
        // The path is owned and not needed afterwards, so it is handed over directly
        // rather than cloned.
        let f = fs::File::open(dest_file_path)?;
        let inner = BufReader::new(f);
        Ok(Self { inner, lock_ref })
    }
}

impl Read for FilesystemReader {
    /// Reads into `buf` while holding the read side of the shared lock; the lock is taken
    /// per call and released again before returning.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let _guard = self.lock_ref.read().unwrap();
        self.inner.read(buf)
    }
}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::do_read_write_remove_list_persist;

    /// Runs the shared read/write/remove/list round-trip checks against a
    /// [`FilesystemStore`].
    #[test]
    fn read_write_remove_list_persist() {
        // Use a dedicated subdirectory instead of the system temp directory itself, so the
        // store neither litters the shared temp root nor trips over unrelated files in it.
        let mut temp_path = std::env::temp_dir();
        temp_path.push("test_read_write_remove_list_persist");
        let fs_store = FilesystemStore::new(temp_path);
        do_read_write_remove_list_persist(&fs_store);
    }
}