Remove a handful of redundant imports
[rust-lightning] / lightning-persister / src / fs_store.rs
1 //! Objects related to [`FilesystemStore`] live here.
2 use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
3
4 use lightning::util::persist::KVStore;
5 use lightning::util::string::PrintableString;
6
7 use std::collections::HashMap;
8 use std::fs;
9 use std::io::{Read, Write};
10 use std::path::{Path, PathBuf};
11 use std::sync::atomic::{AtomicUsize, Ordering};
12 use std::sync::{Arc, Mutex, RwLock};
13
14 #[cfg(target_os = "windows")]
15 use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
16
17 #[cfg(target_os = "windows")]
/// Converts an integer status code into an [`std::io::Result`].
///
/// Any non-zero value of `$e` yields `Ok(())`; a zero value yields an `Err` built from
/// [`std::io::Error::last_os_error`]. In this file it wraps the Win32 file-system calls.
macro_rules! call {
    ($e: expr) => {
        match $e {
            0 => Err(std::io::Error::last_os_error()),
            _ => Ok(()),
        }
    };
}
27
#[cfg(target_os = "windows")]
/// Encodes `path` as a sequence of 16-bit wide characters followed by a single terminating `0`,
/// which is the form the Win32 calls in this file receive via `as_ptr()`.
fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
    let mut wide_chars: Vec<u16> = path.as_ref().encode_wide().collect();
    // Append the terminator expected at the end of the buffer.
    wide_chars.push(0);
    wide_chars
}
32
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap,
// i.e., drop every entry whose per-path lock is no longer referenced by an ongoing operation.
const GC_LOCK_INTERVAL: usize = 25;
35
/// A [`KVStore`] implementation that writes to and reads from the file system.
///
/// Every key is stored as a file located at `data_dir/primary_namespace/secondary_namespace/key`
/// (the secondary namespace component is left out when it is empty).
pub struct FilesystemStore {
    /// Root directory below which all namespace directories and key files are placed.
    data_dir: PathBuf,
    /// Incremented for every temporary (`.tmp`) or trash (`.trash`) file so that each one gets a
    /// distinct extension.
    tmp_file_counter: AtomicUsize,
    /// Counts store operations; decides when the `locks` map is garbage collected.
    gc_counter: AtomicUsize,
    /// Per-file read/write locks keyed by destination path, guarded by an outer mutex.
    locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
}
43
44 impl FilesystemStore {
45         /// Constructs a new [`FilesystemStore`].
46         pub fn new(data_dir: PathBuf) -> Self {
47                 let locks = Mutex::new(HashMap::new());
48                 let tmp_file_counter = AtomicUsize::new(0);
49                 let gc_counter = AtomicUsize::new(1);
50                 Self { data_dir, tmp_file_counter, gc_counter, locks }
51         }
52
53         /// Returns the data directory.
54         pub fn get_data_dir(&self) -> PathBuf {
55                 self.data_dir.clone()
56         }
57
58         fn garbage_collect_locks(&self) {
59                 let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
60
61                 if gc_counter % GC_LOCK_INTERVAL == 0 {
62                         // Take outer lock for the cleanup.
63                         let mut outer_lock = self.locks.lock().unwrap();
64
65                         // Garbage collect all lock entries that are not referenced anymore.
66                         outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
67                 }
68         }
69
70         fn get_dest_dir_path(&self, primary_namespace: &str, secondary_namespace: &str) -> std::io::Result<PathBuf> {
71                 let mut dest_dir_path = {
72                         #[cfg(target_os = "windows")]
73                         {
74                                 let data_dir = self.data_dir.clone();
75                                 fs::create_dir_all(data_dir.clone())?;
76                                 fs::canonicalize(data_dir)?
77                         }
78                         #[cfg(not(target_os = "windows"))]
79                         {
80                                 self.data_dir.clone()
81                         }
82                 };
83
84                 dest_dir_path.push(primary_namespace);
85                 if !secondary_namespace.is_empty() {
86                         dest_dir_path.push(secondary_namespace);
87                 }
88
89                 Ok(dest_dir_path)
90         }
91 }
92
93 impl KVStore for FilesystemStore {
94         fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
95                 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
96
97                 let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
98                 dest_file_path.push(key);
99
100                 let mut buf = Vec::new();
101                 {
102                         let inner_lock_ref = {
103                                 let mut outer_lock = self.locks.lock().unwrap();
104                                 Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
105                         };
106                         let _guard = inner_lock_ref.read().unwrap();
107
108                         let mut f = fs::File::open(dest_file_path)?;
109                         f.read_to_end(&mut buf)?;
110                 }
111
112                 self.garbage_collect_locks();
113
114                 Ok(buf)
115         }
116
117         fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
118                 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
119
120                 let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
121                 dest_file_path.push(key);
122
123                 let parent_directory = dest_file_path
124                         .parent()
125                         .ok_or_else(|| {
126                                 let msg =
127                                         format!("Could not retrieve parent directory of {}.", dest_file_path.display());
128                                 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
129                         })?;
130                 fs::create_dir_all(&parent_directory)?;
131
132                 // Do a crazy dance with lots of fsync()s to be overly cautious here...
133                 // We never want to end up in a state where we've lost the old data, or end up using the
134                 // old data on power loss after we've returned.
135                 // The way to atomically write a file on Unix platforms is:
136                 // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
137                 let mut tmp_file_path = dest_file_path.clone();
138                 let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
139                 tmp_file_path.set_extension(tmp_file_ext);
140
141                 {
142                         let mut tmp_file = fs::File::create(&tmp_file_path)?;
143                         tmp_file.write_all(&buf)?;
144                         tmp_file.sync_all()?;
145                 }
146
147                 let res = {
148                         let inner_lock_ref = {
149                                 let mut outer_lock = self.locks.lock().unwrap();
150                                 Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
151                         };
152                         let _guard = inner_lock_ref.write().unwrap();
153
154                         #[cfg(not(target_os = "windows"))]
155                         {
156                                 fs::rename(&tmp_file_path, &dest_file_path)?;
157                                 let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
158                                 dir_file.sync_all()?;
159                                 Ok(())
160                         }
161
162                         #[cfg(target_os = "windows")]
163                         {
164                                 let res = if dest_file_path.exists() {
165                                         call!(unsafe {
166                                                 windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
167                                                         path_to_windows_str(&dest_file_path).as_ptr(),
168                                                         path_to_windows_str(&tmp_file_path).as_ptr(),
169                                                         std::ptr::null(),
170                                                         windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
171                                                         std::ptr::null_mut() as *const core::ffi::c_void,
172                                                         std::ptr::null_mut() as *const core::ffi::c_void,
173                                                         )
174                                         })
175                                 } else {
176                                         call!(unsafe {
177                                                 windows_sys::Win32::Storage::FileSystem::MoveFileExW(
178                                                         path_to_windows_str(&tmp_file_path).as_ptr(),
179                                                         path_to_windows_str(&dest_file_path).as_ptr(),
180                                                         windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
181                                                         | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
182                                                         )
183                                         })
184                                 };
185
186                                 match res {
187                                         Ok(()) => {
188                                                 // We fsync the dest file in hopes this will also flush the metadata to disk.
189                                                 let dest_file = fs::OpenOptions::new().read(true).write(true)
190                                                         .open(&dest_file_path)?;
191                                                 dest_file.sync_all()?;
192                                                 Ok(())
193                                         }
194                                         Err(e) => Err(e),
195                                 }
196                         }
197                 };
198
199                 self.garbage_collect_locks();
200
201                 res
202         }
203
204         fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> {
205                 check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
206
207                 let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
208                 dest_file_path.push(key);
209
210                 if !dest_file_path.is_file() {
211                         return Ok(());
212                 }
213
214                 {
215                         let inner_lock_ref = {
216                                 let mut outer_lock = self.locks.lock().unwrap();
217                                 Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
218                         };
219                         let _guard = inner_lock_ref.write().unwrap();
220
221                         if lazy {
222                                 // If we're lazy we just call remove and be done with it.
223                                 fs::remove_file(&dest_file_path)?;
224                         } else {
225                                 // If we're not lazy we try our best to persist the updated metadata to ensure
226                                 // atomicity of this call.
227                                 #[cfg(not(target_os = "windows"))]
228                                 {
229                                         fs::remove_file(&dest_file_path)?;
230
231                                         let parent_directory = dest_file_path.parent().ok_or_else(|| {
232                                                 let msg =
233                                                         format!("Could not retrieve parent directory of {}.", dest_file_path.display());
234                                                 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
235                                         })?;
236                                         let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
237                                         // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
238                                         // to the inode might get cached (and hence possibly lost on crash), depending on
239                                         // the target platform and file system.
240                                         //
241                                         // In order to assert we permanently removed the file in question we therefore
242                                         // call `fsync` on the parent directory on platforms that support it.
243                                         dir_file.sync_all()?;
244                                 }
245
246                                 #[cfg(target_os = "windows")]
247                                 {
248                                         // Since Windows `DeleteFile` API is not persisted until the last open file handle
249                                         // is dropped, and there seemingly is no reliable way to flush the directory
250                                         // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
251                                         // file to be deleted to a temporary trash file and remove the latter file
252                                         // afterwards.
253                                         //
254                                         // This should be marginally better, as, according to the documentation,
255                                         // `MoveFileExW` APIs should offer stronger persistence guarantees,
256                                         // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
257                                         // However, all this is partially based on assumptions and local experiments, as
258                                         // Windows API is horribly underdocumented.
259                                         let mut trash_file_path = dest_file_path.clone();
260                                         let trash_file_ext = format!("{}.trash",
261                                                 self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
262                                         trash_file_path.set_extension(trash_file_ext);
263
264                                         call!(unsafe {
265                                                 windows_sys::Win32::Storage::FileSystem::MoveFileExW(
266                                                         path_to_windows_str(&dest_file_path).as_ptr(),
267                                                         path_to_windows_str(&trash_file_path).as_ptr(),
268                                                         windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
269                                                         | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
270                                                         )
271                                         })?;
272
273                                         {
274                                                 // We fsync the trash file in hopes this will also flush the original's file
275                                                 // metadata to disk.
276                                                 let trash_file = fs::OpenOptions::new().read(true).write(true)
277                                                         .open(&trash_file_path.clone())?;
278                                                 trash_file.sync_all()?;
279                                         }
280
281                                         // We're fine if this remove would fail as the trash file will be cleaned up in
282                                         // list eventually.
283                                         fs::remove_file(trash_file_path).ok();
284                                 }
285                         }
286                 }
287
288                 self.garbage_collect_locks();
289
290                 Ok(())
291         }
292
293         fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> std::io::Result<Vec<String>> {
294                 check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
295
296                 let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
297                 let mut keys = Vec::new();
298
299                 if !Path::new(&prefixed_dest).exists() {
300                         return Ok(Vec::new());
301                 }
302
303                 for entry in fs::read_dir(&prefixed_dest)? {
304                         let entry = entry?;
305                         let p = entry.path();
306
307                         if let Some(ext) = p.extension() {
308                                 #[cfg(target_os = "windows")]
309                                 {
310                                         // Clean up any trash files lying around.
311                                         if ext == "trash" {
312                                                 fs::remove_file(p).ok();
313                                                 continue;
314                                         }
315                                 }
316                                 if ext == "tmp" {
317                                         continue;
318                                 }
319                         }
320
321                         let metadata = p.metadata()?;
322
323                         // We allow the presence of directories in the empty primary namespace and just skip them.
324                         if metadata.is_dir() {
325                                 continue;
326                         }
327
328                         // If we otherwise don't find a file at the given path something went wrong.
329                         if !metadata.is_file() {
330                                 debug_assert!(false, "Failed to list keys of {}/{}: file couldn't be accessed.",
331                                         PrintableString(primary_namespace), PrintableString(secondary_namespace));
332                                 let msg = format!("Failed to list keys of {}/{}: file couldn't be accessed.",
333                                         PrintableString(primary_namespace), PrintableString(secondary_namespace));
334                                 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
335                         }
336
337                         match p.strip_prefix(&prefixed_dest) {
338                                 Ok(stripped_path) => {
339                                         if let Some(relative_path) = stripped_path.to_str() {
340                                                 if is_valid_kvstore_str(relative_path) {
341                                                         keys.push(relative_path.to_string())
342                                                 }
343                                         } else {
344                                                 debug_assert!(false, "Failed to list keys of {}/{}: file path is not valid UTF-8",
345                                                         PrintableString(primary_namespace), PrintableString(secondary_namespace));
346                                                 let msg = format!("Failed to list keys of {}/{}: file path is not valid UTF-8",
347                                                         PrintableString(primary_namespace), PrintableString(secondary_namespace));
348                                                 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
349                                         }
350                                 }
351                                 Err(e) => {
352                                         debug_assert!(false, "Failed to list keys of {}/{}: {}",
353                                                 PrintableString(primary_namespace), PrintableString(secondary_namespace), e);
354                                         let msg = format!("Failed to list keys of {}/{}: {}",
355                                                 PrintableString(primary_namespace), PrintableString(secondary_namespace), e);
356                                         return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
357                                 }
358                         }
359                 }
360
361                 self.garbage_collect_locks();
362
363                 Ok(keys)
364         }
365 }
366
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{do_read_write_remove_list_persist, do_test_store};

    use bitcoin::Txid;

    use lightning::chain::ChannelMonitorUpdateStatus;
    use lightning::chain::chainmonitor::Persist;
    use lightning::chain::transaction::OutPoint;
    use lightning::check_closed_event;
    use lightning::events::{ClosureReason, MessageSendEventsProvider};
    use lightning::ln::functional_test_utils::*;
    use lightning::util::test_utils;
    use lightning::util::persist::read_channel_monitors;
    use std::str::FromStr;

    // Test-only cleanup: every store removes its data directory when it goes out of scope.
    impl Drop for FilesystemStore {
        fn drop(&mut self) {
            // Some tests use invalid directory names on purpose, so a failing removal is only
            // reported, never treated as an error.
            if let Err(e) = fs::remove_dir_all(&self.data_dir) {
                println!("Failed to remove test persister directory: {}", e);
            }
        }
    }

    #[test]
    fn read_write_remove_list_persist() {
        let temp_path = std::env::temp_dir().join("test_read_write_remove_list_persist");
        let fs_store = FilesystemStore::new(temp_path);
        do_read_write_remove_list_persist(&fs_store);
    }

    #[test]
    fn test_if_monitors_is_not_dir() {
        let store = FilesystemStore::new("test_monitors_is_not_dir".into());

        // Put a plain file where the `monitors` directory would normally live.
        let data_dir = store.get_data_dir();
        fs::create_dir_all(&data_dir).unwrap();
        fs::File::create(data_dir.join("monitors")).unwrap();

        let chanmon_cfgs = create_chanmon_cfgs(1);
        let mut node_cfgs = create_node_cfgs(1, &chanmon_cfgs);
        let chain_monitor = test_utils::TestChainMonitor::new(
            Some(&chanmon_cfgs[0].chain_source),
            &chanmon_cfgs[0].tx_broadcaster,
            &chanmon_cfgs[0].logger,
            &chanmon_cfgs[0].fee_estimator,
            &store,
            node_cfgs[0].keys_manager,
        );
        node_cfgs[0].chain_monitor = chain_monitor;
        let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]);
        let nodes = create_network(1, &node_cfgs, &node_chanmgrs);

        // `read_channel_monitors()` must fail when `monitors/` is not a directory.
        let read_result =
            read_channel_monitors(&store, nodes[0].keys_manager, nodes[0].keys_manager);
        assert!(read_result.is_err());
    }

    #[test]
    fn test_filesystem_store() {
        // Both nodes get a `FilesystemStore` as their data store.
        let store_0 = FilesystemStore::new("test_filesystem_store_0".into());
        let store_1 = FilesystemStore::new("test_filesystem_store_1".into());
        do_test_store(&store_0, &store_1)
    }

    // Checks that writing a monitor fails with an `UnrecoverableError` when the store's path to
    // channel data is read-only.
    // Windows ignores the read-only flag for folders, so this test is Unix-only.
    #[cfg(not(target_os = "windows"))]
    #[test]
    fn test_readonly_dir_perm_failure() {
        let store = FilesystemStore::new("test_readonly_dir_perm_failure".into());
        let data_dir = store.get_data_dir();
        fs::create_dir_all(&data_dir).unwrap();

        // Open a dummy channel and force-close it, which yields a monitor we can try to
        // persist below.
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
        let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
        nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
        check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
        let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
        let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
        let update_id = update_map.get(&added_monitors[0].1.channel_id()).unwrap();

        // Make the store's directory read-only, so that the attempt to persist a channel update
        // afterwards has to end in an unrecoverable failure.
        let mut perms = fs::metadata(&data_dir).unwrap().permissions();
        perms.set_readonly(true);
        fs::set_permissions(&data_dir, perms).unwrap();

        let test_txo = OutPoint {
            txid: Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
            index: 0,
        };
        let status = store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2);
        if !matches!(status, ChannelMonitorUpdateStatus::UnrecoverableError) {
            panic!("unexpected result from persisting new channel");
        }

        nodes[1].node.get_and_clear_pending_msg_events();
        added_monitors.clear();
    }

    // Checks that monitor persistence fails when the store's directory name is invalid.
    #[cfg(target_os = "windows")]
    #[test]
    fn test_fail_on_open() {
        // Open a dummy channel and force-close it, which yields a monitor we can try to
        // persist below.
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
        let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
        nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
        check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
        let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
        let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
        let update_id = update_map.get(&added_monitors[0].1.channel_id()).unwrap();

        // Use an invalid directory name for the store, so that creating the directories (and
        // thereby opening the channel) fails. There don't seem to be invalid filename characters
        // on Unix that Rust doesn't handle, which is why this test is Windows-only.
        let store = FilesystemStore::new(":<>/".into());

        let test_txo = OutPoint {
            txid: Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
            index: 0,
        };
        let status = store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2);
        if !matches!(status, ChannelMonitorUpdateStatus::UnrecoverableError) {
            panic!("unexpected result from persisting new channel");
        }

        nodes[1].node.get_and_clear_pending_msg_events();
        added_monitors.clear();
    }
}
512
#[cfg(ldk_bench)]
/// Benchmarks that run with [`FilesystemStore`](super::FilesystemStore) as the persister.
pub mod bench {
    use super::FilesystemStore;
    use criterion::Criterion;

    /// Runs the `bench_two_sends` benchmark under the name `bench_filesystem_persisted_sends`,
    /// handing it two freshly created [`FilesystemStore`]s.
    pub fn bench_sends(bench: &mut Criterion) {
        lightning::ln::channelmanager::bench::bench_two_sends(
            bench,
            "bench_filesystem_persisted_sends",
            FilesystemStore::new("bench_filesystem_store_a".into()),
            FilesystemStore::new("bench_filesystem_store_b".into()),
        );
    }
}