4ef8fc59809d162b63177734d88dd7be7ea4252c
[rust-lightning] / lightning-persister / src / fs_store.rs
1 //! Objects related to [`FilesystemStore`] live here.
2 use lightning::util::persist::KVStore;
3 use lightning::util::string::PrintableString;
4
5 use std::collections::HashMap;
6 use std::fs;
7 use std::io::{BufReader, Read, Write};
8 use std::path::{Path, PathBuf};
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::{Arc, Mutex, RwLock};
11
12 #[cfg(not(target_os = "windows"))]
13 use std::os::unix::io::AsRawFd;
14
#[cfg(target_os = "windows")]
macro_rules! call {
	// Evaluates `$e` exactly once and returns from the *enclosing* function: `Ok(())` when the
	// value is non-zero (success), otherwise the error reported by `last_os_error`.
	($e: expr) => {{
		let succeeded = $e != 0;
		return if succeeded { Ok(()) } else { Err(std::io::Error::last_os_error()) };
	}};
}
28
/// Encodes `path` as a wide (`u16`) string terminated by a trailing `0`, as expected by the
/// wide-character Win32 functions this file calls (`ReplaceFileW`, `MoveFileExW`).
#[cfg(target_os = "windows")]
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
	let mut wide: Vec<u16> = path.as_ref().encode_wide().collect();
	wide.push(0);
	wide
}
33
/// A [`KVStore`] implementation that writes to and reads from the file system.
///
/// The value for a `namespace`/`key` pair lives in the file `data_dir/namespace/key`.
pub struct FilesystemStore {
	/// Root directory below which every namespace directory and key file is stored.
	data_dir: PathBuf,
	/// Monotonic counter used to give each temporary file created during a write a unique
	/// extension.
	tmp_file_counter: AtomicUsize,
	/// Table of per-`(namespace, key)` reader/writer locks, handed out to readers, writers and
	/// removers of the corresponding key.
	locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
}
40
41 impl FilesystemStore {
42         /// Constructs a new [`FilesystemStore`].
43         pub fn new(data_dir: PathBuf) -> Self {
44                 let locks = Mutex::new(HashMap::new());
45                 let tmp_file_counter = AtomicUsize::new(0);
46                 Self { data_dir, tmp_file_counter, locks }
47         }
48
49         /// Returns the data directory.
50         pub fn get_data_dir(&self) -> PathBuf {
51                 self.data_dir.clone()
52         }
53 }
54
55 impl KVStore for FilesystemStore {
56         type Reader = FilesystemReader;
57
58         fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
59                 if key.is_empty() {
60                         let msg = format!("Failed to read {}/{}: key may not be empty.",
61                                 PrintableString(namespace), PrintableString(key));
62                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
63                 }
64
65                 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
66                         key.chars().any(|c| !c.is_ascii() || c.is_control()) {
67                         debug_assert!(false, "Failed to read {}/{}: namespace and key must be valid ASCII
68                                 strings.", PrintableString(namespace), PrintableString(key));
69                         let msg = format!("Failed to read {}/{}: namespace and key must be valid ASCII strings.",
70                                 PrintableString(namespace), PrintableString(key));
71                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
72                 }
73
74                 let mut outer_lock = self.locks.lock().unwrap();
75                 let lock_key = (namespace.to_string(), key.to_string());
76                 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
77
78                 let mut dest_file_path = self.data_dir.clone();
79                 dest_file_path.push(namespace);
80                 dest_file_path.push(key);
81                 FilesystemReader::new(dest_file_path, inner_lock_ref)
82         }
83
84         fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
85                 if key.is_empty() {
86                         let msg = format!("Failed to write {}/{}: key may not be empty.",
87                                 PrintableString(namespace), PrintableString(key));
88                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
89                 }
90
91                 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
92                         key.chars().any(|c| !c.is_ascii() || c.is_control()) {
93                         debug_assert!(false, "Failed to write {}/{}: namespace and key must be valid ASCII
94                                 strings.", PrintableString(namespace), PrintableString(key));
95                         let msg = format!("Failed to write {}/{}: namespace and key must be valid ASCII strings.",
96                                 PrintableString(namespace), PrintableString(key));
97                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
98                 }
99
100                 let mut outer_lock = self.locks.lock().unwrap();
101                 let lock_key = (namespace.to_string(), key.to_string());
102                 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
103                 let _guard = inner_lock_ref.write().unwrap();
104
105                 let mut dest_file_path = self.data_dir.clone();
106                 dest_file_path.push(namespace);
107                 dest_file_path.push(key);
108
109                 let parent_directory = dest_file_path
110                         .parent()
111                         .ok_or_else(|| {
112                                 let msg =
113                                         format!("Could not retrieve parent directory of {}.", dest_file_path.display());
114                                 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
115                         })?
116                         .to_path_buf();
117                 fs::create_dir_all(&parent_directory)?;
118
119                 // Do a crazy dance with lots of fsync()s to be overly cautious here...
120                 // We never want to end up in a state where we've lost the old data, or end up using the
121                 // old data on power loss after we've returned.
122                 // The way to atomically write a file on Unix platforms is:
123                 // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
124                 let mut tmp_file_path = dest_file_path.clone();
125                 let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
126                 tmp_file_path.set_extension(tmp_file_ext);
127
128                 {
129                         let mut tmp_file = fs::File::create(&tmp_file_path)?;
130                         tmp_file.write_all(&buf)?;
131                         tmp_file.sync_all()?;
132                 }
133
134                 #[cfg(not(target_os = "windows"))]
135                 {
136                         fs::rename(&tmp_file_path, &dest_file_path)?;
137                         let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
138                         unsafe {
139                                 libc::fsync(dir_file.as_raw_fd());
140                         }
141                         Ok(())
142                 }
143
144                 #[cfg(target_os = "windows")]
145                 {
146                         if dest_file_path.exists() {
147                                 call!(unsafe {
148                                         windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
149                                                 path_to_windows_str(dest_file_path).as_ptr(),
150                                                 path_to_windows_str(tmp_file_path).as_ptr(),
151                                                 std::ptr::null(),
152                                                 windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
153                                                 std::ptr::null_mut() as *const core::ffi::c_void,
154                                                 std::ptr::null_mut() as *const core::ffi::c_void,
155                                         )
156                                 });
157                         } else {
158                                 call!(unsafe {
159                                         windows_sys::Win32::Storage::FileSystem::MoveFileExW(
160                                                 path_to_windows_str(tmp_file_path).as_ptr(),
161                                                 path_to_windows_str(dest_file_path).as_ptr(),
162                                                 windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
163                                                         | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
164                                         )
165                                 });
166                         }
167                 }
168         }
169
170         fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
171                 if key.is_empty() {
172                         let msg = format!("Failed to remove {}/{}: key may not be empty.",
173                                 PrintableString(namespace), PrintableString(key));
174                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
175                 }
176
177                 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
178                         key.chars().any(|c| !c.is_ascii() || c.is_control()) {
179                         debug_assert!(false, "Failed to remove {}/{}: namespace and key must be valid ASCII
180                                 strings.", PrintableString(namespace), PrintableString(key));
181                         let msg = format!("Failed to remove {}/{}: namespace and key must be valid ASCII strings.",
182                                 PrintableString(namespace), PrintableString(key));
183                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
184                 }
185
186                 let mut outer_lock = self.locks.lock().unwrap();
187                 let lock_key = (namespace.to_string(), key.to_string());
188                 let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
189
190                 let _guard = inner_lock_ref.write().unwrap();
191
192                 let mut dest_file_path = self.data_dir.clone();
193                 dest_file_path.push(namespace);
194                 dest_file_path.push(key);
195
196                 if !dest_file_path.is_file() {
197                         return Ok(());
198                 }
199
200                 fs::remove_file(&dest_file_path)?;
201                 #[cfg(not(target_os = "windows"))]
202                 {
203                         let parent_directory = dest_file_path.parent().ok_or_else(|| {
204                                 let msg =
205                                         format!("Could not retrieve parent directory of {}.", dest_file_path.display());
206                                 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
207                         })?;
208                         let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
209                         unsafe {
210                                 // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
211                                 // to the inode might get cached (and hence possibly lost on crash), depending on
212                                 // the target platform and file system.
213                                 //
214                                 // In order to assert we permanently removed the file in question we therefore
215                                 // call `fsync` on the parent directory on platforms that support it,
216                                 libc::fsync(dir_file.as_raw_fd());
217                         }
218                 }
219
220                 if dest_file_path.is_file() {
221                         return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
222                 }
223
224                 if Arc::strong_count(&inner_lock_ref) == 2 {
225                         // It's safe to remove the lock entry if we're the only one left holding a strong
226                         // reference. Checking this is necessary to ensure we continue to distribute references to the
227                         // same lock as long as some Readers are around. However, we still want to
228                         // clean up the table when possible.
229                         //
230                         // Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
231                         // around, but is preferable to doing nothing *or* something overly complex such as
232                         // implementing yet another RAII structure just for this pupose.
233                         outer_lock.remove(&lock_key);
234                 }
235
236                 // Garbage collect all lock entries that are not referenced anymore.
237                 outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
238
239                 Ok(())
240         }
241
242         fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
243                 let mut prefixed_dest = self.data_dir.clone();
244                 prefixed_dest.push(namespace);
245
246                 let mut keys = Vec::new();
247
248                 if !Path::new(&prefixed_dest).exists() {
249                         return Ok(Vec::new());
250                 }
251
252                 for entry in fs::read_dir(&prefixed_dest)? {
253                         let entry = entry?;
254                         let p = entry.path();
255
256                         if !p.is_file() {
257                                 continue;
258                         }
259
260                         if let Some(ext) = p.extension() {
261                                 if ext == "tmp" {
262                                         continue;
263                                 }
264                         }
265
266                         if let Some(relative_path) = p.strip_prefix(&prefixed_dest).ok()
267                                 .and_then(|p| p.to_str()) {
268                                         if relative_path.chars().all(|c| c.is_ascii() && !c.is_control()) {
269                                                 keys.push(relative_path.to_string())
270                                         }
271                         }
272                 }
273
274                 Ok(keys)
275         }
276 }
277
/// A buffered [`Read`] implementation as returned from [`FilesystemStore::read`].
///
/// Every individual [`Read::read`] call briefly takes the read side of the per-key lock shared
/// with the store, so it does not interleave with writes or removals of the same key.
pub struct FilesystemReader {
	/// Buffered handle to the file holding the key's value.
	inner: BufReader<fs::File>,
	/// The per-key lock shared with the owning [`FilesystemStore`].
	lock_ref: Arc<RwLock<()>>,
}
283
284 impl FilesystemReader {
285         fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
286                 let f = fs::File::open(dest_file_path.clone())?;
287                 let inner = BufReader::new(f);
288                 Ok(Self { inner, lock_ref })
289         }
290 }
291
292 impl Read for FilesystemReader {
293         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
294                 let _guard = self.lock_ref.read().unwrap();
295                 self.inner.read(buf)
296         }
297 }
298
#[cfg(test)]
mod tests {
	use super::*;
	use crate::test_utils::do_read_write_remove_list_persist;

	#[test]
	fn read_write_remove_list_persist() {
		// Use a dedicated subdirectory instead of the system temp directory itself: keys are
		// stored as `data_dir/namespace/key`, so using the temp root would scatter test files
		// there, and listing a namespace (e.g. the empty one) could return unrelated files.
		let mut temp_path = std::env::temp_dir();
		temp_path.push("test_read_write_remove_list_persist");
		let fs_store = FilesystemStore::new(temp_path);
		do_read_write_remove_list_persist(&fs_store);
	}
}