Merge pull request #1378 from ViktorTigerstrom/2022-03-include-htlc-min-max
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Thu, 21 Apr 2022 18:09:20 +0000 (18:09 +0000)
committer GitHub <noreply@github.com>
Thu, 21 Apr 2022 18:09:20 +0000 (18:09 +0000)
Include htlc min/max for ChannelDetails and ChannelCounterparty

14 files changed:
lightning-background-processor/Cargo.toml
lightning-background-processor/src/lib.rs
lightning-persister/src/lib.rs
lightning-persister/src/util.rs
lightning/src/ln/chanmon_update_fail_tests.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/payment_tests.rs
lightning/src/ln/reorg_tests.rs
lightning/src/ln/shutdown_tests.rs
lightning/src/util/events.rs
lightning/src/util/mod.rs
lightning/src/util/persist.rs [new file with mode: 0644]

index bd6d54d871a2a20a49392f34e5810e64e6e0a8b7..16ec763fb8c3a4f48f5bb3ea773e4372b12984f3 100644 (file)
@@ -16,8 +16,8 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 bitcoin = "0.27"
 lightning = { version = "0.0.106", path = "../lightning", features = ["std"] }
-lightning-persister = { version = "0.0.106", path = "../lightning-persister" }
 
 [dev-dependencies]
 lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] }
 lightning-invoice = { version = "0.14.0", path = "../lightning-invoice" }
+lightning-persister = { version = "0.0.106", path = "../lightning-persister" }
index 73f420c98a429ddba67b20490bf9dbb060a9e034..6beee915b309772ae500bd61211a929e6382682d 100644 (file)
@@ -20,6 +20,7 @@ use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescr
 use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
 use lightning::util::events::{Event, EventHandler, EventsProvider};
 use lightning::util::logger::Logger;
+use lightning::util::persist::Persister;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
@@ -80,22 +81,6 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
 #[cfg(test)]
 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
 
-/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
-pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
-where
-       M::Target: 'static + chain::Watch<Signer>,
-       T::Target: 'static + BroadcasterInterface,
-       K::Target: 'static + KeysInterface<Signer = Signer>,
-       F::Target: 'static + FeeEstimator,
-       L::Target: 'static + Logger,
-{
-       /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
-       /// (which will cause the [`BackgroundProcessor`] which called this method to exit).
-       fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
-
-       /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
-       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
-}
 
 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
 struct DecoratingEventHandler<
@@ -138,12 +123,12 @@ impl BackgroundProcessor {
        ///
        /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
        /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
-       /// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's
+       /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
        /// provided implementation.
        ///
        /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
-       /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
-       /// for Rust-Lightning's provided implementation.
+       /// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
+       /// for LDK's provided implementation.
        ///
        /// Typically, users should either implement [`Persister::persist_manager`] to never return an
        /// error or call [`join`] and handle any error that may arise. For the latter case,
@@ -161,8 +146,8 @@ impl BackgroundProcessor {
        /// [`stop`]: Self::stop
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
-       /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
-       /// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
+       /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
+       /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
        /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
        /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
        pub fn start<
@@ -180,7 +165,7 @@ impl BackgroundProcessor {
                CMH: 'static + Deref + Send + Sync,
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
-               PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
+               PS: 'static + Deref + Send,
                M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
                NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
@@ -202,6 +187,7 @@ impl BackgroundProcessor {
                CMH::Target: 'static + ChannelMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
+               PS::Target: 'static + Persister<Signer, CW, T, K, F, L>
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = stop_thread.clone();
@@ -365,10 +351,11 @@ mod tests {
        use lightning::util::logger::Logger;
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
+       use lightning::util::persist::KVStorePersister;
        use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
        use lightning_invoice::utils::DefaultRouter;
        use lightning_persister::FilesystemPersister;
-       use std::fs;
+       use std::fs::{self, File};
        use std::ops::Deref;
        use std::path::PathBuf;
        use std::sync::{Arc, Mutex};
@@ -414,12 +401,14 @@ mod tests {
        struct Persister {
                data_dir: String,
                graph_error: Option<(std::io::ErrorKind, &'static str)>,
-               manager_error: Option<(std::io::ErrorKind, &'static str)>
+               manager_error: Option<(std::io::ErrorKind, &'static str)>,
+               filesystem_persister: FilesystemPersister,
        }
 
        impl Persister {
                fn new(data_dir: String) -> Self {
-                       Self { data_dir, graph_error: None, manager_error: None }
+                       let filesystem_persister = FilesystemPersister::new(data_dir.clone());
+                       Self { data_dir, graph_error: None, manager_error: None, filesystem_persister }
                }
 
                fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
@@ -431,25 +420,21 @@ mod tests {
                }
        }
 
-       impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
-               M::Target: 'static + chain::Watch<Signer>,
-               T::Target: 'static + BroadcasterInterface,
-               K::Target: 'static + KeysInterface<Signer = Signer>,
-               F::Target: 'static + FeeEstimator,
-               L::Target: 'static + Logger,
-       {
-               fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
-                       match self.manager_error {
-                               None => FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager),
-                               Some((error, message)) => Err(std::io::Error::new(error, message)),
+       impl KVStorePersister for Persister {
+               fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
+                       if key == "manager" {
+                               if let Some((error, message)) = self.manager_error {
+                                       return Err(std::io::Error::new(error, message))
+                               }
                        }
-               }
 
-               fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
-                       match self.graph_error {
-                               None => FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph),
-                               Some((error, message)) => Err(std::io::Error::new(error, message)),
+                       if key == "network_graph" {
+                               if let Some((error, message)) = self.graph_error {
+                                       return Err(std::io::Error::new(error, message))
+                               }
                        }
+
+                       self.filesystem_persister.persist(key, object)
                }
        }
 
@@ -576,7 +561,7 @@ mod tests {
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister::new(data_dir);
+               let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
 
@@ -637,7 +622,7 @@ mod tests {
                // `FRESHNESS_TIMER`.
                let nodes = create_nodes(1, "test_timer_tick_called".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister::new(data_dir);
+               let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
                loop {
@@ -660,7 +645,7 @@ mod tests {
                open_channel!(nodes[0], nodes[1], 100000);
 
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test");
+               let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
                match bg_processor.join() {
@@ -677,7 +662,7 @@ mod tests {
                // Test that if we encounter an error during network graph persistence, an error gets returned.
                let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test");
+               let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
 
@@ -695,7 +680,7 @@ mod tests {
                let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
                let channel_value = 100000;
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister::new(data_dir.clone());
+               let persister = Arc::new(Persister::new(data_dir.clone()));
 
                // Set up a background event handler for FundingGenerationReady events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
@@ -726,7 +711,8 @@ mod tests {
                // Set up a background event handler for SpendableOutputs events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
-               let bg_processor = BackgroundProcessor::start(Persister::new(data_dir), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
+               let persister = Arc::new(Persister::new(data_dir));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
 
                // Force close the channel and check that the SpendableOutputs event was handled.
                nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
@@ -752,7 +738,7 @@ mod tests {
 
                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
-               let persister = Persister::new(data_dir);
+               let persister = Arc::new(Persister::new(data_dir));
                let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
                let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
index 4500621276dbef797ac42f94008cd72f2cfb8686..c23baf8ad34bccb7ea7efcaa32639a43f4b23c42 100644 (file)
@@ -15,20 +15,13 @@ extern crate bitcoin;
 extern crate libc;
 
 use bitcoin::hash_types::{BlockHash, Txid};
-use bitcoin::hashes::hex::{FromHex, ToHex};
-use lightning::routing::network_graph::NetworkGraph;
-use crate::util::DiskWriteable;
-use lightning::chain;
-use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
-use lightning::chain::chainmonitor;
+use bitcoin::hashes::hex::FromHex;
+use lightning::chain::channelmonitor::ChannelMonitor;
 use lightning::chain::keysinterface::{Sign, KeysInterface};
-use lightning::chain::transaction::OutPoint;
-use lightning::ln::channelmanager::ChannelManager;
-use lightning::util::logger::Logger;
 use lightning::util::ser::{ReadableArgs, Writeable};
+use lightning::util::persist::KVStorePersister;
 use std::fs;
-use std::io::{Cursor, Error, Write};
+use std::io::Cursor;
 use std::ops::Deref;
 use std::path::{Path, PathBuf};
 
@@ -48,31 +41,6 @@ pub struct FilesystemPersister {
        path_to_channel_data: String,
 }
 
-impl<Signer: Sign> DiskWriteable for ChannelMonitor<Signer> {
-       fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
-               self.write(writer)
-       }
-}
-
-impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> DiskWriteable for ChannelManager<Signer, M, T, K, F, L>
-where
-       M::Target: chain::Watch<Signer>,
-       T::Target: BroadcasterInterface,
-       K::Target: KeysInterface<Signer=Signer>,
-       F::Target: FeeEstimator,
-       L::Target: Logger,
-{
-       fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
-               self.write(writer)
-       }
-}
-
-impl DiskWriteable for NetworkGraph {
-       fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
-               self.write(writer)
-       }
-}
-
 impl FilesystemPersister {
        /// Initialize a new FilesystemPersister and set the path to the individual channels'
        /// files.
@@ -87,43 +55,14 @@ impl FilesystemPersister {
                self.path_to_channel_data.clone()
        }
 
-       pub(crate) fn path_to_monitor_data(&self) -> PathBuf {
-               let mut path = PathBuf::from(self.path_to_channel_data.clone());
-               path.push("monitors");
-               path
-       }
-
-       /// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister`
-       /// initialization, within a file called "manager".
-       pub fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(
-               data_dir: String,
-               manager: &ChannelManager<Signer, M, T, K, F, L>
-       ) -> Result<(), std::io::Error>
-       where
-               M::Target: chain::Watch<Signer>,
-               T::Target: BroadcasterInterface,
-               K::Target: KeysInterface<Signer=Signer>,
-               F::Target: FeeEstimator,
-               L::Target: Logger,
-       {
-               let path = PathBuf::from(data_dir);
-               util::write_to_file(path, "manager".to_string(), manager)
-       }
-
-       /// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister`
-       /// initialization, within a file called "network_graph"
-       pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
-               let path = PathBuf::from(data_dir);
-               util::write_to_file(path, "network_graph".to_string(), network_graph)
-       }
-
        /// Read `ChannelMonitor`s from disk.
        pub fn read_channelmonitors<Signer: Sign, K: Deref> (
                &self, keys_manager: K
        ) -> Result<Vec<(BlockHash, ChannelMonitor<Signer>)>, std::io::Error>
                where K::Target: KeysInterface<Signer=Signer> + Sized,
        {
-               let path = self.path_to_monitor_data();
+               let mut path = PathBuf::from(&self.path_to_channel_data);
+               path.push("monitors");
                if !Path::new(&path).exists() {
                        return Ok(Vec::new());
                }
@@ -180,22 +119,11 @@ impl FilesystemPersister {
        }
 }
 
-impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
-       // TODO: We really need a way for the persister to inform the user that its time to crash/shut
-       // down once these start returning failure.
-       // A PermanentFailure implies we need to shut down since we're force-closing channels without
-       // even broadcasting!
-
-       fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
-               let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
-               util::write_to_file(self.path_to_monitor_data(), filename, monitor)
-                       .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
-       }
-
-       fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
-               let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
-               util::write_to_file(self.path_to_monitor_data(), filename, monitor)
-                       .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
+impl KVStorePersister for FilesystemPersister {
+       fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
+               let mut dest_file = PathBuf::from(self.path_to_channel_data.clone());
+               dest_file.push(key);
+               util::write_to_file(dest_file, object)
        }
 }
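
After this refactor every FilesystemPersister write goes through a single key-to-path mapping: "manager", "network_graph" and "monitors/<txid>_<index>" all become files under the data directory. A minimal usage sketch (the data directory and the u64 payload are arbitrary illustrations, not part of this change; any type implementing Writeable can be persisted):

use lightning::util::persist::KVStorePersister;
use lightning_persister::FilesystemPersister;

fn main() -> std::io::Result<()> {
    // Hypothetical data directory; write_to_file creates missing parent directories.
    let persister = FilesystemPersister::new("/tmp/ldk-example-data".to_string());
    // The key becomes a relative path, so this writes the serialized bytes
    // of the value to /tmp/ldk-example-data/example_key.
    persister.persist("example_key", &42u64)
}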
 
index f26296794c6858a28fc4aa5bce46055a55657cb6..25bd00f5e9539ee8b486b79cb7436278015a5698 100644 (file)
@@ -2,27 +2,20 @@
 extern crate winapi;
 
 use std::fs;
-use std::path::{Path, PathBuf};
-use std::io::{BufWriter, Write};
+use std::path::PathBuf;
+use std::io::BufWriter;
 
 #[cfg(not(target_os = "windows"))]
 use std::os::unix::io::AsRawFd;
 
+use lightning::util::ser::Writeable;
+
 #[cfg(target_os = "windows")]
 use {
        std::ffi::OsStr,
        std::os::windows::ffi::OsStrExt
 };
 
-pub(crate) trait DiskWriteable {
-       fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), std::io::Error>;
-}
-
-pub(crate) fn get_full_filepath(mut filepath: PathBuf, filename: String) -> String {
-       filepath.push(filename);
-       filepath.to_str().unwrap().to_string()
-}
-
 #[cfg(target_os = "windows")]
 macro_rules! call {
        ($e: expr) => (
@@ -40,45 +33,43 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::W
 }
 
 #[allow(bare_trait_objects)]
-pub(crate) fn write_to_file<D: DiskWriteable>(path: PathBuf, filename: String, data: &D) -> std::io::Result<()> {
-       fs::create_dir_all(path.clone())?;
+pub(crate) fn write_to_file<W: Writeable>(dest_file: PathBuf, data: &W) -> std::io::Result<()> {
+       let mut tmp_file = dest_file.clone();
+       tmp_file.set_extension("tmp");
+
+       let parent_directory = dest_file.parent().unwrap();
+       fs::create_dir_all(parent_directory)?;
        // Do a crazy dance with lots of fsync()s to be overly cautious here...
        // We never want to end up in a state where we've lost the old data, or end up using the
        // old data on power loss after we've returned.
        // The way to atomically write a file on Unix platforms is:
        // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
-       let filename_with_path = get_full_filepath(path, filename);
-       let tmp_filename = format!("{}.tmp", filename_with_path.clone());
-
        {
                // Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use
                // rust stdlib 1.36 or higher.
-               let mut buf = BufWriter::new(fs::File::create(&tmp_filename)?);
-               data.write_to_file(&mut buf)?;
+               let mut buf = BufWriter::new(fs::File::create(&tmp_file)?);
+               data.write(&mut buf)?;
                buf.into_inner()?.sync_all()?;
        }
        // Fsync the parent directory on Unix.
        #[cfg(not(target_os = "windows"))]
        {
-               fs::rename(&tmp_filename, &filename_with_path)?;
-               let path = Path::new(&filename_with_path).parent().unwrap();
-               let dir_file = fs::OpenOptions::new().read(true).open(path)?;
+               fs::rename(&tmp_file, &dest_file)?;
+               let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
                unsafe { libc::fsync(dir_file.as_raw_fd()); }
        }
        #[cfg(target_os = "windows")]
        {
-               let src = PathBuf::from(tmp_filename.clone());
-               let dst = PathBuf::from(filename_with_path.clone());
-               if Path::new(&filename_with_path.clone()).exists() {
+               if dest_file.exists() {
                        unsafe {winapi::um::winbase::ReplaceFileW(
-                               path_to_windows_str(dst).as_ptr(), path_to_windows_str(src).as_ptr(), std::ptr::null(),
+                               path_to_windows_str(dest_file).as_ptr(), path_to_windows_str(tmp_file).as_ptr(), std::ptr::null(),
                                winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS,
                                std::ptr::null_mut() as *mut winapi::ctypes::c_void,
                                std::ptr::null_mut() as *mut winapi::ctypes::c_void
                        )};
                } else {
                        call!(unsafe {winapi::um::winbase::MoveFileExW(
-                               path_to_windows_str(src).as_ptr(), path_to_windows_str(dst).as_ptr(),
+                               path_to_windows_str(tmp_file).as_ptr(), path_to_windows_str(dest_file).as_ptr(),
                                winapi::um::winbase::MOVEFILE_WRITE_THROUGH | winapi::um::winbase::MOVEFILE_REPLACE_EXISTING
                        )});
                }
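
For readers following the fsync dance above, the Unix branch boils down to the sequence in the comment: write a temporary file, fsync it, rename it over the destination, then fsync the parent directory. A stripped-down, Unix-only sketch in plain std (the destination path and byte payload are hypothetical; the real write_to_file also handles Windows via ReplaceFileW/MoveFileExW):

use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::Path;

#[cfg(not(target_os = "windows"))]
fn atomic_write(dest: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp = dest.with_extension("tmp");
    fs::create_dir_all(dest.parent().unwrap())?;
    {
        // open(tmp), write(tmp), fsync(tmp), close(tmp) on scope exit
        let mut f = File::create(&tmp)?;
        f.write_all(bytes)?;
        f.sync_all()?;
    }
    // rename() atomically replaces dest; fsync the directory so the rename
    // itself survives power loss.
    fs::rename(&tmp, dest)?;
    OpenOptions::new().read(true).open(dest.parent().unwrap())?.sync_all()?;
    Ok(())
}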
@@ -88,15 +79,17 @@ pub(crate) fn write_to_file<D: DiskWriteable>(path: PathBuf, filename: String, d
 
 #[cfg(test)]
 mod tests {
-       use super::{DiskWriteable, get_full_filepath, write_to_file};
+       use lightning::util::ser::{Writer, Writeable};
+
+       use super::{write_to_file};
        use std::fs;
        use std::io;
        use std::io::Write;
        use std::path::PathBuf;
 
        struct TestWriteable{}
-       impl DiskWriteable for TestWriteable {
-               fn write_to_file<W: Write>(&self, writer: &mut W) -> Result<(), io::Error> {
+       impl Writeable for TestWriteable {
+               fn write<W: Writer>(&self, writer: &mut W) -> Result<(), std::io::Error> {
                        writer.write_all(&[42; 1])
                }
        }
@@ -114,7 +107,9 @@ mod tests {
                let mut perms = fs::metadata(path.to_string()).unwrap().permissions();
                perms.set_readonly(true);
                fs::set_permissions(path.to_string(), perms).unwrap();
-               match write_to_file(PathBuf::from(path.to_string()), filename, &test_writeable) {
+               let mut dest_file = PathBuf::from(path);
+               dest_file.push(filename);
+               match write_to_file(dest_file, &test_writeable) {
                        Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied),
                        _ => panic!("Unexpected error message")
                }
@@ -132,10 +127,12 @@ mod tests {
        fn test_rename_failure() {
                let test_writeable = TestWriteable{};
                let filename = "test_rename_failure_filename";
-               let path = PathBuf::from("test_rename_failure_dir");
+               let path = "test_rename_failure_dir";
+               let mut dest_file = PathBuf::from(path);
+               dest_file.push(filename);
                // Create the channel data file and make it a directory.
-               fs::create_dir_all(get_full_filepath(path.clone(), filename.to_string())).unwrap();
-               match write_to_file(path.clone(), filename.to_string(), &test_writeable) {
+               fs::create_dir_all(dest_file.clone()).unwrap();
+               match write_to_file(dest_file, &test_writeable) {
                        Err(e) => assert_eq!(e.raw_os_error(), Some(libc::EISDIR)),
                        _ => panic!("Unexpected Ok(())")
                }
@@ -145,16 +142,18 @@ mod tests {
        #[test]
        fn test_diskwriteable_failure() {
                struct FailingWriteable {}
-               impl DiskWriteable for FailingWriteable {
-                       fn write_to_file<W: Write>(&self, _writer: &mut W) -> Result<(), std::io::Error> {
+               impl Writeable for FailingWriteable {
+                       fn write<W: Writer>(&self, _writer: &mut W) -> Result<(), std::io::Error> {
                                Err(std::io::Error::new(std::io::ErrorKind::Other, "expected failure"))
                        }
                }
 
                let filename = "test_diskwriteable_failure";
-               let path = PathBuf::from("test_diskwriteable_failure_dir");
+               let path = "test_diskwriteable_failure_dir";
                let test_writeable = FailingWriteable{};
-               match write_to_file(path.clone(), filename.to_string(), &test_writeable) {
+               let mut dest_file = PathBuf::from(path);
+               dest_file.push(filename);
+               match write_to_file(dest_file, &test_writeable) {
                        Err(e) => {
                                assert_eq!(e.kind(), std::io::ErrorKind::Other);
                                assert_eq!(e.get_ref().unwrap().to_string(), "expected failure");
@@ -171,12 +170,13 @@ mod tests {
        fn test_tmp_file_creation_failure() {
                let test_writeable = TestWriteable{};
                let filename = "test_tmp_file_creation_failure_filename".to_string();
-               let path = PathBuf::from("test_tmp_file_creation_failure_dir");
-
-               // Create the tmp file and make it a directory.
-               let tmp_path = get_full_filepath(path.clone(), format!("{}.tmp", filename.clone()));
-               fs::create_dir_all(tmp_path).unwrap();
-               match write_to_file(path, filename, &test_writeable) {
+               let path = "test_tmp_file_creation_failure_dir";
+               let mut dest_file = PathBuf::from(path);
+               dest_file.push(filename);
+               let mut tmp_file = dest_file.clone();
+               tmp_file.set_extension("tmp");
+               fs::create_dir_all(tmp_file).unwrap();
+               match write_to_file(dest_file, &test_writeable) {
                        Err(e) => {
                                #[cfg(not(target_os = "windows"))]
                                assert_eq!(e.raw_os_error(), Some(libc::EISDIR));
index 94af00c7c72e8eebd3263a0a16e50a93c2e32cd7..58fe30ba2618026311066f347da4a99f2d5f4c3f 100644 (file)
@@ -1102,7 +1102,7 @@ fn test_monitor_update_fail_reestablish() {
        assert!(updates.update_fee.is_none());
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
-       expect_payment_forwarded!(nodes[1], Some(1000), false);
+       expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
        assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
        commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
@@ -2087,7 +2087,7 @@ fn test_fail_htlc_on_broadcast_after_claim() {
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
        let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        check_added_monitors!(nodes[1], 1);
-       expect_payment_forwarded!(nodes[1], Some(1000), false);
+       expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
 
        mine_transaction(&nodes[1], &bs_txn[0]);
        check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
@@ -2423,7 +2423,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
        let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
 
        create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
-       let chan_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known()).2;
+       let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known()).2;
 
        let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 100_000);
 
@@ -2450,7 +2450,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
        }
 
        let fulfill_msg = msgs::UpdateFulfillHTLC {
-               channel_id: chan_2,
+               channel_id: chan_id_2,
                htlc_id: 0,
                payment_preimage,
        };
@@ -2468,7 +2468,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
                assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]);
        }
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg);
-       expect_payment_forwarded!(nodes[1], Some(1000), false);
+       expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
 
        let mut bs_updates = None;
index 020fc22cd1b79c4f684e047dff9355cd0825b41a..93746f0fde05cbc6b3ae3272a6851f36ef7d993c 100644 (file)
@@ -4043,7 +4043,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                                                } else { None };
 
                                                let mut pending_events = self.pending_events.lock().unwrap();
+
+                                               let source_channel_id = Some(prev_outpoint.to_channel_id());
                                                pending_events.push(events::Event::PaymentForwarded {
+                                                       source_channel_id,
                                                        fee_earned_msat,
                                                        claim_from_onchain_tx: from_onchain,
                                                });
index 3977952e9847cda1d42ea78f8ce8bababc9391e0..9bbd6c0e92aad17decad799c6b866d67e3fe1958 100644 (file)
@@ -1327,12 +1327,16 @@ macro_rules! expect_payment_path_successful {
 }
 
 macro_rules! expect_payment_forwarded {
-       ($node: expr, $expected_fee: expr, $upstream_force_closed: expr) => {
+       ($node: expr, $source_node: expr, $expected_fee: expr, $upstream_force_closed: expr) => {
                let events = $node.node.get_and_clear_pending_events();
                assert_eq!(events.len(), 1);
                match events[0] {
-                       Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => {
+                       Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
                                assert_eq!(fee_earned_msat, $expected_fee);
+                               if fee_earned_msat.is_some() {
+                                       // Is the event channel_id in one of the channels between the two nodes?
+                                       assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $source_node.node.get_our_node_id() && x.channel_id == source_channel_id.unwrap()));
+                               }
                                assert_eq!(claim_from_onchain_tx, $upstream_force_closed);
                        },
                        _ => panic!("Unexpected event"),
@@ -1571,11 +1575,11 @@ pub fn do_claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>,
                        }
                }
                macro_rules! mid_update_fulfill_dance {
-                       ($node: expr, $prev_node: expr, $new_msgs: expr) => {
+                       ($node: expr, $prev_node: expr, $next_node: expr, $new_msgs: expr) => {
                                {
                                        $node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0);
                                        let fee = $node.node.channel_state.lock().unwrap().by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap().config.forwarding_fee_base_msat;
-                                       expect_payment_forwarded!($node, Some(fee as u64), false);
+                                       expect_payment_forwarded!($node, $next_node, Some(fee as u64), false);
                                        expected_total_fee_msat += fee as u64;
                                        check_added_monitors!($node, 1);
                                        let new_next_msgs = if $new_msgs {
@@ -1599,7 +1603,14 @@ pub fn do_claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>,
                        assert_eq!(expected_next_node, node.node.get_our_node_id());
                        let update_next_msgs = !skip_last || idx != expected_route.len() - 1;
                        if next_msgs.is_some() {
-                               mid_update_fulfill_dance!(node, prev_node, update_next_msgs);
+                               // Since we are traversing in reverse, next_node is actually the previous node
+                               let next_node: &Node;
+                               if idx == expected_route.len() - 1 {
+                                       next_node = origin_node;
+                               } else {
+                                       next_node = expected_route[expected_route.len() - 1 - idx - 1];
+                               }
+                               mid_update_fulfill_dance!(node, prev_node, next_node, update_next_msgs);
                        } else {
                                assert!(!update_next_msgs);
                                assert!(node.node.get_and_clear_pending_msg_events().is_empty());
index 68079aaa76db83a24a30766156bdd9e5be2a7c37..d7bebca0e4dff41088c4583632a302f700124314 100644 (file)
@@ -2684,10 +2684,23 @@ fn test_htlc_on_chain_success() {
                Event::ChannelClosed { reason: ClosureReason::CommitmentTxConfirmed, .. } => {}
                _ => panic!("Unexpected event"),
        }
-       if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[1] {
-               } else { panic!(); }
-       if let Event::PaymentForwarded { fee_earned_msat: Some(1000), claim_from_onchain_tx: true } = forwarded_events[2] {
-               } else { panic!(); }
+       let chan_id = Some(chan_1.2);
+       match forwarded_events[1] {
+               Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
+                       assert_eq!(fee_earned_msat, Some(1000));
+                       assert_eq!(source_channel_id, chan_id);
+                       assert_eq!(claim_from_onchain_tx, true);
+               },
+               _ => panic!()
+       }
+       match forwarded_events[2] {
+               Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
+                       assert_eq!(fee_earned_msat, Some(1000));
+                       assert_eq!(source_channel_id, chan_id);
+                       assert_eq!(claim_from_onchain_tx, true);
+               },
+               _ => panic!()
+       }
        let events = nodes[1].node.get_and_clear_pending_msg_events();
        {
                let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
@@ -5104,8 +5117,9 @@ fn test_onchain_to_onchain_claim() {
                _ => panic!("Unexpected event"),
        }
        match events[1] {
-               Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => {
+               Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
                        assert_eq!(fee_earned_msat, Some(1000));
+                       assert_eq!(source_channel_id, Some(chan_1.2));
                        assert_eq!(claim_from_onchain_tx, true);
                },
                _ => panic!("Unexpected event"),
@@ -5273,7 +5287,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
        // Note that the fee paid is effectively double as the HTLC value (including the nodes[1] fee
        // and nodes[2] fee) is rounded down and then claimed in full.
        mine_transaction(&nodes[1], &htlc_success_txn[0]);
-       expect_payment_forwarded!(nodes[1], Some(196*2), true);
+       expect_payment_forwarded!(nodes[1], nodes[0], Some(196*2), true);
        let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        assert!(updates.update_add_htlcs.is_empty());
        assert!(updates.update_fail_htlcs.is_empty());
@@ -8855,7 +8869,7 @@ fn do_test_onchain_htlc_settlement_after_close(broadcast_alice: bool, go_onchain
        assert_eq!(carol_updates.update_fulfill_htlcs.len(), 1);
 
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &carol_updates.update_fulfill_htlcs[0]);
-       expect_payment_forwarded!(nodes[1], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false);
+       expect_payment_forwarded!(nodes[1], nodes[0], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false);
        // If Alice broadcasted but Bob doesn't know yet, here he prepares to tell her about the preimage.
        if !go_onchain_before_fulfill && broadcast_alice {
                let events = nodes[1].node.get_and_clear_pending_msg_events();
index 346fb98b41eda3d895f9887575764db0ecc44503..46d5d22b49a07112f680086c3e701d902088fcd1 100644 (file)
@@ -495,7 +495,7 @@ fn do_retry_with_no_persist(confirm_before_reload: bool) {
        let bs_htlc_claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
        assert_eq!(bs_htlc_claim_txn.len(), 1);
        check_spends!(bs_htlc_claim_txn[0], as_commitment_tx);
-       expect_payment_forwarded!(nodes[1], None, false);
+       expect_payment_forwarded!(nodes[1], nodes[0], None, false);
 
        if !confirm_before_reload {
                mine_transaction(&nodes[0], &as_commitment_tx);
index 8eb39cfe92a98cacc6ab0bd854d85334a8ded939..7b36ae0fc4a9d2af97badcbb0c503a1262fdf64d 100644 (file)
@@ -138,7 +138,7 @@ fn do_test_onchain_htlc_reorg(local_commitment: bool, claim: bool) {
                // ChannelManager only polls chain::Watch::release_pending_monitor_events when we
                // probe it for events, so we probe non-message events here (which should just be the
                // PaymentForwarded event).
-               expect_payment_forwarded!(nodes[1], Some(1000), true);
+               expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), true);
        } else {
                // Confirm the timeout tx and check that we fail the HTLC backwards
                let block = Block {
index 557aff84c0dad7c6d595e4a6da6f1bcf24d6fced..42c0e3b2f328cc1545edfdf3287dddab9e766ffe 100644 (file)
@@ -110,7 +110,7 @@ fn updates_shutdown_wait() {
        assert!(updates.update_fee.is_none());
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
-       expect_payment_forwarded!(nodes[1], Some(1000), false);
+       expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
        let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
@@ -279,7 +279,7 @@ fn do_test_shutdown_rebroadcast(recv_count: u8) {
        assert!(updates.update_fee.is_none());
        assert_eq!(updates.update_fulfill_htlcs.len(), 1);
        nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
-       expect_payment_forwarded!(nodes[1], Some(1000), false);
+       expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
        check_added_monitors!(nodes[1], 1);
        let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
        commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
index ea50398b5e54ad2d2d1c691dadd31b0f3fe341fb..d7eb16eee5267107224fd4b5ff2240f6b56ea1bb 100644 (file)
@@ -343,6 +343,9 @@ pub enum Event {
        /// This event is generated when a payment has been successfully forwarded through us and a
        /// forwarding fee earned.
        PaymentForwarded {
+               /// The channel between the source node and us. Optional because versions prior to 0.0.107
+               /// do not serialize this field.
+               source_channel_id: Option<[u8; 32]>,
                /// The fee, in milli-satoshis, which was earned as a result of the payment.
                ///
                /// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC
@@ -520,10 +523,11 @@ impl Writeable for Event {
                                        (0, VecWriteWrapper(outputs), required),
                                });
                        },
-                       &Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => {
+                       &Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
                                7u8.write(writer)?;
                                write_tlv_fields!(writer, {
                                        (0, fee_earned_msat, option),
+                                       (1, source_channel_id, option),
                                        (2, claim_from_onchain_tx, required),
                                });
                        },
@@ -684,12 +688,14 @@ impl MaybeReadable for Event {
                        7u8 => {
                                let f = || {
                                        let mut fee_earned_msat = None;
+                                       let mut source_channel_id = None;
                                        let mut claim_from_onchain_tx = false;
                                        read_tlv_fields!(reader, {
                                                (0, fee_earned_msat, option),
+                                               (1, source_channel_id, option),
                                                (2, claim_from_onchain_tx, required),
                                        });
-                                       Ok(Some(Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx }))
+                                       Ok(Some(Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx }))
                                };
                                f()
                        },
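
On the consumer side, an event handler can pick up the new field directly; events serialized by older versions simply deserialize with source_channel_id set to None. A minimal sketch of reading the extended event (the log format is illustrative only):

use lightning::util::events::Event;

fn log_forward(event: &Event) {
    if let Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } = event {
        // source_channel_id is None for events written by versions prior to 0.0.107.
        println!(
            "forwarded via channel {:?}: fee {:?} msat, claimed on-chain: {}",
            source_channel_id, fee_earned_msat, claim_from_onchain_tx
        );
    }
}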
index a1e92a0f8cfe29ce2f72ff300b65f021645ab616..95826b7e06ee73e0a02b3e1b36cf8ac6ef9ed12e 100644 (file)
@@ -20,6 +20,7 @@ pub mod errors;
 pub mod ser;
 pub mod message_signing;
 pub mod invoice;
+pub mod persist;
 
 pub(crate) mod atomic_counter;
 pub(crate) mod byte_utils;
diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
new file mode 100644 (file)
index 0000000..9476331
--- /dev/null
@@ -0,0 +1,77 @@
+// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
+// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// You may not use this file except in accordance with one or both of these
+// licenses.
+
+//! This module contains a simple key-value store trait, [`KVStorePersister`], that
+//! allows one to implement persistence for [`ChannelManager`], [`NetworkGraph`],
+//! and [`ChannelMonitor`] all in one place.
+
+use core::ops::Deref;
+use bitcoin::hashes::hex::ToHex;
+use io::{self};
+
+use crate::{chain::{keysinterface::{Sign, KeysInterface}, self, transaction::{OutPoint}, chaininterface::{BroadcasterInterface, FeeEstimator}, chainmonitor::{Persist, MonitorUpdateId}, channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}}, ln::channelmanager::ChannelManager, routing::network_graph::NetworkGraph};
+use super::{logger::Logger, ser::Writeable};
+
+/// Trait for a key-value store for persisting some writeable object at some key.
+/// Implementing `KVStorePersister` provides auto-implementations for the [`Persister`]
+/// and [`Persist`] traits. It uses "manager", "network_graph",
+/// and "monitors/{funding_txo_id}_{funding_txo_index}" for keys.
+pub trait KVStorePersister {
+       /// Persist the given writeable using the provided key
+       fn persist<W: Writeable>(&self, key: &str, object: &W) -> io::Result<()>;
+}
+
+/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
+pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
+       where M::Target: 'static + chain::Watch<Signer>,
+               T::Target: 'static + BroadcasterInterface,
+               K::Target: 'static + KeysInterface<Signer = Signer>,
+               F::Target: 'static + FeeEstimator,
+               L::Target: 'static + Logger,
+{
+       /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed.
+       fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error>;
+
+       /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
+       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>;
+}
+
+impl<A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Persister<Signer, M, T, K, F, L> for A
+       where M::Target: 'static + chain::Watch<Signer>,
+               T::Target: 'static + BroadcasterInterface,
+               K::Target: 'static + KeysInterface<Signer = Signer>,
+               F::Target: 'static + FeeEstimator,
+               L::Target: 'static + Logger,
+{
+       /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed.
+       fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error> {
+               self.persist("manager", channel_manager)
+       }
+
+       /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
+       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> {
+               self.persist("network_graph", network_graph)
+       }
+}
+
+impl<ChannelSigner: Sign, K: KVStorePersister> Persist<ChannelSigner> for K {
+       // TODO: We really need a way for the persister to inform the user that it's time to crash/shut
+       // down once these start returning failure.
+       // A PermanentFailure implies we need to shut down since we're force-closing channels without
+       // even broadcasting!
+
+       fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
+               let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
+               self.persist(&key, monitor)
+                       .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
+       }
+
+       fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
+               let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
+               self.persist(&key, monitor)
+                       .map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
+       }
+}
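
Because both impls above are blanket impls over KVStorePersister, a custom store only has to provide the single persist method to satisfy Persister (for the BackgroundProcessor) and Persist (for the ChainMonitor) at once. A minimal in-memory sketch, where MemoryPersister is a hypothetical type and Writeable::encode is LDK's provided byte-serialization helper:

use std::collections::HashMap;
use std::sync::Mutex;

use lightning::util::persist::KVStorePersister;
use lightning::util::ser::Writeable;

struct MemoryPersister {
    store: Mutex<HashMap<String, Vec<u8>>>,
}

impl KVStorePersister for MemoryPersister {
    fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
        // Keys arrive as "manager", "network_graph" or "monitors/<txid>_<index>".
        self.store.lock().unwrap().insert(key.to_string(), object.encode());
        Ok(())
    }
}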