//! and [`ChannelMonitor`] all in one place.
use core::ops::Deref;
-use bitcoin::hashes::hex::ToHex;
+use bitcoin::hashes::hex::{FromHex, ToHex};
+use bitcoin::{BlockHash, Txid};
+
use crate::io;
use crate::prelude::{Vec, String};
use crate::routing::scoring::WriteableScore;
use crate::routing::router::Router;
use crate::routing::gossip::NetworkGraph;
use crate::util::logger::Logger;
-use crate::util::ser::Writeable;
+use crate::util::ser::{ReadableArgs, Writeable};
/// The alphabet of characters allowed for namespaces and keys.
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
fn list(&self, namespace: &str, sub_namespace: &str) -> io::Result<Vec<String>>;
}
-/// Trait for a key-value store for persisting some writeable object at some key
-/// Implementing `KVStorePersister` provides auto-implementations for [`Persister`]
-/// and [`Persist`] traits. It uses "manager", "network_graph",
-/// and "monitors/{funding_txo_id}_{funding_txo_index}" for keys.
-pub trait KVStorePersister {
- /// Persist the given writeable using the provided key
- fn persist<W: Writeable>(&self, key: &str, object: &W) -> io::Result<()>;
-}
-
/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
pub trait Persister<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>>
where M::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
}
-impl<'a, A: KVStorePersister, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for A
+
+impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for A
where M::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,
ES::Target: 'static + EntropySource,
R::Target: 'static + Router,
L::Target: 'static + Logger,
{
- /// Persist the given ['ChannelManager'] to disk with the name "manager", returning an error if persistence failed.
+ /// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed.
fn persist_manager(&self, channel_manager: &ChannelManager<M, T, ES, NS, SP, F, R, L>) -> Result<(), io::Error> {
- self.persist("manager", channel_manager)
+ self.write(CHANNEL_MANAGER_PERSISTENCE_NAMESPACE,
+ CHANNEL_MANAGER_PERSISTENCE_SUB_NAMESPACE,
+ CHANNEL_MANAGER_PERSISTENCE_KEY,
+ &channel_manager.encode())
}
- /// Persist the given [`NetworkGraph`] to disk with the name "network_graph", returning an error if persistence failed.
+ /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
- self.persist("network_graph", network_graph)
+ self.write(NETWORK_GRAPH_PERSISTENCE_NAMESPACE,
+ NETWORK_GRAPH_PERSISTENCE_SUB_NAMESPACE,
+ NETWORK_GRAPH_PERSISTENCE_KEY,
+ &network_graph.encode())
}
- /// Persist the given [`WriteableScore`] to disk with name "scorer", returning an error if persistence failed.
+ /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
- self.persist("scorer", &scorer)
+ self.write(SCORER_PERSISTENCE_NAMESPACE,
+ SCORER_PERSISTENCE_SUB_NAMESPACE,
+ SCORER_PERSISTENCE_KEY,
+ &scorer.encode())
}
}
-impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStorePersister> Persist<ChannelSigner> for K {
+impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
// down once these start returning failure.
- // A PermanentFailure implies we should probably just shut down the node since we're
- // force-closing channels without even broadcasting!
+ // Then we should return InProgress rather than UnrecoverableError, implying we should probably
+ // just shut down the node since we're not retrying persistence!
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
- let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
- match self.persist(&key, monitor) {
+ let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
+ match self.write(
+ CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+ CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+ &key, &monitor.encode())
+ {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
- Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
+ Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
}
}
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
- let key = format!("monitors/{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
- match self.persist(&key, monitor) {
+ let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
+ match self.write(
+ CHANNEL_MONITOR_PERSISTENCE_NAMESPACE,
+ CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE,
+ &key, &monitor.encode())
+ {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
- Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
+ Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
+ }
+ }
+}
+
+/// Read previously persisted [`ChannelMonitor`]s from the store.
+pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
+ kv_store: K, entropy_source: ES, signer_provider: SP,
+) -> io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
+where
+ K::Target: KVStore,
+ ES::Target: EntropySource + Sized,
+ SP::Target: SignerProvider + Sized,
+{
+ let mut res = Vec::new();
+
+ for stored_key in kv_store.list(
+ CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE)?
+ {
+ if stored_key.len() < 66 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Stored key has invalid length"));
+ }
+
+ let txid = Txid::from_hex(stored_key.split_at(64).0).map_err(|_| {
+ io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
+ })?;
+
+ let index: u16 = stored_key.split_at(65).1.parse().map_err(|_| {
+ io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
+ })?;
+
+ match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
+ &mut io::Cursor::new(
+ kv_store.read(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SUB_NAMESPACE, &stored_key)?),
+ (&*entropy_source, &*signer_provider),
+ ) {
+ Ok((block_hash, channel_monitor)) => {
+ if channel_monitor.get_funding_txo().0.txid != txid
+ || channel_monitor.get_funding_txo().0.index != index
+ {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "ChannelMonitor was stored under the wrong key",
+ ));
+ }
+ res.push((block_hash, channel_monitor));
+ }
+ Err(_) => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Failed to deserialize ChannelMonitor"
+ ))
+ }
}
}
+ Ok(res)
}