]> git.bitcoin.ninja Git - rust-lightning/blob - lightning-persister/src/fs_store.rs
Add benchmarking for `FilesystemStore`
[rust-lightning] / lightning-persister / src / fs_store.rs
1 //! Objects related to [`FilesystemStore`] live here.
2 use lightning::util::persist::KVStore;
3 use lightning::util::string::PrintableString;
4
5 use std::collections::HashMap;
6 use std::fs;
7 use std::io::{BufReader, Read, Write};
8 use std::path::{Path, PathBuf};
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::{Arc, Mutex, RwLock};
11
12 #[cfg(target_os = "windows")]
13 use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
14
15 #[cfg(target_os = "windows")]
16 macro_rules! call {
17         ($e: expr) => {
18                 if $e != 0 {
19                         return Ok(());
20                 } else {
21                         return Err(std::io::Error::last_os_error());
22                 }
23         };
24 }
25
26 #[cfg(target_os = "windows")]
27 fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
28         path.as_ref().encode_wide().chain(Some(0)).collect()
29 }
30
31 /// A [`KVStore`] implementation that writes to and reads from the file system.
32 pub struct FilesystemStore {
33         data_dir: PathBuf,
34         tmp_file_counter: AtomicUsize,
35         locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
36 }
37
38 impl FilesystemStore {
39         /// Constructs a new [`FilesystemStore`].
40         pub fn new(data_dir: PathBuf) -> Self {
41                 let locks = Mutex::new(HashMap::new());
42                 let tmp_file_counter = AtomicUsize::new(0);
43                 Self { data_dir, tmp_file_counter, locks }
44         }
45
46         /// Returns the data directory.
47         pub fn get_data_dir(&self) -> PathBuf {
48                 self.data_dir.clone()
49         }
50 }
51
52 impl KVStore for FilesystemStore {
53         fn read(&self, namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
54                 if key.is_empty() {
55                         let msg = format!("Failed to read {}/{}: key may not be empty.",
56                                 PrintableString(namespace), PrintableString(key));
57                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
58                 }
59
60                 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
61                         key.chars().any(|c| !c.is_ascii() || c.is_control()) {
62                         debug_assert!(false, "Failed to read {}/{}: namespace and key must be valid ASCII
63                                 strings.", PrintableString(namespace), PrintableString(key));
64                         let msg = format!("Failed to read {}/{}: namespace and key must be valid ASCII strings.",
65                                 PrintableString(namespace), PrintableString(key));
66                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
67                 }
68
69                 let mut dest_file_path = self.data_dir.clone();
70                 dest_file_path.push(namespace);
71                 dest_file_path.push(key);
72
73                 let inner_lock_ref = {
74                         let mut outer_lock = self.locks.lock().unwrap();
75                         Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
76                 };
77                 let _guard = inner_lock_ref.read().unwrap();
78
79                 let mut buf = Vec::new();
80                 let f = fs::File::open(dest_file_path.clone())?;
81                 let mut reader = BufReader::new(f);
82                 let nread = reader.read_to_end(&mut buf)?;
83                 debug_assert_ne!(nread, 0);
84
85                 Ok(buf)
86         }
87
88         fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
89                 if key.is_empty() {
90                         let msg = format!("Failed to write {}/{}: key may not be empty.",
91                                 PrintableString(namespace), PrintableString(key));
92                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
93                 }
94
95                 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
96                         key.chars().any(|c| !c.is_ascii() || c.is_control()) {
97                         debug_assert!(false, "Failed to write {}/{}: namespace and key must be valid ASCII
98                                 strings.", PrintableString(namespace), PrintableString(key));
99                         let msg = format!("Failed to write {}/{}: namespace and key must be valid ASCII strings.",
100                                 PrintableString(namespace), PrintableString(key));
101                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
102                 }
103
104                 let mut dest_file_path = self.data_dir.clone();
105                 dest_file_path.push(namespace);
106                 dest_file_path.push(key);
107
108                 let parent_directory = dest_file_path
109                         .parent()
110                         .ok_or_else(|| {
111                                 let msg =
112                                         format!("Could not retrieve parent directory of {}.", dest_file_path.display());
113                                 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
114                         })?
115                         .to_path_buf();
116                 fs::create_dir_all(&parent_directory)?;
117
118                 // Do a crazy dance with lots of fsync()s to be overly cautious here...
119                 // We never want to end up in a state where we've lost the old data, or end up using the
120                 // old data on power loss after we've returned.
121                 // The way to atomically write a file on Unix platforms is:
122                 // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
123                 let mut tmp_file_path = dest_file_path.clone();
124                 let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
125                 tmp_file_path.set_extension(tmp_file_ext);
126
127                 {
128                         let mut tmp_file = fs::File::create(&tmp_file_path)?;
129                         tmp_file.write_all(&buf)?;
130                         tmp_file.sync_all()?;
131                 }
132
133                 let inner_lock_ref = {
134                         let mut outer_lock = self.locks.lock().unwrap();
135                         Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
136                 };
137                 let _guard = inner_lock_ref.write().unwrap();
138
139                 #[cfg(not(target_os = "windows"))]
140                 {
141                         fs::rename(&tmp_file_path, &dest_file_path)?;
142                         let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
143                         dir_file.sync_all()?;
144                         Ok(())
145                 }
146
147                 #[cfg(target_os = "windows")]
148                 {
149                         if dest_file_path.exists() {
150                                 call!(unsafe {
151                                         windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
152                                                 path_to_windows_str(dest_file_path).as_ptr(),
153                                                 path_to_windows_str(tmp_file_path).as_ptr(),
154                                                 std::ptr::null(),
155                                                 windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
156                                                 std::ptr::null_mut() as *const core::ffi::c_void,
157                                                 std::ptr::null_mut() as *const core::ffi::c_void,
158                                         )
159                                 });
160                         } else {
161                                 call!(unsafe {
162                                         windows_sys::Win32::Storage::FileSystem::MoveFileExW(
163                                                 path_to_windows_str(tmp_file_path).as_ptr(),
164                                                 path_to_windows_str(dest_file_path).as_ptr(),
165                                                 windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
166                                                         | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
167                                         )
168                                 });
169                         }
170                 }
171         }
172
173         fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
174                 if key.is_empty() {
175                         let msg = format!("Failed to remove {}/{}: key may not be empty.",
176                                 PrintableString(namespace), PrintableString(key));
177                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
178                 }
179
180                 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
181                         key.chars().any(|c| !c.is_ascii() || c.is_control()) {
182                         debug_assert!(false, "Failed to remove {}/{}: namespace and key must be valid ASCII
183                                 strings.", PrintableString(namespace), PrintableString(key));
184                         let msg = format!("Failed to remove {}/{}: namespace and key must be valid ASCII strings.",
185                                 PrintableString(namespace), PrintableString(key));
186                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
187                 }
188
189                 let mut dest_file_path = self.data_dir.clone();
190                 dest_file_path.push(namespace);
191                 dest_file_path.push(key);
192
193                 if !dest_file_path.is_file() {
194                         return Ok(());
195                 }
196
197                 {
198                         let inner_lock_ref = {
199                                 let mut outer_lock = self.locks.lock().unwrap();
200                                 Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
201                         };
202                         let _guard = inner_lock_ref.write().unwrap();
203
204                         fs::remove_file(&dest_file_path)?;
205                         #[cfg(not(target_os = "windows"))]
206                         {
207                                 let parent_directory = dest_file_path.parent().ok_or_else(|| {
208                                         let msg =
209                                                 format!("Could not retrieve parent directory of {}.", dest_file_path.display());
210                                         std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
211                                 })?;
212                                 let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
213                                 // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
214                                 // to the inode might get cached (and hence possibly lost on crash), depending on
215                                 // the target platform and file system.
216                                 //
217                                 // In order to assert we permanently removed the file in question we therefore
218                                 // call `fsync` on the parent directory on platforms that support it,
219                                 dir_file.sync_all()?;
220                         }
221
222                         if dest_file_path.is_file() {
223                                 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
224                         }
225                 }
226
227                 {
228                         // Retake outer lock for the cleanup.
229                         let mut outer_lock = self.locks.lock().unwrap();
230
231                         // Garbage collect all lock entries that are not referenced anymore.
232                         outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
233                 }
234
235                 Ok(())
236         }
237
238         fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
239                 let mut prefixed_dest = self.data_dir.clone();
240                 prefixed_dest.push(namespace);
241
242                 let mut keys = Vec::new();
243
244                 if !Path::new(&prefixed_dest).exists() {
245                         return Ok(Vec::new());
246                 }
247
248                 for entry in fs::read_dir(&prefixed_dest)? {
249                         let entry = entry?;
250                         let p = entry.path();
251
252                         if !p.is_file() {
253                                 continue;
254                         }
255
256                         if let Some(ext) = p.extension() {
257                                 if ext == "tmp" {
258                                         continue;
259                                 }
260                         }
261
262                         if let Some(relative_path) = p.strip_prefix(&prefixed_dest).ok()
263                                 .and_then(|p| p.to_str()) {
264                                         if relative_path.chars().all(|c| c.is_ascii() && !c.is_control()) {
265                                                 keys.push(relative_path.to_string())
266                                         }
267                         }
268                 }
269
270                 Ok(keys)
271         }
272 }
273
274 #[cfg(test)]
275 mod tests {
276         use super::*;
277         use crate::test_utils::{do_read_write_remove_list_persist, do_test_store};
278
279         use bitcoin::hashes::hex::FromHex;
280         use bitcoin::Txid;
281
282         use lightning::chain::ChannelMonitorUpdateStatus;
283         use lightning::chain::chainmonitor::Persist;
284         use lightning::chain::transaction::OutPoint;
285         use lightning::check_closed_event;
286         use lightning::events::{ClosureReason, MessageSendEventsProvider};
287         use lightning::ln::functional_test_utils::*;
288         use lightning::util::test_utils;
289         use lightning::util::persist::read_channel_monitors;
290         use std::fs;
291         #[cfg(target_os = "windows")]
292         use {
293                 lightning::get_event_msg,
294                 lightning::ln::msgs::ChannelMessageHandler,
295         };
296
297         impl Drop for FilesystemStore{
298                 fn drop(&mut self) {
299                         // We test for invalid directory names, so it's OK if directory removal
300                         // fails.
301                         match fs::remove_dir_all(&self.data_dir) {
302                                 Err(e) => println!("Failed to remove test persister directory: {}", e),
303                                 _ => {}
304                         }
305                 }
306         }
307
308         #[test]
309         fn read_write_remove_list_persist() {
310                 let mut temp_path = std::env::temp_dir();
311                 temp_path.push("test_read_write_remove_list_persist");
312                 let fs_store = FilesystemStore::new(temp_path);
313                 do_read_write_remove_list_persist(&fs_store);
314         }
315
316         #[test]
317         fn test_if_monitors_is_not_dir() {
318                 let store = FilesystemStore::new("test_monitors_is_not_dir".into());
319
320                 fs::create_dir_all(&store.get_data_dir()).unwrap();
321                 let mut path = std::path::PathBuf::from(&store.get_data_dir());
322                 path.push("monitors");
323                 fs::File::create(path).unwrap();
324
325                 let chanmon_cfgs = create_chanmon_cfgs(1);
326                 let mut node_cfgs = create_node_cfgs(1, &chanmon_cfgs);
327                 let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &store, node_cfgs[0].keys_manager);
328                 node_cfgs[0].chain_monitor = chain_mon_0;
329                 let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]);
330                 let nodes = create_network(1, &node_cfgs, &node_chanmgrs);
331
332                 // Check that read_channel_monitors() returns error if monitors/ is not a
333                 // directory.
334                 assert!(read_channel_monitors(&store, nodes[0].keys_manager, nodes[0].keys_manager).is_err());
335         }
336
337         #[test]
338         fn test_filesystem_store() {
339                 // Create the nodes, giving them FilesystemStores for data stores.
340                 let store_0 = FilesystemStore::new("test_filesystem_store_0".into());
341                 let store_1 = FilesystemStore::new("test_filesystem_store_1".into());
342                 do_test_store(&store_0, &store_1)
343         }
344
345         // Test that if the store's path to channel data is read-only, writing a
346         // monitor to it results in the store returning a PermanentFailure.
347         // Windows ignores the read-only flag for folders, so this test is Unix-only.
348         #[cfg(not(target_os = "windows"))]
349         #[test]
350         fn test_readonly_dir_perm_failure() {
351                 let store = FilesystemStore::new("test_readonly_dir_perm_failure".into());
352                 fs::create_dir_all(&store.get_data_dir()).unwrap();
353
354                 // Set up a dummy channel and force close. This will produce a monitor
355                 // that we can then use to test persistence.
356                 let chanmon_cfgs = create_chanmon_cfgs(2);
357                 let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
358                 let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
359                 let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
360                 let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
361                 nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
362                 check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
363                 let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
364                 let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
365                 let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
366
367                 // Set the store's directory to read-only, which should result in
368                 // returning a permanent failure when we then attempt to persist a
369                 // channel update.
370                 let path = &store.get_data_dir();
371                 let mut perms = fs::metadata(path).unwrap().permissions();
372                 perms.set_readonly(true);
373                 fs::set_permissions(path, perms).unwrap();
374
375                 let test_txo = OutPoint {
376                         txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
377                         index: 0
378                 };
379                 match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
380                         ChannelMonitorUpdateStatus::PermanentFailure => {},
381                         _ => panic!("unexpected result from persisting new channel")
382                 }
383
384                 nodes[1].node.get_and_clear_pending_msg_events();
385                 added_monitors.clear();
386         }
387
388         // Test that if a store's directory name is invalid, monitor persistence
389         // will fail.
390         #[cfg(target_os = "windows")]
391         #[test]
392         fn test_fail_on_open() {
393                 // Set up a dummy channel and force close. This will produce a monitor
394                 // that we can then use to test persistence.
395                 let chanmon_cfgs = create_chanmon_cfgs(2);
396                 let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
397                 let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
398                 let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
399                 let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
400                 nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
401                 check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
402                 let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
403                 let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
404                 let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
405
406                 // Create the store with an invalid directory name and test that the
407                 // channel fails to open because the directories fail to be created. There
408                 // don't seem to be invalid filename characters on Unix that Rust doesn't
409                 // handle, hence why the test is Windows-only.
410                 let store = FilesystemStore::new(":<>/".into());
411
412                 let test_txo = OutPoint {
413                         txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
414                         index: 0
415                 };
416                 match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
417                         ChannelMonitorUpdateStatus::PermanentFailure => {},
418                         _ => panic!("unexpected result from persisting new channel")
419                 }
420
421                 nodes[1].node.get_and_clear_pending_msg_events();
422                 added_monitors.clear();
423         }
424 }
425
426 #[cfg(ldk_bench)]
427 /// Benches
428 pub mod bench {
429         use criterion::Criterion;
430
431         /// Bench!
432         pub fn bench_sends(bench: &mut Criterion) {
433                 let store_a = super::FilesystemStore::new("bench_filesystem_store_a".into());
434                 let store_b = super::FilesystemStore::new("bench_filesystem_store_b".into());
435                 lightning::ln::channelmanager::bench::bench_two_sends(
436                         bench, "bench_filesystem_persisted_sends", store_a, store_b);
437         }
438 }