]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Store Broadcaster and FeeEstimator on MonitorUpdatingPersister.
authorArik Sosman <git@arik.io>
Wed, 28 Aug 2024 16:32:35 +0000 (09:32 -0700)
committerArik Sosman <git@arik.io>
Wed, 28 Aug 2024 19:19:38 +0000 (12:19 -0700)
`MonitorUpdatingPersister` does not currently correctly archive
monitors because it neglects any unapplied updates. In order to start
applying these updates, the archiving methods will require access to
instances of `BroadcasterInterface` and `FeeEstimator`.

This commit requires that the `MonitorUpdatingPersister` be
instantiated with those instances, obviating the need for passing
them around, and laying the foundation for the following commit.

lightning/src/util/persist.rs

index c30d13b31873999e9ec0bb3e7606b3310599ea33..a845cc1780020eb41bd427d66aab4f60f2749a37 100644 (file)
@@ -400,28 +400,34 @@ where
 /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
 /// would like to get rid of them, consider using the
 /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
-pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref>
+pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
 where
        K::Target: KVStore,
        L::Target: Logger,
        ES::Target: EntropySource + Sized,
        SP::Target: SignerProvider + Sized,
+       BI::Target: BroadcasterInterface,
+       FE::Target: FeeEstimator
 {
        kv_store: K,
        logger: L,
        maximum_pending_updates: u64,
        entropy_source: ES,
        signer_provider: SP,
+       broadcaster: BI,
+       fee_estimator: FE
 }
 
 #[allow(dead_code)]
-impl<K: Deref, L: Deref, ES: Deref, SP: Deref>
-       MonitorUpdatingPersister<K, L, ES, SP>
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
+       MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
        K::Target: KVStore,
        L::Target: Logger,
        ES::Target: EntropySource + Sized,
        SP::Target: SignerProvider + Sized,
+       BI::Target: BroadcasterInterface,
+       FE::Target: FeeEstimator
 {
        /// Constructs a new [`MonitorUpdatingPersister`].
        ///
@@ -441,7 +447,7 @@ where
        /// [`MonitorUpdatingPersister::cleanup_stale_updates`].
        pub fn new(
                kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
-               signer_provider: SP,
+               signer_provider: SP, broadcaster: BI, fee_estimator: FE
        ) -> Self {
                MonitorUpdatingPersister {
                        kv_store,
@@ -449,6 +455,8 @@ where
                        maximum_pending_updates,
                        entropy_source,
                        signer_provider,
+                       broadcaster,
+                       fee_estimator
                }
        }
 
@@ -457,24 +465,14 @@ where
        /// It is extremely important that your [`KVStore::read`] implementation uses the
        /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
        /// documentation for [`MonitorUpdatingPersister`].
-       pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref>(
-               &self, broadcaster: &B, fee_estimator: &F,
-       ) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
-       where
-               B::Target: BroadcasterInterface,
-               F::Target: FeeEstimator,
-       {
+       pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> {
                let monitor_list = self.kv_store.list(
                        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
                        CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
                )?;
                let mut res = Vec::with_capacity(monitor_list.len());
                for monitor_key in monitor_list {
-                       res.push(self.read_channel_monitor_with_updates(
-                               broadcaster,
-                               fee_estimator,
-                               monitor_key,
-                       )?)
+                       res.push(self.read_channel_monitor_with_updates(monitor_key)?)
                }
                Ok(res)
        }
@@ -496,13 +494,9 @@ where
        ///
        /// Loading a large number of monitors will be faster if done in parallel. You can use this
        /// function to accomplish this. Take care to limit the number of parallel readers.
-       pub fn read_channel_monitor_with_updates<B: Deref, F: Deref>(
-               &self, broadcaster: &B, fee_estimator: &F, monitor_key: String,
-       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
-       where
-               B::Target: BroadcasterInterface,
-               F::Target: FeeEstimator,
-       {
+       pub fn read_channel_monitor_with_updates(
+               &self, monitor_key: String,
+       ) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
                let monitor_name = MonitorName::new(monitor_key)?;
                let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
                let mut current_update_id = monitor.get_latest_update_id();
@@ -521,7 +515,7 @@ where
                                Err(err) => return Err(err),
                        };
 
-                       monitor.update_monitor(&update, broadcaster, fee_estimator, &self.logger)
+                       monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
                                .map_err(|e| {
                                        log_error!(
                                                self.logger,
@@ -639,13 +633,15 @@ where
        }
 }
 
-impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref>
-       Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP>
+impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
+       Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
        K::Target: KVStore,
        L::Target: Logger,
        ES::Target: EntropySource + Sized,
        SP::Target: SignerProvider + Sized,
+       BI::Target: BroadcasterInterface,
+       FE::Target: FeeEstimator
 {
        /// Persists a new channel. This means writing the entire monitor to the
        /// parametrized [`KVStore`].
@@ -788,12 +784,14 @@ where
        }
 }
 
-impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
 where
        ES::Target: EntropySource + Sized,
        K::Target: KVStore,
        L::Target: Logger,
-       SP::Target: SignerProvider + Sized
+       SP::Target: SignerProvider + Sized,
+       BI::Target: BroadcasterInterface,
+       FE::Target: FeeEstimator
 {
        // Cleans up monitor updates for given monitor in range `start..=end`.
        fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
@@ -962,6 +960,8 @@ mod tests {
                        maximum_pending_updates: persister_0_max_pending_updates,
                        entropy_source: &chanmon_cfgs[0].keys_manager,
                        signer_provider: &chanmon_cfgs[0].keys_manager,
+                       broadcaster: &chanmon_cfgs[0].tx_broadcaster,
+                       fee_estimator: &chanmon_cfgs[0].fee_estimator,
                };
                let persister_1 = MonitorUpdatingPersister {
                        kv_store: &TestStore::new(false),
@@ -969,6 +969,8 @@ mod tests {
                        maximum_pending_updates: persister_1_max_pending_updates,
                        entropy_source: &chanmon_cfgs[1].keys_manager,
                        signer_provider: &chanmon_cfgs[1].keys_manager,
+                       broadcaster: &chanmon_cfgs[1].tx_broadcaster,
+                       fee_estimator: &chanmon_cfgs[1].fee_estimator,
                };
                let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
                let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -991,23 +993,18 @@ mod tests {
                node_cfgs[1].chain_monitor = chain_mon_1;
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
-               let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
 
                // Check that the persisted channel data is empty before any channels are
                // open.
-               let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
-                       &broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+               let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
                assert_eq!(persisted_chan_data_0.len(), 0);
-               let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
-                       &broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
+               let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
                assert_eq!(persisted_chan_data_1.len(), 0);
 
                // Helper to make sure the channel is on the expected update ID.
                macro_rules! check_persisted_data {
                        ($expected_update_id: expr) => {
-                               persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
-                                       &broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+                               persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
                                // check that we stored only one monitor
                                assert_eq!(persisted_chan_data_0.len(), 1);
                                for (_, mon) in persisted_chan_data_0.iter() {
@@ -1026,8 +1023,7 @@ mod tests {
                                                );
                                        }
                                }
-                               persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
-                                       &broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
+                               persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
                                assert_eq!(persisted_chan_data_1.len(), 1);
                                for (_, mon) in persisted_chan_data_1.iter() {
                                        assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -1095,7 +1091,7 @@ mod tests {
                check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);
 
                // Make sure the expected number of stale updates is present.
-               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
                let (_, monitor) = &persisted_chan_data[0];
                let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
                // The channel should have 0 updates, as it wrote a full monitor and consolidated.
@@ -1129,6 +1125,8 @@ mod tests {
                                maximum_pending_updates: 11,
                                entropy_source: node_cfgs[0].keys_manager,
                                signer_provider: node_cfgs[0].keys_manager,
+                               broadcaster: node_cfgs[0].tx_broadcaster,
+                               fee_estimator: node_cfgs[0].fee_estimator,
                        };
                        match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
                                ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -1168,6 +1166,8 @@ mod tests {
                        maximum_pending_updates: test_max_pending_updates,
                        entropy_source: &chanmon_cfgs[0].keys_manager,
                        signer_provider: &chanmon_cfgs[0].keys_manager,
+                       broadcaster: &chanmon_cfgs[0].tx_broadcaster,
+                       fee_estimator: &chanmon_cfgs[0].fee_estimator,
                };
                let persister_1 = MonitorUpdatingPersister {
                        kv_store: &TestStore::new(false),
@@ -1175,6 +1175,8 @@ mod tests {
                        maximum_pending_updates: test_max_pending_updates,
                        entropy_source: &chanmon_cfgs[1].keys_manager,
                        signer_provider: &chanmon_cfgs[1].keys_manager,
+                       broadcaster: &chanmon_cfgs[1].tx_broadcaster,
+                       fee_estimator: &chanmon_cfgs[1].fee_estimator,
                };
                let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
                let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -1198,11 +1200,9 @@ mod tests {
                let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
                let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 
-               let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
-
                // Check that the persisted channel data is empty before any channels are
                // open.
-               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
                assert_eq!(persisted_chan_data.len(), 0);
 
                // Create some initial channel
@@ -1213,7 +1213,7 @@ mod tests {
                send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
 
                // Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible)
-               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
+               let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
                let (_, monitor) = &persisted_chan_data[0];
                let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
                persister_0