From: Matt Corallo <649246+TheBlueMatt@users.noreply.github.com> Date: Thu, 21 Apr 2022 18:09:20 +0000 (+0000) Subject: Merge pull request #1378 from ViktorTigerstrom/2022-03-include-htlc-min-max X-Git-Tag: v0.0.107~57 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=637fb88037fa329363e4b6462e9ad01f25cb4e41;hp=3c6768706662f954b4d6140362743fdecc5bcd01;p=rust-lightning Merge pull request #1378 from ViktorTigerstrom/2022-03-include-htlc-min-max Include htlc min/max for ChannelDetails and ChannelCounterparty --- diff --git a/lightning-background-processor/Cargo.toml b/lightning-background-processor/Cargo.toml index bd6d54d87..16ec763fb 100644 --- a/lightning-background-processor/Cargo.toml +++ b/lightning-background-processor/Cargo.toml @@ -16,8 +16,8 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] bitcoin = "0.27" lightning = { version = "0.0.106", path = "../lightning", features = ["std"] } -lightning-persister = { version = "0.0.106", path = "../lightning-persister" } [dev-dependencies] lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] } lightning-invoice = { version = "0.14.0", path = "../lightning-invoice" } +lightning-persister = { version = "0.0.106", path = "../lightning-persister" } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 73f420c98..6beee915b 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -20,6 +20,7 @@ use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescr use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::logger::Logger; +use lightning::util::persist::Persister; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; @@ -80,22 +81,6 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60; #[cfg(test)] const FIRST_NETWORK_PRUNE_TIMER: u64 = 1; -/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk. -pub trait Persister -where - M::Target: 'static + chain::Watch, - T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, - F::Target: 'static + FeeEstimator, - L::Target: 'static + Logger, -{ - /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed - /// (which will cause the [`BackgroundProcessor`] which called this method to exit). - fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), std::io::Error>; - - /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>; -} /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s. struct DecoratingEventHandler< @@ -138,12 +123,12 @@ impl BackgroundProcessor { /// /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a - /// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's + /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's /// provided implementation. /// /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See - /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`] - /// for Rust-Lightning's provided implementation. + /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate + /// for LDK's provided implementation. /// /// Typically, users should either implement [`Persister::persist_manager`] to never return an /// error or call [`join`] and handle any error that may arise. For the latter case, @@ -161,8 +146,8 @@ impl BackgroundProcessor { /// [`stop`]: Self::stop /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable - /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager - /// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph + /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager + /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable pub fn start< @@ -180,7 +165,7 @@ impl BackgroundProcessor { CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, - PS: 'static + Send + Persister, + PS: 'static + Deref + Send, M: 'static + Deref> + Send + Sync, CM: 'static + Deref> + Send + Sync, NG: 'static + Deref> + Send + Sync, @@ -202,6 +187,7 @@ impl BackgroundProcessor { CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, + PS::Target: 'static + Persister { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -365,10 +351,11 @@ mod tests { use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use lightning::util::test_utils; + use lightning::util::persist::KVStorePersister; use lightning_invoice::payment::{InvoicePayer, RetryAttempts}; use lightning_invoice::utils::DefaultRouter; use lightning_persister::FilesystemPersister; - use std::fs; + use std::fs::{self, File}; use std::ops::Deref; use std::path::PathBuf; use std::sync::{Arc, Mutex}; @@ -414,12 +401,14 @@ mod tests { struct Persister { data_dir: String, graph_error: Option<(std::io::ErrorKind, &'static str)>, - manager_error: Option<(std::io::ErrorKind, &'static str)> + manager_error: Option<(std::io::ErrorKind, &'static str)>, + filesystem_persister: FilesystemPersister, } impl Persister { fn new(data_dir: String) -> Self { - Self { data_dir, graph_error: None, manager_error: None } + let filesystem_persister = FilesystemPersister::new(data_dir.clone()); + Self { data_dir, graph_error: None, manager_error: None, filesystem_persister } } fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { @@ -431,25 +420,21 @@ mod tests { } } - impl super::Persister for Persister where - M::Target: 'static + chain::Watch, - T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, - F::Target: 'static + FeeEstimator, - L::Target: 'static + Logger, - { - fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), std::io::Error> { - match self.manager_error { - None => FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager), - Some((error, message)) => Err(std::io::Error::new(error, message)), + impl KVStorePersister for Persister { + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + if key == "manager" { + if let Some((error, message)) = self.manager_error { + return Err(std::io::Error::new(error, message)) + } } - } - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> { - match self.graph_error { - None => FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph), - Some((error, message)) => Err(std::io::Error::new(error, message)), + if key == "network_graph" { + if let Some((error, message)) = self.graph_error { + return Err(std::io::Error::new(error, message)) + } } + + self.filesystem_persister.persist(key, object) } } @@ -576,7 +561,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); - let persister = Persister::new(data_dir); + let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: &_| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); @@ -637,7 +622,7 @@ mod tests { // `FRESHNESS_TIMER`. let nodes = create_nodes(1, "test_timer_tick_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); - let persister = Persister::new(data_dir); + let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: &_| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { @@ -660,7 +645,7 @@ mod tests { open_channel!(nodes[0], nodes[1], 100000); let data_dir = nodes[0].persister.get_data_dir(); - let persister = Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"); + let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: &_| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); match bg_processor.join() { @@ -677,7 +662,7 @@ mod tests { // Test that if we encounter an error during network graph persistence, an error gets returned. let nodes = create_nodes(2, "test_persist_network_graph_error".to_string()); let data_dir = nodes[0].persister.get_data_dir(); - let persister = Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"); + let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: &_| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); @@ -695,7 +680,7 @@ mod tests { let mut nodes = create_nodes(2, "test_background_event_handling".to_string()); let channel_value = 100000; let data_dir = nodes[0].persister.get_data_dir(); - let persister = Persister::new(data_dir.clone()); + let persister = Arc::new(Persister::new(data_dir.clone())); // Set up a background event handler for FundingGenerationReady events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); @@ -726,7 +711,8 @@ mod tests { // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); let event_handler = move |event: &Event| sender.send(event.clone()).unwrap(); - let bg_processor = BackgroundProcessor::start(Persister::new(data_dir), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let persister = Arc::new(Persister::new(data_dir)); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); // Force close the channel and check that the SpendableOutputs event was handled. nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); @@ -752,7 +738,7 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); - let persister = Persister::new(data_dir); + let persister = Arc::new(Persister::new(data_dir)); let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes); let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2))); diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index 450062127..c23baf8ad 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -15,20 +15,13 @@ extern crate bitcoin; extern crate libc; use bitcoin::hash_types::{BlockHash, Txid}; -use bitcoin::hashes::hex::{FromHex, ToHex}; -use lightning::routing::network_graph::NetworkGraph; -use crate::util::DiskWriteable; -use lightning::chain; -use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}; -use lightning::chain::chainmonitor; +use bitcoin::hashes::hex::FromHex; +use lightning::chain::channelmonitor::ChannelMonitor; use lightning::chain::keysinterface::{Sign, KeysInterface}; -use lightning::chain::transaction::OutPoint; -use lightning::ln::channelmanager::ChannelManager; -use lightning::util::logger::Logger; use lightning::util::ser::{ReadableArgs, Writeable}; +use lightning::util::persist::KVStorePersister; use std::fs; -use std::io::{Cursor, Error, Write}; +use std::io::Cursor; use std::ops::Deref; use std::path::{Path, PathBuf}; @@ -48,31 +41,6 @@ pub struct FilesystemPersister { path_to_channel_data: String, } -impl DiskWriteable for ChannelMonitor { - fn write_to_file(&self, writer: &mut W) -> Result<(), Error> { - self.write(writer) - } -} - -impl DiskWriteable for ChannelManager -where - M::Target: chain::Watch, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, -{ - fn write_to_file(&self, writer: &mut W) -> Result<(), std::io::Error> { - self.write(writer) - } -} - -impl DiskWriteable for NetworkGraph { - fn write_to_file(&self, writer: &mut W) -> Result<(), std::io::Error> { - self.write(writer) - } -} - impl FilesystemPersister { /// Initialize a new FilesystemPersister and set the path to the individual channels' /// files. @@ -87,43 +55,14 @@ impl FilesystemPersister { self.path_to_channel_data.clone() } - pub(crate) fn path_to_monitor_data(&self) -> PathBuf { - let mut path = PathBuf::from(self.path_to_channel_data.clone()); - path.push("monitors"); - path - } - - /// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister` - /// initialization, within a file called "manager". - pub fn persist_manager( - data_dir: String, - manager: &ChannelManager - ) -> Result<(), std::io::Error> - where - M::Target: chain::Watch, - T::Target: BroadcasterInterface, - K::Target: KeysInterface, - F::Target: FeeEstimator, - L::Target: Logger, - { - let path = PathBuf::from(data_dir); - util::write_to_file(path, "manager".to_string(), manager) - } - - /// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister` - /// initialization, within a file called "network_graph" - pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> { - let path = PathBuf::from(data_dir); - util::write_to_file(path, "network_graph".to_string(), network_graph) - } - /// Read `ChannelMonitor`s from disk. pub fn read_channelmonitors ( &self, keys_manager: K ) -> Result)>, std::io::Error> where K::Target: KeysInterface + Sized, { - let path = self.path_to_monitor_data(); + let mut path = PathBuf::from(&self.path_to_channel_data); + path.push("monitors"); if !Path::new(&path).exists() { return Ok(Vec::new()); } @@ -180,22 +119,11 @@ impl FilesystemPersister { } } -impl chainmonitor::Persist for FilesystemPersister { - // TODO: We really need a way for the persister to inform the user that its time to crash/shut - // down once these start returning failure. - // A PermanentFailure implies we need to shut down since we're force-closing channels without - // even broadcasting! - - fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { - let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); - util::write_to_file(self.path_to_monitor_data(), filename, monitor) - .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) - } - - fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option, monitor: &ChannelMonitor, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { - let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index); - util::write_to_file(self.path_to_monitor_data(), filename, monitor) - .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) +impl KVStorePersister for FilesystemPersister { + fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { + let mut dest_file = PathBuf::from(self.path_to_channel_data.clone()); + dest_file.push(key); + util::write_to_file(dest_file, object) } } diff --git a/lightning-persister/src/util.rs b/lightning-persister/src/util.rs index f26296794..25bd00f5e 100644 --- a/lightning-persister/src/util.rs +++ b/lightning-persister/src/util.rs @@ -2,27 +2,20 @@ extern crate winapi; use std::fs; -use std::path::{Path, PathBuf}; -use std::io::{BufWriter, Write}; +use std::path::PathBuf; +use std::io::BufWriter; #[cfg(not(target_os = "windows"))] use std::os::unix::io::AsRawFd; +use lightning::util::ser::Writeable; + #[cfg(target_os = "windows")] use { std::ffi::OsStr, std::os::windows::ffi::OsStrExt }; -pub(crate) trait DiskWriteable { - fn write_to_file(&self, writer: &mut W) -> Result<(), std::io::Error>; -} - -pub(crate) fn get_full_filepath(mut filepath: PathBuf, filename: String) -> String { - filepath.push(filename); - filepath.to_str().unwrap().to_string() -} - #[cfg(target_os = "windows")] macro_rules! call { ($e: expr) => ( @@ -40,45 +33,43 @@ fn path_to_windows_str>(path: T) -> Vec(path: PathBuf, filename: String, data: &D) -> std::io::Result<()> { - fs::create_dir_all(path.clone())?; +pub(crate) fn write_to_file(dest_file: PathBuf, data: &W) -> std::io::Result<()> { + let mut tmp_file = dest_file.clone(); + tmp_file.set_extension("tmp"); + + let parent_directory = dest_file.parent().unwrap(); + fs::create_dir_all(parent_directory)?; // Do a crazy dance with lots of fsync()s to be overly cautious here... // We never want to end up in a state where we've lost the old data, or end up using the // old data on power loss after we've returned. // The way to atomically write a file on Unix platforms is: // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir) - let filename_with_path = get_full_filepath(path, filename); - let tmp_filename = format!("{}.tmp", filename_with_path.clone()); - { // Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use // rust stdlib 1.36 or higher. - let mut buf = BufWriter::new(fs::File::create(&tmp_filename)?); - data.write_to_file(&mut buf)?; + let mut buf = BufWriter::new(fs::File::create(&tmp_file)?); + data.write(&mut buf)?; buf.into_inner()?.sync_all()?; } // Fsync the parent directory on Unix. #[cfg(not(target_os = "windows"))] { - fs::rename(&tmp_filename, &filename_with_path)?; - let path = Path::new(&filename_with_path).parent().unwrap(); - let dir_file = fs::OpenOptions::new().read(true).open(path)?; + fs::rename(&tmp_file, &dest_file)?; + let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; unsafe { libc::fsync(dir_file.as_raw_fd()); } } #[cfg(target_os = "windows")] { - let src = PathBuf::from(tmp_filename.clone()); - let dst = PathBuf::from(filename_with_path.clone()); - if Path::new(&filename_with_path.clone()).exists() { + if dest_file.exists() { unsafe {winapi::um::winbase::ReplaceFileW( - path_to_windows_str(dst).as_ptr(), path_to_windows_str(src).as_ptr(), std::ptr::null(), + path_to_windows_str(dest_file).as_ptr(), path_to_windows_str(tmp_file).as_ptr(), std::ptr::null(), winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS, std::ptr::null_mut() as *mut winapi::ctypes::c_void, std::ptr::null_mut() as *mut winapi::ctypes::c_void )}; } else { call!(unsafe {winapi::um::winbase::MoveFileExW( - path_to_windows_str(src).as_ptr(), path_to_windows_str(dst).as_ptr(), + path_to_windows_str(tmp_file).as_ptr(), path_to_windows_str(dest_file).as_ptr(), winapi::um::winbase::MOVEFILE_WRITE_THROUGH | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING )}); } @@ -88,15 +79,17 @@ pub(crate) fn write_to_file(path: PathBuf, filename: String, d #[cfg(test)] mod tests { - use super::{DiskWriteable, get_full_filepath, write_to_file}; + use lightning::util::ser::{Writer, Writeable}; + + use super::{write_to_file}; use std::fs; use std::io; use std::io::Write; use std::path::PathBuf; struct TestWriteable{} - impl DiskWriteable for TestWriteable { - fn write_to_file(&self, writer: &mut W) -> Result<(), io::Error> { + impl Writeable for TestWriteable { + fn write(&self, writer: &mut W) -> Result<(), std::io::Error> { writer.write_all(&[42; 1]) } } @@ -114,7 +107,9 @@ mod tests { let mut perms = fs::metadata(path.to_string()).unwrap().permissions(); perms.set_readonly(true); fs::set_permissions(path.to_string(), perms).unwrap(); - match write_to_file(PathBuf::from(path.to_string()), filename, &test_writeable) { + let mut dest_file = PathBuf::from(path); + dest_file.push(filename); + match write_to_file(dest_file, &test_writeable) { Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied), _ => panic!("Unexpected error message") } @@ -132,10 +127,12 @@ mod tests { fn test_rename_failure() { let test_writeable = TestWriteable{}; let filename = "test_rename_failure_filename"; - let path = PathBuf::from("test_rename_failure_dir"); + let path = "test_rename_failure_dir"; + let mut dest_file = PathBuf::from(path); + dest_file.push(filename); // Create the channel data file and make it a directory. - fs::create_dir_all(get_full_filepath(path.clone(), filename.to_string())).unwrap(); - match write_to_file(path.clone(), filename.to_string(), &test_writeable) { + fs::create_dir_all(dest_file.clone()).unwrap(); + match write_to_file(dest_file, &test_writeable) { Err(e) => assert_eq!(e.raw_os_error(), Some(libc::EISDIR)), _ => panic!("Unexpected Ok(())") } @@ -145,16 +142,18 @@ mod tests { #[test] fn test_diskwriteable_failure() { struct FailingWriteable {} - impl DiskWriteable for FailingWriteable { - fn write_to_file(&self, _writer: &mut W) -> Result<(), std::io::Error> { + impl Writeable for FailingWriteable { + fn write(&self, _writer: &mut W) -> Result<(), std::io::Error> { Err(std::io::Error::new(std::io::ErrorKind::Other, "expected failure")) } } let filename = "test_diskwriteable_failure"; - let path = PathBuf::from("test_diskwriteable_failure_dir"); + let path = "test_diskwriteable_failure_dir"; let test_writeable = FailingWriteable{}; - match write_to_file(path.clone(), filename.to_string(), &test_writeable) { + let mut dest_file = PathBuf::from(path); + dest_file.push(filename); + match write_to_file(dest_file, &test_writeable) { Err(e) => { assert_eq!(e.kind(), std::io::ErrorKind::Other); assert_eq!(e.get_ref().unwrap().to_string(), "expected failure"); @@ -171,12 +170,13 @@ mod tests { fn test_tmp_file_creation_failure() { let test_writeable = TestWriteable{}; let filename = "test_tmp_file_creation_failure_filename".to_string(); - let path = PathBuf::from("test_tmp_file_creation_failure_dir"); - - // Create the tmp file and make it a directory. - let tmp_path = get_full_filepath(path.clone(), format!("{}.tmp", filename.clone())); - fs::create_dir_all(tmp_path).unwrap(); - match write_to_file(path, filename, &test_writeable) { + let path = "test_tmp_file_creation_failure_dir"; + let mut dest_file = PathBuf::from(path); + dest_file.push(filename); + let mut tmp_file = dest_file.clone(); + tmp_file.set_extension("tmp"); + fs::create_dir_all(tmp_file).unwrap(); + match write_to_file(dest_file, &test_writeable) { Err(e) => { #[cfg(not(target_os = "windows"))] assert_eq!(e.raw_os_error(), Some(libc::EISDIR)); diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 94af00c7c..58fe30ba2 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -1102,7 +1102,7 @@ fn test_monitor_update_fail_reestablish() { assert!(updates.update_fee.is_none()); assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); - expect_payment_forwarded!(nodes[1], Some(1000), false); + expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false); check_added_monitors!(nodes[1], 1); assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); @@ -2087,7 +2087,7 @@ fn test_fail_htlc_on_broadcast_after_claim() { nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]); let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); check_added_monitors!(nodes[1], 1); - expect_payment_forwarded!(nodes[1], Some(1000), false); + expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false); mine_transaction(&nodes[1], &bs_txn[0]); check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed); @@ -2423,7 +2423,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); - let chan_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known()).2; + let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known()).2; let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 100_000); @@ -2450,7 +2450,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f } let fulfill_msg = msgs::UpdateFulfillHTLC { - channel_id: chan_2, + channel_id: chan_id_2, htlc_id: 0, payment_preimage, }; @@ -2468,7 +2468,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]); } nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg); - expect_payment_forwarded!(nodes[1], Some(1000), false); + expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false); check_added_monitors!(nodes[1], 1); let mut bs_updates = None; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 020fc22cd..93746f0fd 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -4043,7 +4043,10 @@ impl ChannelMana } else { None }; let mut pending_events = self.pending_events.lock().unwrap(); + + let source_channel_id = Some(prev_outpoint.to_channel_id()); pending_events.push(events::Event::PaymentForwarded { + source_channel_id, fee_earned_msat, claim_from_onchain_tx: from_onchain, }); diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 3977952e9..9bbd6c0e9 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -1327,12 +1327,16 @@ macro_rules! expect_payment_path_successful { } macro_rules! expect_payment_forwarded { - ($node: expr, $expected_fee: expr, $upstream_force_closed: expr) => { + ($node: expr, $source_node: expr, $expected_fee: expr, $upstream_force_closed: expr) => { let events = $node.node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); match events[0] { - Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => { + Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => { assert_eq!(fee_earned_msat, $expected_fee); + if fee_earned_msat.is_some() { + // Is the event channel_id in one of the channels between the two nodes? + assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $source_node.node.get_our_node_id() && x.channel_id == source_channel_id.unwrap())); + } assert_eq!(claim_from_onchain_tx, $upstream_force_closed); }, _ => panic!("Unexpected event"), @@ -1571,11 +1575,11 @@ pub fn do_claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, } } macro_rules! mid_update_fulfill_dance { - ($node: expr, $prev_node: expr, $new_msgs: expr) => { + ($node: expr, $prev_node: expr, $next_node: expr, $new_msgs: expr) => { { $node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0); let fee = $node.node.channel_state.lock().unwrap().by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap().config.forwarding_fee_base_msat; - expect_payment_forwarded!($node, Some(fee as u64), false); + expect_payment_forwarded!($node, $next_node, Some(fee as u64), false); expected_total_fee_msat += fee as u64; check_added_monitors!($node, 1); let new_next_msgs = if $new_msgs { @@ -1599,7 +1603,14 @@ pub fn do_claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, assert_eq!(expected_next_node, node.node.get_our_node_id()); let update_next_msgs = !skip_last || idx != expected_route.len() - 1; if next_msgs.is_some() { - mid_update_fulfill_dance!(node, prev_node, update_next_msgs); + // Since we are traversing in reverse, next_node is actually the previous node + let next_node: &Node; + if idx == expected_route.len() - 1 { + next_node = origin_node; + } else { + next_node = expected_route[expected_route.len() - 1 - idx - 1]; + } + mid_update_fulfill_dance!(node, prev_node, next_node, update_next_msgs); } else { assert!(!update_next_msgs); assert!(node.node.get_and_clear_pending_msg_events().is_empty()); diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 68079aaa7..d7bebca0e 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -2684,10 +2684,23 @@ fn test_htlc_on_chain_success() { Event::ChannelClosed { reason: ClosureReason::CommitmentTxConfirmed, .. } => {} _ => panic!("Unexpected event"), } - if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[1] { - } else { panic!(); } - if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[2] { - } else { panic!(); } + let chan_id = Some(chan_1.2); + match forwarded_events[1] { + Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => { + assert_eq!(fee_earned_msat, Some(1000)); + assert_eq!(source_channel_id, chan_id); + assert_eq!(claim_from_onchain_tx, true); + }, + _ => panic!() + } + match forwarded_events[2] { + Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => { + assert_eq!(fee_earned_msat, Some(1000)); + assert_eq!(source_channel_id, chan_id); + assert_eq!(claim_from_onchain_tx, true); + }, + _ => panic!() + } let events = nodes[1].node.get_and_clear_pending_msg_events(); { let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap(); @@ -5104,8 +5117,9 @@ fn test_onchain_to_onchain_claim() { _ => panic!("Unexpected event"), } match events[1] { - Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => { + Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => { assert_eq!(fee_earned_msat, Some(1000)); + assert_eq!(source_channel_id, Some(chan_1.2)); assert_eq!(claim_from_onchain_tx, true); }, _ => panic!("Unexpected event"), @@ -5273,7 +5287,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() { // Note that the fee paid is effectively double as the HTLC value (including the nodes[1] fee // and nodes[2] fee) is rounded down and then claimed in full. mine_transaction(&nodes[1], &htlc_success_txn[0]); - expect_payment_forwarded!(nodes[1], Some(196*2), true); + expect_payment_forwarded!(nodes[1], nodes[0], Some(196*2), true); let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); assert!(updates.update_add_htlcs.is_empty()); assert!(updates.update_fail_htlcs.is_empty()); @@ -8855,7 +8869,7 @@ fn do_test_onchain_htlc_settlement_after_close(broadcast_alice: bool, go_onchain assert_eq!(carol_updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &carol_updates.update_fulfill_htlcs[0]); - expect_payment_forwarded!(nodes[1], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false); + expect_payment_forwarded!(nodes[1], nodes[0], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false); // If Alice broadcasted but Bob doesn't know yet, here he prepares to tell her about the preimage. if !go_onchain_before_fulfill && broadcast_alice { let events = nodes[1].node.get_and_clear_pending_msg_events(); diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index 346fb98b4..46d5d22b4 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -495,7 +495,7 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) { let bs_htlc_claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0); assert_eq!(bs_htlc_claim_txn.len(), 1); check_spends!(bs_htlc_claim_txn[0], as_commitment_tx); - expect_payment_forwarded!(nodes[1], None, false); + expect_payment_forwarded!(nodes[1], nodes[0], None, false); if !confirm_before_reload { mine_transaction(&nodes[0], &as_commitment_tx); diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index 8eb39cfe9..7b36ae0fc 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -138,7 +138,7 @@ fn do_test_onchain_htlc_reorg(local_commitment: bool, claim: bool) { // ChannelManager only polls chain::Watch::release_pending_monitor_events when we // probe it for events, so we probe non-message events here (which should just be the // PaymentForwarded event). - expect_payment_forwarded!(nodes[1], Some(1000), true); + expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), true); } else { // Confirm the timeout tx and check that we fail the HTLC backwards let block = Block { diff --git a/lightning/src/ln/shutdown_tests.rs b/lightning/src/ln/shutdown_tests.rs index 557aff84c..42c0e3b2f 100644 --- a/lightning/src/ln/shutdown_tests.rs +++ b/lightning/src/ln/shutdown_tests.rs @@ -110,7 +110,7 @@ fn updates_shutdown_wait() { assert!(updates.update_fee.is_none()); assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); - expect_payment_forwarded!(nodes[1], Some(1000), false); + expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false); check_added_monitors!(nodes[1], 1); let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); @@ -279,7 +279,7 @@ fn do_test_shutdown_rebroadcast(recv_count: u8) { assert!(updates.update_fee.is_none()); assert_eq!(updates.update_fulfill_htlcs.len(), 1); nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]); - expect_payment_forwarded!(nodes[1], Some(1000), false); + expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false); check_added_monitors!(nodes[1], 1); let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()); commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false); diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index ea50398b5..d7eb16eee 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -343,6 +343,9 @@ pub enum Event { /// This event is generated when a payment has been successfully forwarded through us and a /// forwarding fee earned. PaymentForwarded { + /// The channel between the source node and us. Optional because versions prior to 0.0.107 + /// do not serialize this field. + source_channel_id: Option<[u8; 32]>, /// The fee, in milli-satoshis, which was earned as a result of the payment. /// /// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC @@ -520,10 +523,11 @@ impl Writeable for Event { (0, VecWriteWrapper(outputs), required), }); }, - &Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => { + &Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => { 7u8.write(writer)?; write_tlv_fields!(writer, { (0, fee_earned_msat, option), + (1, source_channel_id, option), (2, claim_from_onchain_tx, required), }); }, @@ -684,12 +688,14 @@ impl MaybeReadable for Event { 7u8 => { let f = || { let mut fee_earned_msat = None; + let mut source_channel_id = None; let mut claim_from_onchain_tx = false; read_tlv_fields!(reader, { (0, fee_earned_msat, option), + (1, source_channel_id, option), (2, claim_from_onchain_tx, required), }); - Ok(Some(Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx })) + Ok(Some(Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx })) }; f() }, diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index a1e92a0f8..95826b7e0 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -20,6 +20,7 @@ pub mod errors; pub mod ser; pub mod message_signing; pub mod invoice; +pub mod persist; pub(crate) mod atomic_counter; pub(crate) mod byte_utils; diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs new file mode 100644 index 000000000..9476331c1 --- /dev/null +++ b/lightning/src/util/persist.rs @@ -0,0 +1,77 @@ +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! This module contains a simple key-value store trait KVStorePersister that +//! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`], +//! and [`ChannelMonitor`] all in one place. + +use core::ops::Deref; +use bitcoin::hashes::hex::ToHex; +use io::{self}; + +use crate::{chain::{keysinterface::{Sign, KeysInterface}, self, transaction::{OutPoint}, chaininterface::{BroadcasterInterface, FeeEstimator}, chainmonitor::{Persist, MonitorUpdateId}, channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}}, ln::channelmanager::ChannelManager, routing::network_graph::NetworkGraph}; +use super::{logger::Logger, ser::Writeable}; + +/// Trait for a key-value store for persisting some writeable object at some key +/// Implementing `KVStorePersister` provides auto-implementations for [`Persister`] +/// and [`Persist`] traits. It uses "manager", "network_graph", +/// and "monitors/{funding_txo_id}_{funding_txo_index}" for keys. +pub trait KVStorePersister { + /// Persist the given writeable using the provided key + fn persist(&self, key: &str, object: &W) -> io::Result<()>; +} + +/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk. +pub trait Persister + where M::Target: 'static + chain::Watch, + T::Target: 'static + BroadcasterInterface, + K::Target: 'static + KeysInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, +{ + /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. + fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error>; + + /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. + fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>; +} + +impl Persister for A + where M::Target: 'static + chain::Watch, + T::Target: 'static + BroadcasterInterface, + K::Target: 'static + KeysInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, +{ + /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. + fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error> { + self.persist("manager", channel_manager) + } + + /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. + fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { + self.persist("network_graph", network_graph) + } +} + +impl Persist for K { + // TODO: We really need a way for the persister to inform the user that its time to crash/shut + // down once these start returning failure. + // A PermanentFailure implies we need to shut down since we're force-closing channels without + // even broadcasting! + + fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { + let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index); + self.persist(&key, monitor) + .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) + } + + fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option, monitor: &ChannelMonitor, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> { + let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index); + self.persist(&key, monitor) + .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure) + } +}