]> git.bitcoin.ninja Git - rust-lightning/commitdiff
`rustfmt`: Run on `util/persist.rs`
authorElias Rohrer <dev@tnull.de>
Wed, 18 Sep 2024 07:40:36 +0000 (09:40 +0200)
committerElias Rohrer <dev@tnull.de>
Thu, 19 Sep 2024 08:09:22 +0000 (10:09 +0200)
lightning/src/util/persist.rs

index 23d871bd49fe2b1bee97f69e5af84409f28ad4f4..d546749dd483d0b480a20f377f04b9b9cd66f727 100644 (file)
 //!
 //! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
 
+use bitcoin::{BlockHash, Txid};
 use core::cmp;
 use core::ops::Deref;
 use core::str::FromStr;
-use bitcoin::{BlockHash, Txid};
 
-use crate::{io, log_error};
 use crate::prelude::*;
+use crate::{io, log_error};
 
 use crate::chain;
 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use crate::chain::chainmonitor::Persist;
-use crate::sign::{EntropySource, ecdsa::EcdsaChannelSigner, SignerProvider};
+use crate::chain::channelmonitor::{
+       ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID,
+};
 use crate::chain::transaction::OutPoint;
-use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
 use crate::ln::channelmanager::AChannelManager;
 use crate::routing::gossip::NetworkGraph;
 use crate::routing::scoring::WriteableScore;
+use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
 use crate::util::logger::Logger;
 use crate::util::ser::{Readable, ReadableArgs, Writeable};
 
 /// The alphabet of characters allowed for namespaces and keys.
-pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
+pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
+       "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
 
 /// The maximum number of characters namespaces and keys may have.
 pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;
@@ -124,12 +127,16 @@ pub trait KVStore {
        /// `primary_namespace` and `secondary_namespace`.
        ///
        /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
-       fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Result<Vec<u8>, io::Error>;
+       fn read(
+               &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
+       ) -> Result<Vec<u8>, io::Error>;
        /// Persists the given data under the given `key`.
        ///
        /// Will create the given `primary_namespace` and `secondary_namespace` if not already present
        /// in the store.
-       fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Result<(), io::Error>;
+       fn write(
+               &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
+       ) -> Result<(), io::Error>;
        /// Removes any data that had previously been persisted under the given `key`.
        ///
        /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily
@@ -145,13 +152,17 @@ pub trait KVStore {
        /// Returns successfully if no data will be stored for the given `primary_namespace`,
        /// `secondary_namespace`, and `key`, independently of whether it was present before its
        /// invokation or not.
-       fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Result<(), io::Error>;
+       fn remove(
+               &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
+       ) -> Result<(), io::Error>;
        /// Returns a list of keys that are stored under the given `secondary_namespace` in
        /// `primary_namespace`.
        ///
        /// Returns the keys in arbitrary order, so users requiring a particular order need to sort the
        /// returned keys. Returns an empty list if `primary_namespace` or `secondary_namespace` is unknown.
-       fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Result<Vec<String>, io::Error>;
+       fn list(
+               &self, primary_namespace: &str, secondary_namespace: &str,
+       ) -> Result<Vec<String>, io::Error>;
 }
 
 /// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
@@ -175,7 +186,6 @@ where
        fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
 }
 
-
 impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A
 where
        CM::Target: 'static + AChannelManager,
@@ -183,24 +193,30 @@ where
        S::Target: WriteableScore<'a>,
 {
        fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
-               self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+               self.write(
+                       CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
                        CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
                        CHANNEL_MANAGER_PERSISTENCE_KEY,
-                       &channel_manager.get_cm().encode())
+                       &channel_manager.get_cm().encode(),
+               )
        }
 
        fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
-               self.write(NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
+               self.write(
+                       NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
                        NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
                        NETWORK_GRAPH_PERSISTENCE_KEY,
-                       &network_graph.encode())
+                       &network_graph.encode(),
+               )
        }
 
        fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
-               self.write(SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+               self.write(
+                       SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
                        SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
                        SCORER_PERSISTENCE_KEY,
-                       &scorer.encode())
+                       &scorer.encode(),
+               )
        }
 }
 
@@ -210,27 +226,34 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSign
        // Then we should return InProgress rather than UnrecoverableError, implying we should probably
        // just shut down the node since we're not retrying persistence!
 
-       fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
+       fn persist_new_channel(
+               &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
+       ) -> chain::ChannelMonitorUpdateStatus {
                let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
                match self.write(
                        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
                        CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
-                       &key, &monitor.encode())
-               {
+                       &key,
+                       &monitor.encode(),
+               ) {
                        Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
-                       Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
+                       Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
                }
        }
 
-       fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
+       fn update_persisted_channel(
+               &self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>,
+               monitor: &ChannelMonitor<ChannelSigner>,
+       ) -> chain::ChannelMonitorUpdateStatus {
                let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
                match self.write(
                        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
                        CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
-                       &key, &monitor.encode())
-               {
+                       &key,
+                       &monitor.encode(),
+               ) {
                        Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
-                       Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
+                       Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
                }
        }
 
@@ -242,7 +265,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSign
                        monitor_name.as_str(),
                ) {
                        Ok(monitor) => monitor,
-                       Err(_) => return
+                       Err(_) => return,
                };
                match self.write(
                        ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -250,8 +273,8 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSign
                        monitor_name.as_str(),
                        &monitor,
                ) {
-                       Ok(()) => {}
-                       Err(_e) => return
+                       Ok(()) => {},
+                       Err(_e) => return,
                };
                let _ = self.remove(
                        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -274,12 +297,14 @@ where
        let mut res = Vec::new();
 
        for stored_key in kv_store.list(
-               CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE)?
-       {
+               CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+               CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+       )? {
                if stored_key.len() < 66 {
                        return Err(io::Error::new(
                                io::ErrorKind::InvalidData,
-                               "Stored key has invalid length"));
+                               "Stored key has invalid length",
+                       ));
                }
 
                let txid = Txid::from_str(stored_key.split_at(64).0).map_err(|_| {
@@ -291,8 +316,11 @@ where
                })?;
 
                match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>::read(
-                       &mut io::Cursor::new(
-                               kv_store.read(CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, &stored_key)?),
+                       &mut io::Cursor::new(kv_store.read(
+                               CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+                               CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+                               &stored_key,
+                       )?),
                        (&*entropy_source, &*signer_provider),
                ) {
                        Ok((block_hash, channel_monitor)) => {
@@ -305,13 +333,13 @@ where
                                        ));
                                }
                                res.push((block_hash, channel_monitor));
-                       }
+                       },
                        Err(_) => {
                                return Err(io::Error::new(
                                        io::ErrorKind::InvalidData,
-                                       "Failed to read ChannelMonitor"
+                                       "Failed to read ChannelMonitor",
                                ))
-                       }
+                       },
                }
        }
        Ok(res)
@@ -407,7 +435,7 @@ where
        ES::Target: EntropySource + Sized,
        SP::Target: SignerProvider + Sized,
        BI::Target: BroadcasterInterface,
-       FE::Target: FeeEstimator
+       FE::Target: FeeEstimator,
 {
        kv_store: K,
        logger: L,
@@ -415,7 +443,7 @@ where
        entropy_source: ES,
        signer_provider: SP,
        broadcaster: BI,
-       fee_estimator: FE
+       fee_estimator: FE,
 }
 
 #[allow(dead_code)]
@@ -427,7 +455,7 @@ where
        ES::Target: EntropySource + Sized,
        SP::Target: SignerProvider + Sized,
        BI::Target: BroadcasterInterface,
-       FE::Target: FeeEstimator
+       FE::Target: FeeEstimator,
 {
        /// Constructs a new [`MonitorUpdatingPersister`].
        ///
@@ -447,7 +475,7 @@ where
        /// [`MonitorUpdatingPersister::cleanup_stale_updates`].
        pub fn new(
                kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
-               signer_provider: SP, broadcaster: BI, fee_estimator: FE
+               signer_provider: SP, broadcaster: BI, fee_estimator: FE,
        ) -> Self {
                MonitorUpdatingPersister {
                        kv_store,
@@ -456,7 +484,7 @@ where
                        entropy_source,
                        signer_provider,
                        broadcaster,
-                       fee_estimator
+                       fee_estimator,
                }
        }
 
@@ -465,7 +493,12 @@ where
        /// It is extremely important that your [`KVStore::read`] implementation uses the
        /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
        /// documentation for [`MonitorUpdatingPersister`].
-       pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> {
+       pub fn read_all_channel_monitors_with_updates(
+               &self,
+       ) -> Result<
+               Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
+               io::Error,
+       > {
                let monitor_list = self.kv_store.list(
                        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
                        CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -496,7 +529,8 @@ where
        /// function to accomplish this. Take care to limit the number of parallel readers.
        pub fn read_channel_monitor_with_updates(
                &self, monitor_key: String,
-       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
+       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
+       {
                let monitor_name = MonitorName::new(monitor_key)?;
                let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
                let mut current_update_id = monitor.get_latest_update_id();
@@ -511,21 +545,22 @@ where
                                Err(err) if err.kind() == io::ErrorKind::NotFound => {
                                        // We can't find any more updates, so we are done.
                                        break;
-                               }
+                               },
                                Err(err) => return Err(err),
                        };
 
-                       monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
+                       monitor
+                               .update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
                                .map_err(|e| {
-                                       log_error!(
-                                               self.logger,
-                                               "Monitor update failed. monitor: {} update: {} reason: {:?}",
-                                               monitor_name.as_str(),
-                                               update_name.as_str(),
-                                               e
-                                       );
-                                       io::Error::new(io::ErrorKind::Other, "Monitor update failed")
-                               })?;
+                               log_error!(
+                                       self.logger,
+                                       "Monitor update failed. monitor: {} update: {} reason: {:?}",
+                                       monitor_name.as_str(),
+                                       update_name.as_str(),
+                                       e
+                               );
+                               io::Error::new(io::ErrorKind::Other, "Monitor update failed")
+                       })?;
                }
                Ok((block_hash, monitor))
        }
@@ -533,7 +568,8 @@ where
        /// Read a channel monitor.
        fn read_monitor(
                &self, monitor_name: &MonitorName,
-       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
+       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
+       {
                let outpoint: OutPoint = monitor_name.try_into()?;
                let mut monitor_cursor = io::Cursor::new(self.kv_store.read(
                        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -564,7 +600,7 @@ where
                                } else {
                                        Ok((blockhash, channel_monitor))
                                }
-                       }
+                       },
                        Err(e) => {
                                log_error!(
                                        self.logger,
@@ -573,7 +609,7 @@ where
                                        e,
                                );
                                Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
-                       }
+                       },
                }
        }
 
@@ -613,9 +649,10 @@ where
                for monitor_key in monitor_keys {
                        let monitor_name = MonitorName::new(monitor_key)?;
                        let (_, current_monitor) = self.read_monitor(&monitor_name)?;
-                       let updates = self
-                               .kv_store
-                               .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())?;
+                       let updates = self.kv_store.list(
+                               CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                               monitor_name.as_str(),
+                       )?;
                        for update in updates {
                                let update_name = UpdateName::new(update)?;
                                // if the update_id is lower than the stored monitor, delete
@@ -633,20 +670,27 @@ where
        }
 }
 
-impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
-       Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
+impl<
+               ChannelSigner: EcdsaChannelSigner,
+               K: Deref,
+               L: Deref,
+               ES: Deref,
+               SP: Deref,
+               BI: Deref,
+               FE: Deref,
+       > Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
        K::Target: KVStore,
        L::Target: Logger,
        ES::Target: EntropySource + Sized,
        SP::Target: SignerProvider + Sized,
        BI::Target: BroadcasterInterface,
-       FE::Target: FeeEstimator
+       FE::Target: FeeEstimator,
 {
        /// Persists a new channel. This means writing the entire monitor to the
        /// parametrized [`KVStore`].
        fn persist_new_channel(
-               &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>
+               &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
        ) -> chain::ChannelMonitorUpdateStatus {
                // Determine the proper key for this monitor
                let monitor_name = MonitorName::from(funding_txo);
@@ -662,9 +706,7 @@ where
                        monitor_name.as_str(),
                        &monitor_bytes,
                ) {
-                       Ok(_) => {
-                               chain::ChannelMonitorUpdateStatus::Completed
-                       }
+                       Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
                        Err(e) => {
                                log_error!(
                                        self.logger,
@@ -675,7 +717,7 @@ where
                                        e
                                );
                                chain::ChannelMonitorUpdateStatus::UnrecoverableError
-                       }
+                       },
                }
        }
 
@@ -690,7 +732,7 @@ where
        ///   - The update is at [`CLOSED_CHANNEL_UPDATE_ID`]
        fn update_persisted_channel(
                &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
-               monitor: &ChannelMonitor<ChannelSigner>
+               monitor: &ChannelMonitor<ChannelSigner>,
        ) -> chain::ChannelMonitorUpdateStatus {
                if let Some(update) = update {
                        if update.update_id != CLOSED_CHANNEL_UPDATE_ID
@@ -715,7 +757,7 @@ where
                                                        e
                                                );
                                                chain::ChannelMonitorUpdateStatus::UnrecoverableError
-                                       }
+                                       },
                                }
                        } else {
                                let monitor_name = MonitorName::from(funding_txo);
@@ -723,29 +765,30 @@ where
                                // the new one in order to determine the cleanup range.
                                let maybe_old_monitor = match monitor.get_latest_update_id() {
                                        CLOSED_CHANNEL_UPDATE_ID => self.read_monitor(&monitor_name).ok(),
-                                       _ => None
+                                       _ => None,
                                };
 
                                // We could write this update, but it meets criteria of our design that calls for a full monitor write.
                                let monitor_update_status = self.persist_new_channel(funding_txo, monitor);
 
                                if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
-                                       let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
-                                               // If there is an error while reading old monitor, we skip clean up.
-                                               maybe_old_monitor.map(|(_, ref old_monitor)| {
-                                                       let start = old_monitor.get_latest_update_id();
-                                                       // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
-                                                       let end = cmp::min(
-                                                               start.saturating_add(self.maximum_pending_updates),
-                                                               CLOSED_CHANNEL_UPDATE_ID - 1,
-                                                       );
-                                                       (start, end)
-                                               })
-                                       } else {
-                                               let end = monitor.get_latest_update_id();
-                                               let start = end.saturating_sub(self.maximum_pending_updates);
-                                               Some((start, end))
-                                       };
+                                       let cleanup_range =
+                                               if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+                                                       // If there is an error while reading old monitor, we skip clean up.
+                                                       maybe_old_monitor.map(|(_, ref old_monitor)| {
+                                                               let start = old_monitor.get_latest_update_id();
+                                                               // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
+                                                               let end = cmp::min(
+                                                                       start.saturating_add(self.maximum_pending_updates),
+                                                                       CLOSED_CHANNEL_UPDATE_ID - 1,
+                                                               );
+                                                               (start, end)
+                                                       })
+                                               } else {
+                                                       let end = monitor.get_latest_update_id();
+                                                       let start = end.saturating_sub(self.maximum_pending_updates);
+                                                       Some((start, end))
+                                               };
 
                                        if let Some((start, end)) = cleanup_range {
                                                self.cleanup_in_range(monitor_name, start, end);
@@ -765,7 +808,7 @@ where
                let monitor_key = monitor_name.as_str().to_string();
                let monitor = match self.read_channel_monitor_with_updates(monitor_key) {
                        Ok((_block_hash, monitor)) => monitor,
-                       Err(_) => return
+                       Err(_) => return,
                };
                match self.kv_store.write(
                        ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -773,7 +816,7 @@ where
                        monitor_name.as_str(),
                        &monitor.encode(),
                ) {
-                       Ok(()) => {}
+                       Ok(()) => {},
                        Err(_e) => return,
                };
                let _ = self.kv_store.remove(
@@ -785,14 +828,15 @@ where
        }
 }
 
-impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
+       MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
        ES::Target: EntropySource + Sized,
        K::Target: KVStore,
        L::Target: Logger,
        SP::Target: SignerProvider + Sized,
        BI::Target: BroadcasterInterface,
-       FE::Target: FeeEstimator
+       FE::Target: FeeEstimator,
 {
        // Cleans up monitor updates for given monitor in range `start..=end`.
        fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
@@ -883,7 +927,7 @@ impl UpdateName {
                        Ok(u) => Ok(u.into()),
                        Err(_) => {
                                Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
-                       }
+                       },
                }
        }
 
@@ -905,10 +949,10 @@ mod tests {
        use crate::chain::ChannelMonitorUpdateStatus;
        use crate::events::{ClosureReason, MessageSendEventsProvider};
        use crate::ln::functional_test_utils::*;
-       use crate::util::test_utils::{self, TestLogger, TestStore};
-       use crate::{check_added_monitors, check_closed_broadcast};
        use crate::sync::Arc;
        use crate::util::test_channel_signer::TestChannelSigner;
+       use crate::util::test_utils::{self, TestLogger, TestStore};
+       use crate::{check_added_monitors, check_closed_broadcast};
 
        const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
 
@@ -928,23 +972,44 @@ mod tests {
        #[test]
        fn monitor_from_outpoint_works() {
                let monitor_name1 = MonitorName::from(OutPoint {
-                       txid: Txid::from_str("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
+                       txid: Txid::from_str(
+                               "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
+                       )
+                       .unwrap(),
                        index: 1,
                });
-               assert_eq!(monitor_name1.as_str(), "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1");
+               assert_eq!(
+                       monitor_name1.as_str(),
+                       "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"
+               );
 
                let monitor_name2 = MonitorName::from(OutPoint {
-                       txid: Txid::from_str("f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef").unwrap(),
+                       txid: Txid::from_str(
+                               "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef",
+                       )
+                       .unwrap(),
                        index: u16::MAX,
                });
-               assert_eq!(monitor_name2.as_str(), "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535");
+               assert_eq!(
+                       monitor_name2.as_str(),
+                       "f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"
+               );
        }
 
        #[test]
        fn bad_monitor_string_fails() {
-               assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()).is_err());
-               assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()).is_err());
-               assert!(MonitorName::new("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()).is_err());
+               assert!(MonitorName::new(
+                       "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef".to_string()
+               )
+               .is_err());
+               assert!(MonitorName::new(
+                       "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536".to_string()
+               )
+               .is_err());
+               assert!(MonitorName::new(
+                       "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21".to_string()
+               )
+               .is_err());
        }
 
        // Exercise the `MonitorUpdatingPersister` with real channels and payments.
@@ -997,15 +1062,18 @@ mod tests {
 
                // Check that the persisted channel data is empty before any channels are
                // open.
-               let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
+               let mut persisted_chan_data_0 =
+                       persister_0.read_all_channel_monitors_with_updates().unwrap();
                assert_eq!(persisted_chan_data_0.len(), 0);
-               let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
+               let mut persisted_chan_data_1 =
+                       persister_1.read_all_channel_monitors_with_updates().unwrap();
                assert_eq!(persisted_chan_data_1.len(), 0);
 
                // Helper to make sure the channel is on the expected update ID.
                macro_rules! check_persisted_data {
                        ($expected_update_id: expr) => {
-                               persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
+                               persisted_chan_data_0 =
+                                       persister_0.read_all_channel_monitors_with_updates().unwrap();
                                // check that we stored only one monitor
                                assert_eq!(persisted_chan_data_0.len(), 1);
                                for (_, mon) in persisted_chan_data_0.iter() {
@@ -1015,26 +1083,41 @@ mod tests {
                                        // if the CM is at consolidation threshold, ensure no updates are stored.
                                        let monitor_name = MonitorName::from(mon.get_funding_txo().0);
                                        if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
-                                                       || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+                                               || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID
+                                       {
                                                assert_eq!(
-                                                       persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
-                                                               monitor_name.as_str()).unwrap().len(),
+                                                       persister_0
+                                                               .kv_store
+                                                               .list(
+                                                                       CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                                                                       monitor_name.as_str()
+                                                               )
+                                                               .unwrap()
+                                                               .len(),
                                                        0,
                                                        "updates stored when they shouldn't be in persister 0"
                                                );
                                        }
                                }
-                               persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
+                               persisted_chan_data_1 =
+                                       persister_1.read_all_channel_monitors_with_updates().unwrap();
                                assert_eq!(persisted_chan_data_1.len(), 1);
                                for (_, mon) in persisted_chan_data_1.iter() {
                                        assert_eq!(mon.get_latest_update_id(), $expected_update_id);
                                        let monitor_name = MonitorName::from(mon.get_funding_txo().0);
                                        // if the CM is at consolidation threshold, ensure no updates are stored.
                                        if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
-                                                       || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+                                               || mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID
+                                       {
                                                assert_eq!(
-                                                       persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
-                                                               monitor_name.as_str()).unwrap().len(),
+                                                       persister_1
+                                                               .kv_store
+                                                               .list(
+                                                                       CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                                                                       monitor_name.as_str()
+                                                               )
+                                                               .unwrap()
+                                                               .len(),
                                                        0,
                                                        "updates stored when they shouldn't be in persister 1"
                                                );
@@ -1102,8 +1185,22 @@ mod tests {
                let (_, monitor) = &persisted_chan_data[0];
                let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
                // The channel should have 0 updates, as it wrote a full monitor and consolidated.
-               assert_eq!(persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
-               assert_eq!(persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str()).unwrap().len(), 0);
+               assert_eq!(
+                       persister_0
+                               .kv_store
+                               .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())
+                               .unwrap()
+                               .len(),
+                       0
+               );
+               assert_eq!(
+                       persister_1
+                               .kv_store
+                               .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str())
+                               .unwrap()
+                               .len(),
+                       0
+               );
        }
 
        // Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a
@@ -1126,7 +1223,9 @@ mod tests {
                        let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
                        let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
                        let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
-                       let txid = Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap();
+                       let txid =
+                               Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be")
+                                       .unwrap();
                        let test_txo = OutPoint { txid, index: 0 };
 
                        let ro_persister = MonitorUpdatingPersister {
@@ -1141,24 +1240,24 @@ mod tests {
                        match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
                                ChannelMonitorUpdateStatus::UnrecoverableError => {
                                        // correct result
-                               }
+                               },
                                ChannelMonitorUpdateStatus::Completed => {
                                        panic!("Completed persisting new channel when shouldn't have")
-                               }
+                               },
                                ChannelMonitorUpdateStatus::InProgress => {
                                        panic!("Returned InProgress when shouldn't have")
-                               }
+                               },
                        }
                        match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1) {
                                ChannelMonitorUpdateStatus::UnrecoverableError => {
                                        // correct result
-                               }
+                               },
                                ChannelMonitorUpdateStatus::Completed => {
                                        panic!("Completed persisting new channel when shouldn't have")
-                               }
+                               },
                                ChannelMonitorUpdateStatus::InProgress => {
                                        panic!("Returned InProgress when shouldn't have")
-                               }
+                               },
                        }
                        added_monitors.clear();
                }
@@ -1228,7 +1327,12 @@ mod tests {
                let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
                persister_0
                        .kv_store
-                       .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str(), &[0u8; 1])
+                       .write(
+                               CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                               monitor_name.as_str(),
+                               UpdateName::from(1).as_str(),
+                               &[0u8; 1],
+                       )
                        .unwrap();
 
                // Do the stale update cleanup
@@ -1237,7 +1341,11 @@ mod tests {
                // Confirm the stale update is unreadable/gone
                assert!(persister_0
                        .kv_store
-                       .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(1).as_str())
+                       .read(
+                               CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                               monitor_name.as_str(),
+                               UpdateName::from(1).as_str()
+                       )
                        .is_err());
 
                // Force close.
@@ -1253,7 +1361,12 @@ mod tests {
                // Write an update near u64::MAX
                persister_0
                        .kv_store
-                       .write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str(), &[0u8; 1])
+                       .write(
+                               CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                               monitor_name.as_str(),
+                               UpdateName::from(u64::MAX - 1).as_str(),
+                               &[0u8; 1],
+                       )
                        .unwrap();
 
                // Do the stale update cleanup
@@ -1262,11 +1375,18 @@ mod tests {
                // Confirm the stale update is unreadable/gone
                assert!(persister_0
                        .kv_store
-                       .read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_name.as_str(), UpdateName::from(u64::MAX - 1).as_str())
+                       .read(
+                               CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                               monitor_name.as_str(),
+                               UpdateName::from(u64::MAX - 1).as_str()
+                       )
                        .is_err());
        }
 
-       fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool where P::Target: Persist<ChannelSigner> {
+       fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
+       where
+               P::Target: Persist<ChannelSigner>,
+       {
                true
        }