1 //! Objects related to [`FilesystemStore`] live here.
2 use lightning::util::persist::KVStore;
3 use lightning::util::string::PrintableString;
5 use std::collections::HashMap;
7 use std::io::{BufReader, Read, Write};
8 use std::path::{Path, PathBuf};
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::{Arc, Mutex, RwLock};
12 #[cfg(target_os = "windows")]
13 use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
15 #[cfg(target_os = "windows")]
21 return Err(std::io::Error::last_os_error());
26 #[cfg(target_os = "windows")]
27 fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
28 path.as_ref().encode_wide().chain(Some(0)).collect()
31 /// A [`KVStore`] implementation that writes to and reads from the file system.
32 pub struct FilesystemStore {
34 tmp_file_counter: AtomicUsize,
35 locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
38 impl FilesystemStore {
39 /// Constructs a new [`FilesystemStore`].
40 pub fn new(data_dir: PathBuf) -> Self {
41 let locks = Mutex::new(HashMap::new());
42 let tmp_file_counter = AtomicUsize::new(0);
43 Self { data_dir, tmp_file_counter, locks }
46 /// Returns the data directory.
47 pub fn get_data_dir(&self) -> PathBuf {
52 impl KVStore for FilesystemStore {
53 fn read(&self, namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
55 let msg = format!("Failed to read {}/{}: key may not be empty.",
56 PrintableString(namespace), PrintableString(key));
57 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
60 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
61 key.chars().any(|c| !c.is_ascii() || c.is_control()) {
62 debug_assert!(false, "Failed to read {}/{}: namespace and key must be valid ASCII
63 strings.", PrintableString(namespace), PrintableString(key));
64 let msg = format!("Failed to read {}/{}: namespace and key must be valid ASCII strings.",
65 PrintableString(namespace), PrintableString(key));
66 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
69 let mut dest_file_path = self.data_dir.clone();
70 dest_file_path.push(namespace);
71 dest_file_path.push(key);
73 let inner_lock_ref = {
74 let mut outer_lock = self.locks.lock().unwrap();
75 Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
77 let _guard = inner_lock_ref.read().unwrap();
79 let mut buf = Vec::new();
80 let f = fs::File::open(dest_file_path.clone())?;
81 let mut reader = BufReader::new(f);
82 let nread = reader.read_to_end(&mut buf)?;
83 debug_assert_ne!(nread, 0);
88 fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
90 let msg = format!("Failed to write {}/{}: key may not be empty.",
91 PrintableString(namespace), PrintableString(key));
92 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
95 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
96 key.chars().any(|c| !c.is_ascii() || c.is_control()) {
97 debug_assert!(false, "Failed to write {}/{}: namespace and key must be valid ASCII
98 strings.", PrintableString(namespace), PrintableString(key));
99 let msg = format!("Failed to write {}/{}: namespace and key must be valid ASCII strings.",
100 PrintableString(namespace), PrintableString(key));
101 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
104 let mut dest_file_path = self.data_dir.clone();
105 dest_file_path.push(namespace);
106 dest_file_path.push(key);
108 let parent_directory = dest_file_path
112 format!("Could not retrieve parent directory of {}.", dest_file_path.display());
113 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
116 fs::create_dir_all(&parent_directory)?;
118 // Do a crazy dance with lots of fsync()s to be overly cautious here...
119 // We never want to end up in a state where we've lost the old data, or end up using the
120 // old data on power loss after we've returned.
121 // The way to atomically write a file on Unix platforms is:
122 // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
123 let mut tmp_file_path = dest_file_path.clone();
124 let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
125 tmp_file_path.set_extension(tmp_file_ext);
128 let mut tmp_file = fs::File::create(&tmp_file_path)?;
129 tmp_file.write_all(&buf)?;
130 tmp_file.sync_all()?;
133 let inner_lock_ref = {
134 let mut outer_lock = self.locks.lock().unwrap();
135 Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
137 let _guard = inner_lock_ref.write().unwrap();
139 #[cfg(not(target_os = "windows"))]
141 fs::rename(&tmp_file_path, &dest_file_path)?;
142 let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
143 dir_file.sync_all()?;
147 #[cfg(target_os = "windows")]
149 if dest_file_path.exists() {
151 windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
152 path_to_windows_str(dest_file_path).as_ptr(),
153 path_to_windows_str(tmp_file_path).as_ptr(),
155 windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
156 std::ptr::null_mut() as *const core::ffi::c_void,
157 std::ptr::null_mut() as *const core::ffi::c_void,
162 windows_sys::Win32::Storage::FileSystem::MoveFileExW(
163 path_to_windows_str(tmp_file_path).as_ptr(),
164 path_to_windows_str(dest_file_path).as_ptr(),
165 windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
166 | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
173 fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
175 let msg = format!("Failed to remove {}/{}: key may not be empty.",
176 PrintableString(namespace), PrintableString(key));
177 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
180 if namespace.chars().any(|c| !c.is_ascii() || c.is_control()) ||
181 key.chars().any(|c| !c.is_ascii() || c.is_control()) {
182 debug_assert!(false, "Failed to remove {}/{}: namespace and key must be valid ASCII
183 strings.", PrintableString(namespace), PrintableString(key));
184 let msg = format!("Failed to remove {}/{}: namespace and key must be valid ASCII strings.",
185 PrintableString(namespace), PrintableString(key));
186 return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
189 let mut dest_file_path = self.data_dir.clone();
190 dest_file_path.push(namespace);
191 dest_file_path.push(key);
193 if !dest_file_path.is_file() {
198 let inner_lock_ref = {
199 let mut outer_lock = self.locks.lock().unwrap();
200 Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
202 let _guard = inner_lock_ref.write().unwrap();
204 fs::remove_file(&dest_file_path)?;
205 #[cfg(not(target_os = "windows"))]
207 let parent_directory = dest_file_path.parent().ok_or_else(|| {
209 format!("Could not retrieve parent directory of {}.", dest_file_path.display());
210 std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
212 let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
213 // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
214 // to the inode might get cached (and hence possibly lost on crash), depending on
215 // the target platform and file system.
217 // In order to assert we permanently removed the file in question we therefore
218 // call `fsync` on the parent directory on platforms that support it,
219 dir_file.sync_all()?;
222 if dest_file_path.is_file() {
223 return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
228 // Retake outer lock for the cleanup.
229 let mut outer_lock = self.locks.lock().unwrap();
231 // Garbage collect all lock entries that are not referenced anymore.
232 outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
238 fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
239 let mut prefixed_dest = self.data_dir.clone();
240 prefixed_dest.push(namespace);
242 let mut keys = Vec::new();
244 if !Path::new(&prefixed_dest).exists() {
245 return Ok(Vec::new());
248 for entry in fs::read_dir(&prefixed_dest)? {
250 let p = entry.path();
256 if let Some(ext) = p.extension() {
262 if let Some(relative_path) = p.strip_prefix(&prefixed_dest).ok()
263 .and_then(|p| p.to_str()) {
264 if relative_path.chars().all(|c| c.is_ascii() && !c.is_control()) {
265 keys.push(relative_path.to_string())
277 use crate::test_utils::{do_read_write_remove_list_persist, do_test_store};
279 use bitcoin::hashes::hex::FromHex;
282 use lightning::chain::ChannelMonitorUpdateStatus;
283 use lightning::chain::chainmonitor::Persist;
284 use lightning::chain::transaction::OutPoint;
285 use lightning::check_closed_event;
286 use lightning::events::{ClosureReason, MessageSendEventsProvider};
287 use lightning::ln::functional_test_utils::*;
288 use lightning::util::test_utils;
289 use lightning::util::persist::read_channel_monitors;
291 #[cfg(target_os = "windows")]
293 lightning::get_event_msg,
294 lightning::ln::msgs::ChannelMessageHandler,
297 impl Drop for FilesystemStore{
299 // We test for invalid directory names, so it's OK if directory removal
301 match fs::remove_dir_all(&self.data_dir) {
302 Err(e) => println!("Failed to remove test persister directory: {}", e),
309 fn read_write_remove_list_persist() {
310 let mut temp_path = std::env::temp_dir();
311 temp_path.push("test_read_write_remove_list_persist");
312 let fs_store = FilesystemStore::new(temp_path);
313 do_read_write_remove_list_persist(&fs_store);
317 fn test_if_monitors_is_not_dir() {
318 let store = FilesystemStore::new("test_monitors_is_not_dir".into());
320 fs::create_dir_all(&store.get_data_dir()).unwrap();
321 let mut path = std::path::PathBuf::from(&store.get_data_dir());
322 path.push("monitors");
323 fs::File::create(path).unwrap();
325 let chanmon_cfgs = create_chanmon_cfgs(1);
326 let mut node_cfgs = create_node_cfgs(1, &chanmon_cfgs);
327 let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &store, node_cfgs[0].keys_manager);
328 node_cfgs[0].chain_monitor = chain_mon_0;
329 let node_chanmgrs = create_node_chanmgrs(1, &node_cfgs, &[None]);
330 let nodes = create_network(1, &node_cfgs, &node_chanmgrs);
332 // Check that read_channel_monitors() returns error if monitors/ is not a
334 assert!(read_channel_monitors(&store, nodes[0].keys_manager, nodes[0].keys_manager).is_err());
338 fn test_filesystem_store() {
339 // Create the nodes, giving them FilesystemStores for data stores.
340 let store_0 = FilesystemStore::new("test_filesystem_store_0".into());
341 let store_1 = FilesystemStore::new("test_filesystem_store_1".into());
342 do_test_store(&store_0, &store_1)
345 // Test that if the store's path to channel data is read-only, writing a
346 // monitor to it results in the store returning a PermanentFailure.
347 // Windows ignores the read-only flag for folders, so this test is Unix-only.
348 #[cfg(not(target_os = "windows"))]
350 fn test_readonly_dir_perm_failure() {
351 let store = FilesystemStore::new("test_readonly_dir_perm_failure".into());
352 fs::create_dir_all(&store.get_data_dir()).unwrap();
354 // Set up a dummy channel and force close. This will produce a monitor
355 // that we can then use to test persistence.
356 let chanmon_cfgs = create_chanmon_cfgs(2);
357 let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
358 let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
359 let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
360 let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
361 nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
362 check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
363 let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
364 let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
365 let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
367 // Set the store's directory to read-only, which should result in
368 // returning a permanent failure when we then attempt to persist a
370 let path = &store.get_data_dir();
371 let mut perms = fs::metadata(path).unwrap().permissions();
372 perms.set_readonly(true);
373 fs::set_permissions(path, perms).unwrap();
375 let test_txo = OutPoint {
376 txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
379 match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
380 ChannelMonitorUpdateStatus::PermanentFailure => {},
381 _ => panic!("unexpected result from persisting new channel")
384 nodes[1].node.get_and_clear_pending_msg_events();
385 added_monitors.clear();
388 // Test that if a store's directory name is invalid, monitor persistence
390 #[cfg(target_os = "windows")]
392 fn test_fail_on_open() {
393 // Set up a dummy channel and force close. This will produce a monitor
394 // that we can then use to test persistence.
395 let chanmon_cfgs = create_chanmon_cfgs(2);
396 let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
397 let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
398 let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
399 let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
400 nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
401 check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
402 let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
403 let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
404 let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
406 // Create the store with an invalid directory name and test that the
407 // channel fails to open because the directories fail to be created. There
408 // don't seem to be invalid filename characters on Unix that Rust doesn't
409 // handle, hence why the test is Windows-only.
410 let store = FilesystemStore::new(":<>/".into());
412 let test_txo = OutPoint {
413 txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
416 match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
417 ChannelMonitorUpdateStatus::PermanentFailure => {},
418 _ => panic!("unexpected result from persisting new channel")
421 nodes[1].node.get_and_clear_pending_msg_events();
422 added_monitors.clear();
429 use criterion::Criterion;
432 pub fn bench_sends(bench: &mut Criterion) {
433 let store_a = super::FilesystemStore::new("bench_filesystem_store_a".into());
434 let store_b = super::FilesystemStore::new("bench_filesystem_store_b".into());
435 lightning::ln::channelmanager::bench::bench_two_sends(
436 bench, "bench_filesystem_persisted_sends", store_a, store_b);