Add would_broadcast_at_height functionality to Channel
authorValentine Wallace <vwallace@protonmail.com>
Mon, 10 Aug 2020 18:43:09 +0000 (14:43 -0400)
committerMatt Corallo <git@bluematt.me>
Tue, 11 Aug 2020 17:59:30 +0000 (13:59 -0400)
In service to the larger refactor of removing the Channel's reference
to its ChannelMonitor.

lightning/src/ln/channel.rs
lightning/src/ln/channelmanager.rs
lightning/src/ln/channelmonitor.rs

index b8f297c45bc075eb413d6ead88c37afc91e79e09..1112e89e3bd556ae53c83b8521a23d175e777dac 100644 (file)
@@ -33,7 +33,7 @@ use ln::chan_utils;
 use chain::chaininterface::{FeeEstimator,ConfirmationTarget};
 use chain::transaction::OutPoint;
 use chain::keysinterface::{ChannelKeys, KeysInterface};
-use util::transaction_utils;
+use util::{byte_utils, transaction_utils};
 use util::ser::{Readable, Writeable, Writer};
 use util::logger::Logger;
 use util::errors::APIError;
@@ -43,6 +43,7 @@ use std;
 use std::default::Default;
 use std::{cmp,mem,fmt};
 use std::ops::Deref;
+use std::collections::HashMap;
 use bitcoin::hashes::hex::ToHex;
 
 #[cfg(test)]
@@ -109,10 +110,7 @@ enum InboundHTLCState {
        /// Note that we have to keep an eye on the HTLC until we've received a broadcastable
        /// commitment transaction without it as otherwise we'll have to force-close the channel to
        /// claim it before the timeout (obviously doesn't apply to revoked HTLCs that we can't claim
-       /// anyway). That said, ChannelMonitor does this for us (see
-       /// ChannelMonitor::would_broadcast_at_height) so we actually remove the HTLC from our own
-       /// local state before then, once we're sure that the next commitment_signed and
-       /// ChannelMonitor::provide_latest_local_commitment_tx_info will not include this HTLC.
+       /// anyway).
        LocalRemoved(InboundHTLCRemovalReason),
 }
 
@@ -293,6 +291,8 @@ pub(super) struct Channel<ChanSigner: ChannelKeys> {
        pending_inbound_htlcs: Vec<InboundHTLCOutput>,
        pending_outbound_htlcs: Vec<OutboundHTLCOutput>,
        holding_cell_htlc_updates: Vec<HTLCUpdateAwaitingACK>,
+       payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
+       pending_drops: Vec<InboundHTLCOutput>,
 
        /// When resending CS/RAA messages on channel monitor restoration or on reconnect, we always
        /// need to ensure we resend them in the order we originally generated them. Note that because
@@ -510,6 +510,8 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        pending_inbound_htlcs: Vec::new(),
                        pending_outbound_htlcs: Vec::new(),
                        holding_cell_htlc_updates: Vec::new(),
+                       payment_preimages: HashMap::new(),
+                       pending_drops: Vec::new(),
                        pending_update_fee: None,
                        holding_cell_update_fee: None,
                        next_local_htlc_id: 0,
@@ -738,6 +740,8 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        pending_inbound_htlcs: Vec::new(),
                        pending_outbound_htlcs: Vec::new(),
                        holding_cell_htlc_updates: Vec::new(),
+                       payment_preimages: HashMap::new(),
+                       pending_drops: Vec::new(),
                        pending_update_fee: None,
                        holding_cell_update_fee: None,
                        next_local_htlc_id: 0,
@@ -795,6 +799,70 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                Ok(chan)
        }
 
+       pub(super) fn monitor_would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
+               let remote_keys = self.build_remote_transaction_keys().unwrap();
+               let (_tx, _size, remote_htlc_outputs) = self.build_commitment_transaction(self.cur_remote_commitment_transaction_number, &remote_keys, false, true, self.feerate_per_kw, logger);
+
+               let curr_remote_outputs: Vec<&HTLCOutputInCommitment> = remote_htlc_outputs
+                       .iter().map(|&(ref a, _)| a).collect();
+
+               macro_rules! add_htlc_output {
+                       ($htlc: expr, $offered: expr, $list: expr) => {
+                               $list.push(HTLCOutputInCommitment{
+                                       offered: $offered,
+                                       amount_msat: $htlc.amount_msat,
+                                       cltv_expiry: $htlc.cltv_expiry,
+                                       payment_hash: $htlc.payment_hash,
+                                       transaction_output_index: None
+                               });
+                       }
+               }
+
+               let mut prev_remote_htlc_outputs = Vec::new();
+               let mut local_outputs = Vec::new();
+               let awaiting_raa = (self.channel_state & ChannelState::AwaitingRemoteRevoke as u32) != 0;
+               for ref htlc in self.pending_inbound_htlcs.iter() {
+                       match htlc.state {
+                               InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) => add_htlc_output!(htlc, false, local_outputs),
+                               InboundHTLCState::Committed => add_htlc_output!(htlc, false, local_outputs),
+                               InboundHTLCState::LocalRemoved(_) => {
+                                       add_htlc_output!(htlc, false, local_outputs);
+                                       if awaiting_raa {
+                                               add_htlc_output!(htlc, true, prev_remote_htlc_outputs)
+                                       }
+                               },
+                               _ => {},
+                       }
+               }
+               for ref htlc in self.pending_outbound_htlcs.iter() {
+                       match htlc.state {
+                               OutboundHTLCState::LocalAnnounced(_) => add_htlc_output!(htlc, true, local_outputs),
+                               OutboundHTLCState::Committed => add_htlc_output!(htlc, true, local_outputs),
+                               OutboundHTLCState::RemoteRemoved(_) => {
+                                       add_htlc_output!(htlc, true, local_outputs);
+                                       if awaiting_raa {
+                                               add_htlc_output!(htlc, false, prev_remote_htlc_outputs)
+                                       }
+                               },
+                               OutboundHTLCState::AwaitingRemoteRevokeToRemove(_) => add_htlc_output!(htlc, true, local_outputs),
+                               OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) => {
+                                       if awaiting_raa {
+                                               add_htlc_output!(htlc, false, prev_remote_htlc_outputs)
+                                       }
+                               },
+                       }
+               }
+
+               for ref htlc in self.pending_drops.iter() {
+                       add_htlc_output!(htlc, false, local_outputs);
+               }
+
+               let local_htlc_outputs = local_outputs.iter().collect();
+               let prev_remote_outputs = prev_remote_htlc_outputs.iter().collect();
+               let remote_outputs = [curr_remote_outputs, prev_remote_outputs].concat();
+               ChannelMonitor::<ChanSigner>::would_broadcast_at_height_given_htlcs(local_htlc_outputs, remote_outputs, height, &self.payment_preimages, logger)
+       }
+
        // Utilities to build transactions:
 
        fn get_commitment_transaction_number_obscure_factor(&self) -> u64 {
@@ -1216,6 +1284,7 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                // We have to put the payment_preimage in the channel_monitor right away here to ensure we
                // can claim it even if the channel hits the chain before we see their next commitment.
                self.latest_monitor_update_id += 1;
+               self.payment_preimages.insert(payment_hash_calc, payment_preimage_arg.clone());
                let monitor_update = ChannelMonitorUpdate {
                        update_id: self.latest_monitor_update_id,
                        updates: vec![ChannelMonitorUpdateStep::PaymentPreimage {
@@ -2011,6 +2080,12 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        return Err((None, ChannelError::Close(format!("Got wrong number of HTLC signatures ({}) from remote. It must be {}", msg.htlc_signatures.len(), local_commitment_tx.1))));
                }
 
+               // A LocalRemoved HTLC need to be monitored for expiration until we receive a
+               // broadcastable commitment tx without said HTLC. Now that we've confirmed that
+               // this commitment signed message provides said commitment tx, we can drop the
+               // LocalRemoved HTLCs we were previously watching for.
+               self.pending_drops.clear();
+
                // TODO: Merge these two, sadly they are currently both required to be passed separately to
                // ChannelMonitor:
                let mut htlcs_without_source = Vec::with_capacity(local_commitment_tx.2.len());
@@ -2303,17 +2378,34 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                        // Take references explicitly so that we can hold multiple references to self.
                        let pending_inbound_htlcs: &mut Vec<_> = &mut self.pending_inbound_htlcs;
                        let pending_outbound_htlcs: &mut Vec<_> = &mut self.pending_outbound_htlcs;
-
-                       // We really shouldnt have two passes here, but retain gives a non-mutable ref (Rust bug)
-                       pending_inbound_htlcs.retain(|htlc| {
-                               if let &InboundHTLCState::LocalRemoved(ref reason) = &htlc.state {
-                                       log_trace!(logger, " ...removing inbound LocalRemoved {}", log_bytes!(htlc.payment_hash.0));
-                                       if let &InboundHTLCRemovalReason::Fulfill(_) = reason {
-                                               value_to_self_msat_diff += htlc.amount_msat as i64;
+                       let pending_drops: &mut Vec<_> = &mut self.pending_drops;
+
+                       // LocalRemoved HTLCs are saved in pending_drops so we can properly
+                       // calculate whether to broadcast a commitment transaction due to an
+                       // expiring HTLC or whether the ChannelMonitor will take care of it for
+                       // us.
+                       let mut inbounds = Vec::new();
+                       for htlc in pending_inbound_htlcs.drain(..) {
+                               match htlc.state {
+                                       InboundHTLCState::LocalRemoved(_) => {
+                                               log_trace!(logger, " ...removing inbound LocalRemoved {}", log_bytes!(htlc.payment_hash.0));
+                                               pending_drops.push(htlc);
+                                       },
+                                       _ => inbounds.push(htlc),
+                               }
+                       }
+                       for htlc in pending_drops.iter() {
+                               match htlc.state {
+                                       InboundHTLCState::LocalRemoved(ref reason) => {
+                                               if let &InboundHTLCRemovalReason::Fulfill(_) = reason {
+                                                       value_to_self_msat_diff += htlc.amount_msat as i64;
+                                               }
                                        }
-                                       false
-                               } else { true }
-                       });
+                                       _ => unreachable!(),
+                               };
+                       }
+                       mem::swap(pending_inbound_htlcs, &mut inbounds);
+                       // We really shouldnt have two passes here, but retain gives a non-mutable ref (Rust bug)
                        pending_outbound_htlcs.retain(|htlc| {
                                if let &OutboundHTLCState::AwaitingRemovedRemoteRevoke(ref fail_reason) = &htlc.state {
                                        log_trace!(logger, " ...removing outbound AwaitingRemovedRemoteRevoke {}", log_bytes!(htlc.payment_hash.0));
@@ -3115,14 +3207,6 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
                self.user_id
        }
 
-       /// May only be called after funding has been initiated (ie is_funding_initiated() is true)
-       pub fn channel_monitor(&mut self) -> &mut ChannelMonitor<ChanSigner> {
-               if self.channel_state < ChannelState::FundingSent as u32 {
-                       panic!("Can't get a channel monitor until funding has been created");
-               }
-               self.channel_monitor.as_mut().unwrap()
-       }
-
        /// Guaranteed to be Some after both FundingLocked messages have been exchanged (and, thus,
        /// is_usable() returns true).
        /// Allowed in any state (including after shutdown)
@@ -4121,6 +4205,21 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
                        }
                }
 
+               (self.pending_drops.len() as u64).write(writer)?;
+               for htlc in self.pending_drops.iter() {
+                       htlc.htlc_id.write(writer)?;
+                       htlc.amount_msat.write(writer)?;
+                       htlc.cltv_expiry.write(writer)?;
+                       htlc.payment_hash.write(writer)?;
+                       match &htlc.state {
+                               &InboundHTLCState::LocalRemoved(ref removal_reason) => {
+                                       4u8.write(writer)?;
+                                       removal_reason.write(writer)?;
+                               },
+                               _ => unreachable!(),
+                       }
+               }
+
                (self.pending_outbound_htlcs.len() as u64).write(writer)?;
                for htlc in self.pending_outbound_htlcs.iter() {
                        htlc.htlc_id.write(writer)?;
@@ -4151,6 +4250,11 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
                        }
                }
 
+               writer.write_all(&byte_utils::be64_to_array(self.payment_preimages.len() as u64))?;
+               for payment_preimage in self.payment_preimages.values() {
+                       writer.write_all(&payment_preimage.0[..])?;
+               }
+
                (self.holding_cell_htlc_updates.len() as u64).write(writer)?;
                for update in self.holding_cell_htlc_updates.iter() {
                        match update {
@@ -4248,6 +4352,8 @@ impl<ChanSigner: ChannelKeys + Writeable> Writeable for Channel<ChanSigner> {
        }
 }
 
+const MAX_ALLOC_SIZE: usize = 64*1024;
+
 impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
        fn read<R : ::std::io::Read>(reader: &mut R) -> Result<Self, DecodeError> {
                let _ver: u8 = Readable::read(reader)?;
@@ -4292,6 +4398,21 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
                        });
                }
 
+               let pending_drops_count: u64 = Readable::read(reader)?;
+               let mut pending_drops = Vec::with_capacity(cmp::min(pending_drops_count as usize, OUR_MAX_HTLCS as usize));
+               for _ in 0..pending_drops_count {
+                       pending_drops.push(InboundHTLCOutput {
+                               htlc_id: Readable::read(reader)?,
+                               amount_msat: Readable::read(reader)?,
+                               cltv_expiry: Readable::read(reader)?,
+                               payment_hash: Readable::read(reader)?,
+                               state: match <u8 as Readable>::read(reader)? {
+                                       4 => InboundHTLCState::LocalRemoved(Readable::read(reader)?),
+                                       _ => return Err(DecodeError::InvalidValue),
+                               },
+                       });
+               }
+
                let pending_outbound_htlc_count: u64 = Readable::read(reader)?;
                let mut pending_outbound_htlcs = Vec::with_capacity(cmp::min(pending_outbound_htlc_count as usize, OUR_MAX_HTLCS as usize));
                for _ in 0..pending_outbound_htlc_count {
@@ -4312,6 +4433,16 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
                        });
                }
 
+               let payment_preimages_len: u64 = Readable::read(reader)?;
+               let mut payment_preimages = HashMap::with_capacity(cmp::min(payment_preimages_len as usize, MAX_ALLOC_SIZE / 32));
+               for _ in 0..payment_preimages_len {
+                       let preimage: PaymentPreimage = Readable::read(reader)?;
+                       let hash = PaymentHash(Sha256::hash(&preimage.0[..]).into_inner());
+                       if let Some(_) = payment_preimages.insert(hash, preimage) {
+                               return Err(DecodeError::InvalidValue);
+                       }
+               }
+
                let holding_cell_htlc_update_count: u64 = Readable::read(reader)?;
                let mut holding_cell_htlc_updates = Vec::with_capacity(cmp::min(holding_cell_htlc_update_count as usize, OUR_MAX_HTLCS as usize*2));
                for _ in 0..holding_cell_htlc_update_count {
@@ -4428,6 +4559,8 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for Channel<ChanSigner> {
                        pending_inbound_htlcs,
                        pending_outbound_htlcs,
                        holding_cell_htlc_updates,
+                       payment_preimages,
+                       pending_drops,
 
                        resend_order,
 
index a17eb1a610bc132f736dcf42438b3689d20bb3f0..748053595ffeaa8174d6d9757a96dfcc3e43cdf8 100644 (file)
@@ -3104,7 +3104,7 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
                                                }
                                        }
                                }
-                               if channel.is_funding_initiated() && channel.channel_monitor().would_broadcast_at_height(height, &self.logger) {
+                               if channel.is_funding_initiated() && channel.monitor_would_broadcast_at_height(height, &self.logger) {
                                        if let Some(short_id) = channel.get_short_channel_id() {
                                                short_to_id.remove(&short_id);
                                        }
index b69636e0797b5ef9327da195f785474127d92d0f..f80cac38ad5003964619d060c1d7926fdef671ee 100644 (file)
@@ -1993,7 +1993,26 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                self.last_block_hash = block_hash.clone();
        }
 
-       pub(super) fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
+       fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
+               let local_outputs: Vec<&HTLCOutputInCommitment> = self.current_local_commitment_tx.htlc_outputs
+                       .iter().map(|&(ref a, _, _)| a).collect();
+               let mut prev_remote_outputs = Vec::new();
+               if let Some(ref txid) = self.prev_remote_commitment_txid {
+                       if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
+                               prev_remote_outputs = htlc_outputs.iter().map(|&(ref a, _)| a).collect();
+                       }
+               }
+               let mut curr_remote_outputs = Vec::new();
+               if let Some(ref txid) = self.current_remote_commitment_txid {
+                       if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
+                               curr_remote_outputs = htlc_outputs.iter().map(|&(ref a, _)| a).collect()
+                       }
+               }
+               let remote_outputs = [curr_remote_outputs, prev_remote_outputs].concat();
+               ChannelMonitor::<ChanSigner>::would_broadcast_at_height_given_htlcs(local_outputs, remote_outputs, height, &self.payment_preimages, logger)
+       }
+
+       pub(super) fn would_broadcast_at_height_given_htlcs<L: Deref>(local_htlc_outputs: Vec<&HTLCOutputInCommitment>, remote_htlc_outputs: Vec<&HTLCOutputInCommitment>, height: u32, preimages: &HashMap<PaymentHash, PaymentPreimage>, logger: &L) -> bool where L::Target: Logger {
                // We need to consider all HTLCs which are:
                //  * in any unrevoked remote commitment transaction, as they could broadcast said
                //    transactions and we'd end up in a race, or
@@ -2032,7 +2051,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                                        //  with CHECK_CLTV_EXPIRY_SANITY_2.
                                        let htlc_outbound = $local_tx == htlc.offered;
                                        if ( htlc_outbound && htlc.cltv_expiry + LATENCY_GRACE_PERIOD_BLOCKS <= height) ||
-                                          (!htlc_outbound && htlc.cltv_expiry <= height + CLTV_CLAIM_BUFFER && self.payment_preimages.contains_key(&htlc.payment_hash)) {
+                                          (!htlc_outbound && htlc.cltv_expiry <= height + CLTV_CLAIM_BUFFER && preimages.contains_key(&htlc.payment_hash)) {
                                                log_info!(logger, "Force-closing channel due to {} HTLC timeout, HTLC expiry is {}", if htlc_outbound { "outbound" } else { "inbound "}, htlc.cltv_expiry);
                                                return true;
                                        }
@@ -2040,18 +2059,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        }
                }
 
-               scan_commitment!(self.current_local_commitment_tx.htlc_outputs.iter().map(|&(ref a, _, _)| a), true);
-
-               if let Some(ref txid) = self.current_remote_commitment_txid {
-                       if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
-                               scan_commitment!(htlc_outputs.iter().map(|&(ref a, _)| a), false);
-                       }
-               }
-               if let Some(ref txid) = self.prev_remote_commitment_txid {
-                       if let Some(ref htlc_outputs) = self.remote_claimable_outpoints.get(txid) {
-                               scan_commitment!(htlc_outputs.iter().map(|&(ref a, _)| a), false);
-                       }
-               }
+               scan_commitment!(local_htlc_outputs, true);
+               scan_commitment!(remote_htlc_outputs, false);
 
                false
        }