From 61197390d60b0190ed0d60e81ea5da70bd79322d Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 28 Aug 2024 09:32:35 -0700 Subject: [PATCH] Store Broadcaster and FeeEstimator on MonitorUpdatingPersister. `MonitorUpdatingPersister` does not currently correctly archive monitors because it neglects any unapplied updates. In order to start applying these updates, the archiving methods will require access to instances of `BroadcasterInterface` and `FeeEstimator`. This commit requires that the `MonitorUpdatingPersister` be instantiated with those instances, obviating the need for passing them around, and laying the foundation for the following commit. --- lightning/src/util/persist.rs | 86 +++++++++++++++++------------------ 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index c30d13b31..a845cc178 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -400,28 +400,34 @@ where /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. -pub struct MonitorUpdatingPersister +pub struct MonitorUpdatingPersister where K::Target: KVStore, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator { kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, signer_provider: SP, + broadcaster: BI, + fee_estimator: FE } #[allow(dead_code)] -impl - MonitorUpdatingPersister +impl + MonitorUpdatingPersister where K::Target: KVStore, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator { /// Constructs a new [`MonitorUpdatingPersister`]. /// @@ -441,7 +447,7 @@ where /// [`MonitorUpdatingPersister::cleanup_stale_updates`]. pub fn new( kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, - signer_provider: SP, + signer_provider: SP, broadcaster: BI, fee_estimator: FE ) -> Self { MonitorUpdatingPersister { kv_store, @@ -449,6 +455,8 @@ where maximum_pending_updates, entropy_source, signer_provider, + broadcaster, + fee_estimator } } @@ -457,24 +465,14 @@ where /// It is extremely important that your [`KVStore::read`] implementation uses the /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the /// documentation for [`MonitorUpdatingPersister`]. - pub fn read_all_channel_monitors_with_updates( - &self, broadcaster: &B, fee_estimator: &F, - ) -> Result::EcdsaSigner>)>, io::Error> - where - B::Target: BroadcasterInterface, - F::Target: FeeEstimator, - { + pub fn read_all_channel_monitors_with_updates(&self) -> Result::EcdsaSigner>)>, io::Error> { let monitor_list = self.kv_store.list( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, )?; let mut res = Vec::with_capacity(monitor_list.len()); for monitor_key in monitor_list { - res.push(self.read_channel_monitor_with_updates( - broadcaster, - fee_estimator, - monitor_key, - )?) + res.push(self.read_channel_monitor_with_updates(monitor_key)?) } Ok(res) } @@ -496,13 +494,9 @@ where /// /// Loading a large number of monitors will be faster if done in parallel. You can use this /// function to accomplish this. Take care to limit the number of parallel readers. - pub fn read_channel_monitor_with_updates( - &self, broadcaster: &B, fee_estimator: &F, monitor_key: String, - ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> - where - B::Target: BroadcasterInterface, - F::Target: FeeEstimator, - { + pub fn read_channel_monitor_with_updates( + &self, monitor_key: String, + ) -> Result<(BlockHash, ChannelMonitor<::EcdsaSigner>), io::Error> { let monitor_name = MonitorName::new(monitor_key)?; let (block_hash, monitor) = self.read_monitor(&monitor_name)?; let mut current_update_id = monitor.get_latest_update_id(); @@ -521,7 +515,7 @@ where Err(err) => return Err(err), }; - monitor.update_monitor(&update, broadcaster, fee_estimator, &self.logger) + monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger) .map_err(|e| { log_error!( self.logger, @@ -639,13 +633,15 @@ where } } -impl - Persist for MonitorUpdatingPersister +impl + Persist for MonitorUpdatingPersister where K::Target: KVStore, L::Target: Logger, ES::Target: EntropySource + Sized, SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator { /// Persists a new channel. This means writing the entire monitor to the /// parametrized [`KVStore`]. @@ -788,12 +784,14 @@ where } } -impl MonitorUpdatingPersister +impl MonitorUpdatingPersister where ES::Target: EntropySource + Sized, K::Target: KVStore, L::Target: Logger, - SP::Target: SignerProvider + Sized + SP::Target: SignerProvider + Sized, + BI::Target: BroadcasterInterface, + FE::Target: FeeEstimator { // Cleans up monitor updates for given monitor in range `start..=end`. fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { @@ -962,6 +960,8 @@ mod tests { maximum_pending_updates: persister_0_max_pending_updates, entropy_source: &chanmon_cfgs[0].keys_manager, signer_provider: &chanmon_cfgs[0].keys_manager, + broadcaster: &chanmon_cfgs[0].tx_broadcaster, + fee_estimator: &chanmon_cfgs[0].fee_estimator, }; let persister_1 = MonitorUpdatingPersister { kv_store: &TestStore::new(false), @@ -969,6 +969,8 @@ mod tests { maximum_pending_updates: persister_1_max_pending_updates, entropy_source: &chanmon_cfgs[1].keys_manager, signer_provider: &chanmon_cfgs[1].keys_manager, + broadcaster: &chanmon_cfgs[1].tx_broadcaster, + fee_estimator: &chanmon_cfgs[1].fee_estimator, }; let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new( @@ -991,23 +993,18 @@ mod tests { node_cfgs[1].chain_monitor = chain_mon_1; let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster; - let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster; // Check that the persisted channel data is empty before any channels are // open. - let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates( - &broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); + let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_0.len(), 0); - let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates( - &broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap(); + let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_1.len(), 0); // Helper to make sure the channel is on the expected update ID. macro_rules! check_persisted_data { ($expected_update_id: expr) => { - persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates( - &broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); + persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); // check that we stored only one monitor assert_eq!(persisted_chan_data_0.len(), 1); for (_, mon) in persisted_chan_data_0.iter() { @@ -1026,8 +1023,7 @@ mod tests { ); } } - persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates( - &broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap(); + persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data_1.len(), 1); for (_, mon) in persisted_chan_data_1.iter() { assert_eq!(mon.get_latest_update_id(), $expected_update_id); @@ -1095,7 +1091,7 @@ mod tests { check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID); // Make sure the expected number of stale updates is present. - let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); + let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap(); let (_, monitor) = &persisted_chan_data[0]; let monitor_name = MonitorName::from(monitor.get_funding_txo().0); // The channel should have 0 updates, as it wrote a full monitor and consolidated. @@ -1129,6 +1125,8 @@ mod tests { maximum_pending_updates: 11, entropy_source: node_cfgs[0].keys_manager, signer_provider: node_cfgs[0].keys_manager, + broadcaster: node_cfgs[0].tx_broadcaster, + fee_estimator: node_cfgs[0].fee_estimator, }; match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) { ChannelMonitorUpdateStatus::UnrecoverableError => { @@ -1168,6 +1166,8 @@ mod tests { maximum_pending_updates: test_max_pending_updates, entropy_source: &chanmon_cfgs[0].keys_manager, signer_provider: &chanmon_cfgs[0].keys_manager, + broadcaster: &chanmon_cfgs[0].tx_broadcaster, + fee_estimator: &chanmon_cfgs[0].fee_estimator, }; let persister_1 = MonitorUpdatingPersister { kv_store: &TestStore::new(false), @@ -1175,6 +1175,8 @@ mod tests { maximum_pending_updates: test_max_pending_updates, entropy_source: &chanmon_cfgs[1].keys_manager, signer_provider: &chanmon_cfgs[1].keys_manager, + broadcaster: &chanmon_cfgs[1].tx_broadcaster, + fee_estimator: &chanmon_cfgs[1].fee_estimator, }; let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let chain_mon_0 = test_utils::TestChainMonitor::new( @@ -1198,11 +1200,9 @@ mod tests { let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let nodes = create_network(2, &node_cfgs, &node_chanmgrs); - let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster; - // Check that the persisted channel data is empty before any channels are // open. - let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); + let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap(); assert_eq!(persisted_chan_data.len(), 0); // Create some initial channel @@ -1213,7 +1213,7 @@ mod tests { send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000); // Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible) - let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); + let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap(); let (_, monitor) = &persisted_chan_data[0]; let monitor_name = MonitorName::from(monitor.get_funding_txo().0); persister_0 -- 2.39.5