From 5e43070ef43ea81774d96187c1659deaac444c57 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Mon, 3 Feb 2020 23:46:29 -0500
Subject: [PATCH] Move pending-HTLC-updated ChannelMonitor from
 ManyChannelMonitor

This is important for a number of reasons:
 * Firstly, I hit this trying to implement rescan in the demo
   bitcoinrpc client - if individual ChannelMonitors are out of sync
   with each other, we cannot add them all into a ManyChannelMonitor
   together and then rescan, but need to rescan them individually
   without having to do a bunch of manual work. Of the three return
   values in ChannelMonitor::block_connected, only the HTLCSource
   stuff that is moved here makes no sense to be exposed to the user.
 * Secondly, the logic currently in ManyChannelMonitor cannot be
   reproduced by the user! HTLCSource is deliberately an opaque type
   but we use its data to decide which things to keep when inserting
   into the HashMap. This would prevent a user from properly
   implementing a replacement ManyChannelMonitor, which is
   unacceptable.
 * Finally, by moving the tracking into ChannelMonitor, we can
   serialize them out, which prevents us from forgetting them when
   loading from disk, though there are still other races which need
   to be handled to make this fully safe (see TODOs in
   ChannelManager).

This is safe as no two entries can have the same HTLCSource across
different channels (or, if they did, it would be a rather serious
bug), though note that, IIRC, when this code was added, the
HTLCSource field in the values was not present.

We also take this opportunity to rename the fetch function to match
our other event interfaces, making it clear that by calling the
function the set of HTLCUpdates will also be cleared.
---
 fuzz/src/chanmon_consistency.rs    |   4 +-
 lightning/src/ln/channelmanager.rs |   4 +-
 lightning/src/ln/channelmonitor.rs | 117 ++++++++++++++---------------
 lightning/src/util/test_utils.rs   |   4 +-
 4 files changed, 63 insertions(+), 66 deletions(-)

diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs
index c6f09abd..bf0dda4d 100644
--- a/fuzz/src/chanmon_consistency.rs
+++ b/fuzz/src/chanmon_consistency.rs
@@ -121,8 +121,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMon
 		ret
 	}
 
-	fn fetch_pending_htlc_updated(&self) -> Vec {
-		return self.simple_monitor.fetch_pending_htlc_updated();
+	fn get_and_clear_pending_htlcs_updated(&self) -> Vec {
+		return self.simple_monitor.get_and_clear_pending_htlcs_updated();
 	}
 }
 
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 015894e5..5cc9783c 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -2549,7 +2549,7 @@ impl events::MessageSendEventsProvider for Ch
 		// restart. This is doubly true for the fail/fulfill-backs from monitor events!
 		{
 			//TODO: This behavior should be documented.
-			for htlc_update in self.monitor.fetch_pending_htlc_updated() {
+			for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
 				if let Some(preimage) = htlc_update.payment_preimage {
 					log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
 					self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
@@ -2574,7 +2574,7 @@ impl events::EventsProvider for ChannelManage
 		// restart. This is doubly true for the fail/fulfill-backs from monitor events!
 		{
 			//TODO: This behavior should be documented.
-			for htlc_update in self.monitor.fetch_pending_htlc_updated() {
+			for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
 				if let Some(preimage) = htlc_update.payment_preimage {
 					log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
 					self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs
index 68674ac3..eaf207cd 100644
--- a/lightning/src/ln/channelmonitor.rs
+++ b/lightning/src/ln/channelmonitor.rs
@@ -93,11 +93,13 @@ pub struct MonitorUpdateError(pub &'static str);
 
 /// Simple structure send back by ManyChannelMonitor in case of HTLC detected onchain from a
 /// forward channel and from which info are needed to update HTLC in a backward channel.
+#[derive(Clone, PartialEq)]
 pub struct HTLCUpdate {
 	pub(super) payment_hash: PaymentHash,
 	pub(super) payment_preimage: Option,
 	pub(super) source: HTLCSource
 }
+impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source });
 
 /// Simple trait indicating ability to track a set of ChannelMonitors and multiplex events between
 /// them. Generally should be implemented by keeping a local SimpleManyChannelMonitor and passing
@@ -130,8 +132,12 @@ pub trait ManyChannelMonitor: Send + Sync {
 	fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>;
 
 	/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
-	/// with success or failure backward
-	fn fetch_pending_htlc_updated(&self) -> Vec;
+	/// with success or failure.
+	///
+	/// You should probably just call through to
+	/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
+	/// the full list.
+	fn get_and_clear_pending_htlcs_updated(&self) -> Vec;
 }
 
 /// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
@@ -153,7 +159,6 @@ pub struct SimpleManyChannelMonitor {
 	chain_monitor: Arc,
 	broadcaster: Arc,
 	pending_events: Mutex>,
-	pending_htlc_updated: Mutex)>>>,
 	logger: Arc,
 	fee_estimator: Arc
 }
@@ -162,11 +167,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
 	fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
 		let block_hash = header.bitcoin_hash();
 		let mut new_events: Vec = Vec::with_capacity(0);
-		let mut htlc_updated_infos = Vec::new();
 		{
 			let mut monitors = self.monitors.lock().unwrap();
 			for monitor in monitors.values_mut() {
-				let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
+				let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
 				if spendable_outputs.len() > 0 {
 					new_events.push(events::Event::SpendableOutputs {
 						outputs: spendable_outputs,
@@ -178,35 +182,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
 					self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
 				}
 			}
-			htlc_updated_infos.append(&mut htlc_updated);
-			}
-		}
-		{
-			// ChannelManager will just need to fetch pending_htlc_updated and pass state backward
-			let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap();
-			for htlc in htlc_updated_infos.drain(..) {
-				match pending_htlc_updated.entry(htlc.2) {
-					hash_map::Entry::Occupied(mut e) => {
-						// In case of reorg we may have htlc outputs solved in a different way so
-						// we prefer to keep claims but don't store duplicate updates for a given
-						// (payment_hash, HTLCSource) pair.
-						let mut existing_claim = false;
-						e.get_mut().retain(|htlc_data| {
-							if htlc.0 == htlc_data.0 {
-								if htlc_data.1.is_some() {
-									existing_claim = true;
-									true
-								} else { false }
-							} else { true }
-						});
-						if !existing_claim {
-							e.get_mut().push((htlc.0, htlc.1));
-						}
-					}
-					hash_map::Entry::Vacant(e) => {
-						e.insert(vec![(htlc.0, htlc.1)]);
-					}
-				}
 			}
 		}
 		let mut pending_events = self.pending_events.lock().unwrap();
@@ -231,7 +206,6 @@ impl Simpl
 			chain_monitor,
 			broadcaster,
 			pending_events: Mutex::new(Vec::new()),
-			pending_htlc_updated: Mutex::new(HashMap::new()),
 			logger,
 			fee_estimator: feeest,
 		};
@@ -284,17 +258,10 @@ impl ManyChannelMonitor for SimpleManyChann
 		}
 	}
 
-	fn fetch_pending_htlc_updated(&self) -> Vec {
-		let mut updated = self.pending_htlc_updated.lock().unwrap();
-		let mut pending_htlcs_updated = Vec::with_capacity(updated.len());
-		for (k, v) in updated.drain() {
-			for htlc_data in v {
-				pending_htlcs_updated.push(HTLCUpdate {
-					payment_hash: k,
-					payment_preimage: htlc_data.1,
-					source: htlc_data.0,
-				});
-			}
+	fn get_and_clear_pending_htlcs_updated(&self) -> Vec {
+		let mut pending_htlcs_updated = Vec::new();
+		for chan in self.monitors.lock().unwrap().values_mut() {
+			pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
 		}
 		pending_htlcs_updated
 	}
@@ -640,6 +607,8 @@ pub struct ChannelMonitor {
 
 	payment_preimages: HashMap,
 
+	pending_htlcs_updated: Vec,
+
 	destination_script: Script,
 	// Thanks to data loss protection, we may be able to claim our non-htlc funds
 	// back, this is the script we have to spend from but we need to
@@ -750,6 +719,7 @@ impl PartialEq for ChannelMonitor {
 			self.current_remote_commitment_number != other.current_remote_commitment_number ||
 			self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
 			self.payment_preimages != other.payment_preimages ||
+			self.pending_htlcs_updated != other.pending_htlcs_updated ||
 			self.destination_script != other.destination_script ||
 			self.to_remote_rescue != other.to_remote_rescue ||
 			self.pending_claim_requests != other.pending_claim_requests ||
@@ -938,6 +908,11 @@ impl ChannelMonitor {
 			writer.write_all(&payment_preimage.0[..])?;
 		}
 
+		writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
+		for data in self.pending_htlcs_updated.iter() {
+			data.write(writer)?;
+		}
+
 		self.last_block_hash.write(writer)?;
 		self.destination_script.write(writer)?;
 		if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@@ -1056,6 +1031,8 @@ impl ChannelMonitor {
 			current_remote_commitment_number: 1 << 48,
 
 			payment_preimages: HashMap::new(),
+			pending_htlcs_updated: Vec::new(),
+
 			destination_script: destination_script,
 			to_remote_rescue: None,
 
@@ -1419,6 +1396,14 @@ impl ChannelMonitor {
 		res
 	}
 
+	/// Get the list of HTLCs who's status has been updated on chain. This should be called by
+	/// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
+	pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec {
+		let mut ret = Vec::new();
+		mem::swap(&mut ret, &mut self.pending_htlcs_updated);
+		ret
+	}
+
 	/// Can only fail if idx is < get_min_seen_secret
 	pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
 		for i in 0..self.old_secrets.len() {
@@ -2402,7 +2387,7 @@ impl ChannelMonitor {
 	/// Eventually this should be pub and, roughly, implement ChainListener, however this requires
 	/// &mut self, as well as returns new spendable outputs and outpoints to watch for spending of
 	/// on-chain.
-	fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec)>, Vec, Vec<(HTLCSource, Option, PaymentHash)>) {
+	fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec)>, Vec) {
 		for tx in txn_matched {
 			let mut output_val = 0;
 			for out in tx.output.iter() {
@@ -2415,7 +2400,6 @@ impl ChannelMonitor {
 		log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
 		let mut watch_outputs = Vec::new();
 		let mut spendable_outputs = Vec::new();
-		let mut htlc_updated = Vec::new();
 		let mut bump_candidates = HashSet::new();
 		for tx in txn_matched {
 			if tx.input.len() == 1 {
@@ -2474,10 +2458,7 @@ impl ChannelMonitor {
 			// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
 			// can also be resolved in a few other ways which can have more than one output. Thus,
 			// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
-			let mut updated = self.is_resolving_htlc_output(&tx, height);
-			if updated.len() > 0 {
-				htlc_updated.append(&mut updated);
-			}
+			self.is_resolving_htlc_output(&tx, height);
 
 			// Scan all input to verify is one of the outpoint spent is of interest for us
 			let mut claimed_outputs_material = Vec::new();
@@ -2600,7 +2581,11 @@ impl ChannelMonitor {
 					},
 					OnchainEvent::HTLCUpdate { htlc_update } => {
 						log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
-						htlc_updated.push((htlc_update.0, None, htlc_update.1));
+						self.pending_htlcs_updated.push(HTLCUpdate {
+							payment_hash: htlc_update.1,
+							payment_preimage: None,
+							source: htlc_update.0,
+						});
 					},
 					OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
 						self.claimable_outpoints.remove(&outpoint);
@@ -2632,7 +2617,7 @@ impl ChannelMonitor {
 		for &(ref txid, ref output_scripts) in watch_outputs.iter() {
 			self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
 		}
-		(watch_outputs, spendable_outputs, htlc_updated)
+		(watch_outputs, spendable_outputs)
 	}
 
 	fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
@@ -2752,9 +2737,7 @@ impl ChannelMonitor {
 
 	/// Check if any transaction broadcasted is resolving HTLC output by a success or timeout on a local
 	/// or remote commitment tx, if so send back the source, preimage if found and payment_hash of resolved HTLC
-	fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) -> Vec<(HTLCSource, Option, PaymentHash)> {
-		let mut htlc_updated = Vec::new();
-
+	fn is_resolving_htlc_output(&mut self, tx: &Transaction, height: u32) {
 		'outer_loop: for input in &tx.input {
 			let mut payment_data = None;
 			let revocation_sig_claim = (input.witness.len() == 3 && HTLCType::scriptlen_to_htlctype(input.witness[2].len()) == Some(HTLCType::OfferedHTLC) && input.witness[1].len() == 33)
@@ -2854,10 +2837,18 @@ impl ChannelMonitor {
 				let mut payment_preimage = PaymentPreimage([0; 32]);
 				if accepted_preimage_claim {
 					payment_preimage.0.copy_from_slice(&input.witness[3]);
-					htlc_updated.push((source, Some(payment_preimage), payment_hash));
+					self.pending_htlcs_updated.push(HTLCUpdate {
+						source,
+						payment_preimage: Some(payment_preimage),
+						payment_hash
+					});
 				} else if offered_preimage_claim {
 					payment_preimage.0.copy_from_slice(&input.witness[1]);
-					htlc_updated.push((source, Some(payment_preimage), payment_hash));
+					self.pending_htlcs_updated.push(HTLCUpdate {
+						source,
+						payment_preimage: Some(payment_preimage),
+						payment_hash
+					});
 				} else {
 					log_info!(self, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
 					match self.onchain_events_waiting_threshold_conf.entry(height + ANTI_REORG_DELAY - 1) {
@@ -2880,7 +2871,6 @@ impl ChannelMonitor {
 				}
 			}
 		}
-		htlc_updated
 	}
 
 	/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize coutnerparty onchain) lays on the assumption of claim transactions getting confirmed before timelock expiration
@@ -3221,6 +3211,12 @@ impl> ReadableArgs
>::read(reader)? {
@@ -3321,6 +3317,7 @@ impl> ReadableArgs for TestChannelMon
 		self.update_ret.lock().unwrap().clone()
 	}
 
-	fn fetch_pending_htlc_updated(&self) -> Vec {
-		return self.simple_monitor.fetch_pending_htlc_updated();
+	fn get_and_clear_pending_htlcs_updated(&self) -> Vec {
+		return self.simple_monitor.get_and_clear_pending_htlcs_updated();
 	}
 }
-- 
2.30.2
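
For anyone implementing a custom ManyChannelMonitor against this change, the renamed call is intended to be a simple drain-and-concatenate across all tracked ChannelMonitors, mirroring what the patch does in SimpleManyChannelMonitor and TestChannelMonitor. Below is a minimal, self-contained Rust sketch of that pattern, assuming simplified stand-in types: MyManyChannelMonitor, the u64 channel key, and the trimmed-down HTLCUpdate/ChannelMonitor here are illustrative only, not the real rust-lightning definitions.

use std::collections::HashMap;
use std::mem;
use std::sync::Mutex;

// Simplified stand-ins for the real rust-lightning types.
#[derive(Clone, PartialEq)]
struct HTLCUpdate {
    payment_hash: [u8; 32],
    payment_preimage: Option<[u8; 32]>,
}

struct ChannelMonitor {
    // Filled in by block_connected() as HTLCs are resolved on chain.
    pending_htlcs_updated: Vec<HTLCUpdate>,
}

impl ChannelMonitor {
    // The same get-and-clear pattern the patch adds: swap the pending list
    // out for an empty Vec and hand the old contents to the caller.
    fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
        let mut ret = Vec::new();
        mem::swap(&mut ret, &mut self.pending_htlcs_updated);
        ret
    }
}

struct MyManyChannelMonitor {
    monitors: Mutex<HashMap<u64, ChannelMonitor>>,
}

impl MyManyChannelMonitor {
    // What the updated trait docs suggest: call through to every
    // ChannelMonitor and return the concatenated list. Each call also
    // clears the per-monitor state, so each update is delivered once.
    fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
        let mut pending_htlcs_updated = Vec::new();
        for chan in self.monitors.lock().unwrap().values_mut() {
            pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
        }
        pending_htlcs_updated
    }
}

fn main() {
    let many = MyManyChannelMonitor { monitors: Mutex::new(HashMap::new()) };
    many.monitors.lock().unwrap().insert(0, ChannelMonitor {
        pending_htlcs_updated: vec![HTLCUpdate { payment_hash: [1; 32], payment_preimage: None }],
    });
    // The first call drains the update; a second call returns nothing.
    assert_eq!(many.get_and_clear_pending_htlcs_updated().len(), 1);
    assert!(many.get_and_clear_pending_htlcs_updated().is_empty());
}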