//!
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
+use bitcoin::{BlockHash, Txid};
use core::cmp;
use core::ops::Deref;
use core::str::FromStr;
-use bitcoin::{BlockHash, Txid};
-use crate::{io, log_error};
use crate::prelude::*;
+use crate::{io, log_error};
use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::chainmonitor::Persist;
-use crate::sign::{EntropySource, ecdsa::EcdsaChannelSigner, SignerProvider};
+use crate::chain::channelmonitor::{
+ ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID,
+};
use crate::chain::transaction::OutPoint;
-use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
use crate::ln::channelmanager::AChannelManager;
use crate::routing::gossip::NetworkGraph;
use crate::routing::scoring::WriteableScore;
+use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
use crate::util::logger::Logger;
use crate::util::ser::{Readable, ReadableArgs, Writeable};
/// The alphabet of characters allowed for namespaces and keys.
-pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
+pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
/// The maximum number of characters namespaces and keys may have.
pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;
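// A hedged validation sketch: `is_valid_kvstore_str` is an illustrative helper
// (not part of the `KVStore` API) showing how an implementation might enforce
// the alphabet and length limits defined by the two constants above.
fn is_valid_kvstore_str(s: &str) -> bool {
	s.len() <= KVSTORE_NAMESPACE_KEY_MAX_LEN
		&& s.chars().all(|c| KVSTORE_NAMESPACE_KEY_ALPHABET.contains(c))
}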
/// `primary_namespace` and `secondary_namespace`.
///
/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
- fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result<Vec<u8>, io::Error>;
+ fn read(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+ ) -> Result<Vec<u8>, io::Error>;
/// Persists the given data under the given `key`.
///
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present
/// in the store.
- fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Result<(), io::Error>;
+ fn write(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+ ) -> Result<(), io::Error>;
/// Removes any data that had previously been persisted under the given `key`.
///
/// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
/// Returns successfully if no data will be stored for the given `primary_namespace`,
/// `secondary_namespace`, and `key`, regardless of whether it was present before the
/// call.
- fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), io::Error>;
+ fn remove(
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+ ) -> Result<(), io::Error>;
/// Returns a list of keys that are stored under the given `secondary_namespace` in
/// `primary_namespace`.
///
/// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
/// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
- fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result<Vec<String>, io::Error>;
+ fn list(
+ &self, primary_namespace: &str, secondary_namespace: &str,
+ ) -> Result<Vec<String>, io::Error>;
}
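// For illustration only: a toy in-memory `KVStore` sketch against the methods
// shown above (the type `InMemoryStore` is hypothetical and assumes a `std`
// environment; real deployments need a durable backend). Entries are keyed by
// the (primary_namespace, secondary_namespace, key) triple.
struct InMemoryStore {
	entries: std::sync::Mutex<std::collections::HashMap<(String, String, String), Vec<u8>>>,
}

impl KVStore for InMemoryStore {
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> Result<Vec<u8>, io::Error> {
		let entries = self.entries.lock().unwrap();
		let triple =
			(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string());
		// Returning `NotFound` for missing keys is load-bearing: readers such as
		// `MonitorUpdatingPersister` rely on it to detect the end of update streams.
		entries
			.get(&triple)
			.cloned()
			.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "key not found"))
	}
	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
	) -> Result<(), io::Error> {
		let triple =
			(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string());
		self.entries.lock().unwrap().insert(triple, buf.to_vec());
		Ok(())
	}
	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
	) -> Result<(), io::Error> {
		let triple =
			(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string());
		// Removal succeeds whether or not the key was present.
		self.entries.lock().unwrap().remove(&triple);
		Ok(())
	}
	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> Result<Vec<String>, io::Error> {
		let entries = self.entries.lock().unwrap();
		Ok(entries
			.keys()
			.filter(|(p, s, _)| p.as_str() == primary_namespace && s.as_str() == secondary_namespace)
			.map(|(_, _, k)| k.clone())
			.collect())
	}
}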
/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
}
-
impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A
where
CM::Target: 'static + AChannelManager,
S::Target: WriteableScore<'a>,
{
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
- self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+ self.write(
+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
- &channel_manager.get_cm().encode())
+ &channel_manager.get_cm().encode(),
+ )
}
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
- self.write(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
+ self.write(
+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
- &network_graph.encode())
+ &network_graph.encode(),
+ )
}
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
- self.write(SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+ self.write(
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY,
- &scorer.encode())
+ &scorer.encode(),
+ )
}
}
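// Because of the blanket impl above, any `KVStore` doubles as a `Persister`.
// A hedged sketch: `persist_all` is an illustrative helper (not upstream API),
// and the `L::Target: 'static + Logger` bound is spelled out here only so the
// snippet is self-contained.
fn persist_all<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref>(
	store: &A, channel_manager: &CM, network_graph: &NetworkGraph<L>, scorer: &S,
) -> Result<(), io::Error>
where
	CM::Target: 'static + AChannelManager,
	L::Target: 'static + Logger,
	S::Target: WriteableScore<'a>,
{
	store.persist_manager(channel_manager)?;
	store.persist_graph(network_graph)?;
	store.persist_scorer(scorer)
}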
// Then we should return InProgress rather than UnrecoverableError, implying we should probably
// just shut down the node since we're not retrying persistence!
- fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
+ fn persist_new_channel(
+ &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
+ ) -> chain::ChannelMonitorUpdateStatus {
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
match self.write(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
- &key, &monitor.encode())
- {
+ &key,
+ &monitor.encode(),
+ ) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
- Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
+ Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
}
}
- fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
+ fn update_persisted_channel(
+ &self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>,
+ monitor: &ChannelMonitor<ChannelSigner>,
+ ) -> chain::ChannelMonitorUpdateStatus {
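// Note: this blanket `KVStore`-based impl ignores the incremental update and
// always rewrites the full monitor below; `MonitorUpdatingPersister` (further
// down) is the update-aware alternative.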
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
match self.write(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
- &key, &monitor.encode())
- {
+ &key,
+ &monitor.encode(),
+ ) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
- Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
+ Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
}
}
monitor_name.as_str(),
) {
Ok(monitor) => monitor,
- Err(_) => return
+ Err(_) => return,
};
match self.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
&monitor,
) {
- Ok(()) => {}
- Err(_e) => return
+ Ok(()) => {},
+ Err(_e) => return,
};
let _ = self.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
let mut res = Vec::new();
for stored_key in kv_store.list(
- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE)?
- {
+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+ )? {
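// Stored keys take the form `<funding txid hex (64 chars)>_<index>`, so
// anything shorter than 66 characters cannot contain both parts.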
if stored_key.len() < 66 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
- "Stored key has invalid length"));
+ "Stored key has invalid length",
+ ));
}
let txid = Txid::from_str(stored_key.split_at(64).0).map_err(|_| {
})?;
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
- &mut io::Cursor::new(
- kv_store.read(CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, &stored_key)?),
+ &mut io::Cursor::new(kv_store.read(
+ CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+ CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+ &stored_key,
+ )?),
(&*entropy_source, &*signer_provider),
) {
Ok((block_hash, channel_monitor)) => {
));
}
res.push((block_hash, channel_monitor));
- }
+ },
Err(_) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
- "Failed to read ChannelMonitor"
+ "Failed to read ChannelMonitor",
))
- }
+ },
}
}
Ok(res)
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
- FE::Target: FeeEstimator
+ FE::Target: FeeEstimator,
{
kv_store: K,
logger: L,
entropy_source: ES,
signer_provider: SP,
broadcaster: BI,
- fee_estimator: FE
+ fee_estimator: FE,
}
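// A hedged wiring sketch: `build_persister` is an illustrative free function
// (not upstream API) showing how the pieces plug together, and `100` is an
// arbitrary `maximum_pending_updates` trading fewer full monitor rewrites
// against more update records to replay and clean up.
fn build_persister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
	kv_store: K, logger: L, entropy_source: ES, signer_provider: SP, broadcaster: BI,
	fee_estimator: FE,
) -> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
	K::Target: KVStore,
	L::Target: Logger,
	ES::Target: EntropySource + Sized,
	SP::Target: SignerProvider + Sized,
	BI::Target: BroadcasterInterface,
	FE::Target: FeeEstimator,
{
	MonitorUpdatingPersister::new(
		kv_store, logger, 100, entropy_source, signer_provider, broadcaster, fee_estimator,
	)
}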
#[allow(dead_code)]
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
- FE::Target: FeeEstimator
+ FE::Target: FeeEstimator,
{
/// Constructs a new [`MonitorUpdatingPersister`].
///
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
pub fn new(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
- signer_provider: SP, broadcaster: BI, fee_estimator: FE
+ signer_provider: SP, broadcaster: BI, fee_estimator: FE,
) -> Self {
MonitorUpdatingPersister {
kv_store,
entropy_source,
signer_provider,
broadcaster,
- fee_estimator
+ fee_estimator,
}
}
/// It is extremely important that your [`KVStore::read`] implementation uses the
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
/// documentation for [`MonitorUpdatingPersister`].
- pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> {
+ pub fn read_all_channel_monitors_with_updates(
+ &self,
+ ) -> Result<
+ Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
+ io::Error,
+ > {
let monitor_list = self.kv_store.list(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
/// function to accomplish this. Take care to limit the number of parallel readers.
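///
/// # Example
///
/// A hedged sketch, assuming a `persister` binding and a function returning
/// `Result<_, io::Error>`; keys use the `<funding txid>_<funding output index>`
/// form:
///
/// ```ignore
/// let key = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1".to_string();
/// let (block_hash, monitor) = persister.read_channel_monitor_with_updates(key)?;
/// ```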
pub fn read_channel_monitor_with_updates(
&self, monitor_key: String,
- ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
+ ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
+ {
let monitor_name = MonitorName::new(monitor_key)?;
let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
let mut current_update_id = monitor.get_latest_update_id();
Err(err) if err.kind() == io::ErrorKind::NotFound => {
// We can't find any more updates, so we are done.
break;
- }
+ },
Err(err) => return Err(err),
};
- monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
+ monitor
+ .update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
.map_err(|e| {
- log_error!(
- self.logger,
- "Monitor update failed. monitor: {} update: {} reason: {:?}",
- monitor_name.as_str(),
- update_name.as_str(),
- e
- );
- io::Error::new(io::ErrorKind::Other, "Monitor update failed")
- })?;
+ log_error!(
+ self.logger,
+ "Monitor update failed. monitor: {} update: {} reason: {:?}",
+ monitor_name.as_str(),
+ update_name.as_str(),
+ e
+ );
+ io::Error::new(io::ErrorKind::Other, "Monitor update failed")
+ })?;
}
Ok((block_hash, monitor))
}
/// Read a channel monitor.
fn read_monitor(
&self, monitor_name: &MonitorName,
- ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
+ ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
+ {
let outpoint: OutPoint = monitor_name.try_into()?;
let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
} else {
Ok((blockhash, channel_monitor))
}
- }
+ },
Err(e) => {
log_error!(
self.logger,
e,
);
Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
- }
+ },
}
}
for monitor_key in monitor_keys {
let monitor_name = MonitorName::new(monitor_key)?;
let (_, current_monitor) = self.read_monitor(&monitor_name)?;
- let updates = self
- .kv_store
- .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())?;
+ let updates = self.kv_store.list(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str(),
+ )?;
for update in updates {
let update_name = UpdateName::new(update)?;
// If the update_id is lower than the stored monitor's latest update_id, delete the update.
}
}
-impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
- Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
+impl<
+ ChannelSigner: EcdsaChannelSigner,
+ K: Deref,
+ L: Deref,
+ ES: Deref,
+ SP: Deref,
+ BI: Deref,
+ FE: Deref,
+ > Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
- FE::Target: FeeEstimator
+ FE::Target: FeeEstimator,
{
/// Persists a new channel. This means writing the entire monitor to the
/// parametrized [`KVStore`].
fn persist_new_channel(
- &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>
+ &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
// Determine the proper key for this monitor
let monitor_name = MonitorName::from(funding_txo);
monitor_name.as_str(),
&monitor_bytes,
) {
- Ok(_) => {
- chain::ChannelMonitorUpdateStatus::Completed
- }
+ Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
Err(e) => {
log_error!(
self.logger,
e
);
chain::ChannelMonitorUpdateStatus::UnrecoverableError
- }
+ },
}
}
/// - The update is at [`CLOSED_CHANNEL_UPDATE_ID`]
fn update_persisted_channel(
&self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
- monitor: &ChannelMonitor<ChannelSigner>
+ monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
if let Some(update) = update {
if update.update_id != CLOSED_CHANNEL_UPDATE_ID
e
);
chain::ChannelMonitorUpdateStatus::UnrecoverableError
- }
+ },
}
} else {
let monitor_name = MonitorName::from(funding_txo);
// the new one in order to determine the cleanup range.
let maybe_old_monitor = match monitor.get_latest_update_id() {
CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
- _ => None
+ _ => None,
};
// We could write this update alone, but it meets our design's criteria for a full monitor write.
let monitor_update_status = self.persist_new_channel(funding_txo, monitor);
if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
- let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
- // If there is an error while reading old monitor, we skip clean up.
- maybe_old_monitor.map(|(_, ref old_monitor)| {
- let start = old_monitor.get_latest_update_id();
- // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
- let end = cmp::min(
- start.saturating_add(self.maximum_pending_updates),
- CLOSED_CHANNEL_UPDATE_ID - 1,
- );
- (start, end)
- })
- } else {
- let end = monitor.get_latest_update_id();
- let start = end.saturating_sub(self.maximum_pending_updates);
- Some((start, end))
- };
+ let cleanup_range =
+ if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+ // If there was an error reading the old monitor, we skip the cleanup.
+ maybe_old_monitor.map(|(_, ref old_monitor)| {
+ let start = old_monitor.get_latest_update_id();
+ // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
+ let end = cmp::min(
+ start.saturating_add(self.maximum_pending_updates),
+ CLOSED_CHANNEL_UPDATE_ID - 1,
+ );
+ (start, end)
+ })
+ } else {
+ let end = monitor.get_latest_update_id();
+ let start = end.saturating_sub(self.maximum_pending_updates);
+ Some((start, end))
+ };
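// Worked example for the live-channel branch: with
// `maximum_pending_updates = 100` and a full write at update_id 200, the
// range is (100, 200), so the pending updates 101 through 199 written since
// the previous consolidation are deleted (the endpoints were consolidations,
// never standalone updates, so removing them is a no-op).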
if let Some((start, end)) = cleanup_range {
self.cleanup_in_range(monitor_name, start, end);
let monitor_key = monitor_name.as_str().to_string();
let monitor = match self.read_channel_monitor_with_updates(monitor_key) {
Ok((_block_hash, monitor)) => monitor,
- Err(_) => return
+ Err(_) => return,
};
match self.kv_store.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_name.as_str(),
&monitor.encode(),
) {
- Ok(()) => {}
+ Ok(()) => {},
Err(_e) => return,
};
let _ = self.kv_store.remove(
}
}
-impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
+ MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
ES::Target: EntropySource + Sized,
K::Target: KVStore,
L::Target: Logger,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
- FE::Target: FeeEstimator
+ FE::Target: FeeEstimator,
{
// Cleans up monitor updates for the given monitor in the range `start..=end`.
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
Ok(u) => Ok(u.into()),
Err(_) => {
Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
- }
+ },
}
}
use crate::chain::ChannelMonitorUpdateStatus;
use crate::events::{ClosureReason, MessageSendEventsProvider};
use crate::ln::functional_test_utils::*;
- use crate::util::test_utils::{self, TestLogger, TestStore};
- use crate::{check_added_monitors, check_closed_broadcast};
use crate::sync::Arc;
use crate::util::test_channel_signer::TestChannelSigner;
+ use crate::util::test_utils::{self, TestLogger, TestStore};
+ use crate::{check_added_monitors, check_closed_broadcast};
const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
#[test]
fn monitor_from_outpoint_works() {
let monitor_name1 = MonitorName::from(OutPoint {
- txid: Txid::from_str("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
+ txid: Txid::from_str(
+ "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
+ )
+ .unwrap(),
index: 1,
});
- assert_eq!(monitor_name1.as_str(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1");
+ assert_eq!(
+ monitor_name1.as_str(),
+ "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"
+ );
let monitor_name2 = MonitorName::from(OutPoint {
- txid: Txid::from_str("f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef").unwrap(),
+ txid: Txid::from_str(
+ "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef",
+ )
+ .unwrap(),
index: u16::MAX,
});
- assert_eq!(monitor_name2.as_str(), "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535");
+ assert_eq!(
+ monitor_name2.as_str(),
+ "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"
+ );
}
#[test]
fn bad_monitor_string_fails() {
- assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()).is_err());
- assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()).is_err());
- assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()).is_err());
+ assert!(MonitorName::new(
+ "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()
+ )
+ .is_err());
+ assert!(MonitorName::new(
+ "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()
+ )
+ .is_err());
+ assert!(MonitorName::new(
+ "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()
+ )
+ .is_err());
}
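// A hedged companion check (hypothetical, not from the upstream suite): a
// well-formed monitor name should parse back into the outpoint it was derived
// from, exercising the same `TryFrom<&MonitorName> for OutPoint` conversion
// that `read_monitor` relies on.
#[test]
fn monitor_name_round_trips() {
	let outpoint = OutPoint {
		txid: Txid::from_str(
			"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
		)
		.unwrap(),
		index: 1,
	};
	let monitor_name = MonitorName::from(outpoint);
	let roundtrip: OutPoint = (&monitor_name).try_into().unwrap();
	assert_eq!(roundtrip.txid, outpoint.txid);
	assert_eq!(roundtrip.index, outpoint.index);
}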
// Exercise the `MonitorUpdatingPersister` with real channels and payments.
// Check that the persisted channel data is empty before any channels are
// open.
- let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
+ let mut persisted_chan_data_0 =
+ persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
- let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
+ let mut persisted_chan_data_1 =
+ persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);
// Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
- persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
+ persisted_chan_data_0 =
+ persister_0.read_all_channel_monitors_with_updates().unwrap();
// check that we stored only one monitor
assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() {
// If the monitor is at the consolidation threshold, ensure no updates are stored.
let monitor_name = MonitorName::from(mon.get_funding_txo().0);
if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
- || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+ || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID
+ {
assert_eq!(
- persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
- monitor_name.as_str()).unwrap().len(),
+ persister_0
+ .kv_store
+ .list(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str()
+ )
+ .unwrap()
+ .len(),
0,
"updates stored when they shouldn't be in persister 0"
);
}
}
- persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
+ persisted_chan_data_1 =
+ persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = MonitorName::from(mon.get_funding_txo().0);
// If the monitor is at the consolidation threshold, ensure no updates are stored.
if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
- || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+ || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID
+ {
assert_eq!(
- persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
- monitor_name.as_str()).unwrap().len(),
+ persister_1
+ .kv_store
+ .list(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str()
+ )
+ .unwrap()
+ .len(),
0,
"updates stored when they shouldn't be in persister 1"
);
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
// The channel should have 0 updates, as it wrote a full monitor and consolidated.
- assert_eq!(persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
- assert_eq!(persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
+ assert_eq!(
+ persister_0
+ .kv_store
+ .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())
+ .unwrap()
+ .len(),
+ 0
+ );
+ assert_eq!(
+ persister_1
+ .kv_store
+ .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())
+ .unwrap()
+ .len(),
+ 0
+ );
}
// Test that if the `MonitorUpdatingPersister` can't actually write, trying to persist a
let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
- let txid = Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap();
+ let txid =
+ Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be")
+ .unwrap();
let test_txo = OutPoint { txid, index: 0 };
let ro_persister = MonitorUpdatingPersister {
match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => {
// correct result
- }
+ },
ChannelMonitorUpdateStatus::Completed => {
panic!("Completed persisting new channel when shouldn't have")
- }
+ },
ChannelMonitorUpdateStatus::InProgress => {
panic!("Returned InProgress when shouldn't have")
- }
+ },
}
match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => {
// correct result
- }
+ },
ChannelMonitorUpdateStatus::Completed => {
panic!("Completed persisting new channel when shouldn't have")
- }
+ },
ChannelMonitorUpdateStatus::InProgress => {
panic!("Returned InProgress when shouldn't have")
- }
+ },
}
added_monitors.clear();
}
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
persister_0
.kv_store
- .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str(), &[0u8; 1])
+ .write(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str(),
+ UpdateName::from(1).as_str(),
+ &[0u8; 1],
+ )
.unwrap();
// Do the stale update cleanup
// Confirm the stale update is unreadable/gone
assert!(persister_0
.kv_store
- .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str())
+ .read(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str(),
+ UpdateName::from(1).as_str()
+ )
.is_err());
// Force close.
// Write an update near u64::MAX
persister_0
.kv_store
- .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str(), &[0u8; 1])
+ .write(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str(),
+ UpdateName::from(u64::MAX - 1).as_str(),
+ &[0u8; 1],
+ )
.unwrap();
// Do the stale update cleanup
// Confirm the stale update is unreadable/gone
assert!(persister_0
.kv_store
- .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str())
+ .read(
+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+ monitor_name.as_str(),
+ UpdateName::from(u64::MAX - 1).as_str()
+ )
.is_err());
}
- fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool where P::Target: Persist<ChannelSigner> {
+ fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
+ where
+ P::Target: Persist<ChannelSigner>,
+ {
true
}