From: Matt Corallo Date: Wed, 4 Apr 2018 15:56:54 +0000 (-0400) Subject: Completely rewrite channel HTLC tracking and processing X-Git-Tag: v0.0.12~412^2~5 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=374ea1f05e301b7f2c47781ad4f7a3ac5ac909fd;p=rust-lightning Completely rewrite channel HTLC tracking and processing It obviously was nonsensical prior, though its now way more complicated...there's refactoring work to be done here, but it should at least kinda sorta work now. --- diff --git a/fuzz/fuzz_targets/channel_target.rs b/fuzz/fuzz_targets/channel_target.rs index b64399610..20fd4b435 100644 --- a/fuzz/fuzz_targets/channel_target.rs +++ b/fuzz/fuzz_targets/channel_target.rs @@ -8,7 +8,7 @@ use bitcoin::util::hash::Sha256dHash; use bitcoin::network::serialize::{serialize, BitcoinHash}; use lightning::ln::channel::Channel; -use lightning::ln::channelmanager::PendingForwardHTLCInfo; +use lightning::ln::channelmanager::{HTLCFailReason, PendingForwardHTLCInfo}; use lightning::ln::msgs; use lightning::ln::msgs::MsgDecodable; use lightning::chain::chaininterface::{FeeEstimator, ConfirmationTarget}; @@ -241,11 +241,11 @@ pub fn do_test(data: &[u8]) { }, 4 => { let update_fail_htlc = decode_msg_with_len16!(msgs::UpdateFailHTLC, 32 + 8, 1); - return_err!(channel.update_fail_htlc(&update_fail_htlc)); + return_err!(channel.update_fail_htlc(&update_fail_htlc, HTLCFailReason::dummy())); }, 5 => { let update_fail_malformed_htlc = decode_msg!(msgs::UpdateFailMalformedHTLC, 32+8+32+2); - return_err!(channel.update_fail_malformed_htlc(&update_fail_malformed_htlc)); + return_err!(channel.update_fail_malformed_htlc(&update_fail_malformed_htlc, HTLCFailReason::dummy())); }, 6 => { let commitment_signed = decode_msg_with_len16!(msgs::CommitmentSigned, 32+64, 64); diff --git a/src/ln/channel.rs b/src/ln/channel.rs index 4b6cd08ff..fe92f9d6f 100644 --- a/src/ln/channel.rs +++ b/src/ln/channel.rs @@ -17,7 +17,7 @@ use crypto::hkdf::{hkdf_extract,hkdf_expand}; use ln::msgs; use ln::msgs::{HandleError, MsgEncodable}; use ln::channelmonitor::ChannelMonitor; -use ln::channelmanager::PendingForwardHTLCInfo; +use ln::channelmanager::{PendingForwardHTLCInfo, HTLCFailReason}; use ln::chan_utils::{TxCreationKeys,HTLCOutputInCommitment}; use ln::chan_utils; use chain::chaininterface::{FeeEstimator,ConfirmationTarget}; @@ -25,7 +25,7 @@ use util::{transaction_utils,rng}; use util::sha2::Sha256; use std::default::Default; -use std::cmp; +use std::{cmp,mem}; use std::time::Instant; pub struct ChannelKeys { @@ -84,19 +84,73 @@ impl ChannelKeys { #[derive(PartialEq)] enum HTLCState { + /// Added by remote, to be included in next local commitment tx. RemoteAnnounced, + /// Included in a received commitment_signed message (implying we've revoke_and_ack'ed it), but + /// the remote side hasn't yet revoked their previous state, which we need them to do before we + /// accept this HTLC. Implies AwaitingRemoteRevoke. + /// We also have not yet included this HTLC in a commitment_signed message, and are waiting on + /// a remote revoke_and_ack on a previous state before we can do so. + AwaitingRemoteRevokeToAnnounce, + /// Included in a received commitment_signed message (implying we've revoke_and_ack'ed it), but + /// the remote side hasn't yet revoked their previous state, which we need them to do before we + /// accept this HTLC. Implies AwaitingRemoteRevoke. + /// We have included this HTLC in our latest commitment_signed and are now just waiting on a + /// revoke_and_ack. + AwaitingAnnouncedRemoteRevoke, + /// Added by us and included in a commitment_signed (if we were AwaitingRemoteRevoke when we + /// created it we would have put it in the holding cell instead). When they next revoke_and_ack + /// we will promote to Committed (note that they may not accept it until the next time we + /// revoke, but we dont really care about that: + /// * they've revoked, so worst case we can announce an old state and get our (option on) + /// money back (though we wont), and, + /// * we'll send them a revoke when they send a commitment_signed, and since only they're + /// allowed to remove it, the "can only be removed once committed on both sides" requirement + /// doesn't matter to us and its up to them to enforce it, worst-case they jump ahead but + /// we'll never get out of sync). LocalAnnounced, Committed, + /// Remote removed this (outbound) HTLC. We're waiting on their commitment_signed to finalize + /// the change (though they'll need to revoke before we fail the payment). + RemoteRemoved, + /// Remote removed this and sent a commitment_signed (implying we've revoke_and_ack'ed it), but + /// the remote side hasn't yet revoked their previous state, which we need them to do before we + /// can do any backwards failing. Implies AwaitingRemoteRevoke. + /// We also have not yet removed this HTLC in a commitment_signed message, and are waiting on a + /// remote revoke_and_ack on a previous state before we can do so. + AwaitingRemoteRevokeToRemove, + /// Remote removed this and sent a commitment_signed (implying we've revoke_and_ack'ed it), but + /// the remote side hasn't yet revoked their previous state, which we need them to do before we + /// can do any backwards failing. Implies AwaitingRemoteRevoke. + /// We have removed this HTLC in our latest commitment_signed and are now just waiting on a + /// revoke_and_ack to drop completely. + AwaitingRemovedRemoteRevoke, + /// Removed by us and a new commitment_signed was sent (if we were AwaitingRemoteRevoke when we + /// created it we would have put it in the holding cell instead). When they next revoke_and_ack + /// we'll promote to LocalRemovedAwaitingCommitment if we fulfilled, otherwise we'll drop at + /// that point. + /// Note that we have to keep an eye on the HTLC until we've received a broadcastable + /// commitment transaction without it as otherwise we'll have to force-close the channel to + /// claim it before the timeout (obviously doesn't apply to revoked HTLCs that we can't claim + /// anyway). + LocalRemoved, + /// Removed by us, sent a new commitment_signed and got a revoke_and_ack. Just waiting on an + /// updated local commitment transaction. + LocalRemovedAwaitingCommitment, } -struct HTLCOutput { +struct HTLCOutput { //TODO: Refactor into Outbound/InboundHTLCOutput (will save memory and fewer panics) outbound: bool, // ie to an HTLC-Timeout transaction htlc_id: u64, amount_msat: u64, cltv_expiry: u32, payment_hash: [u8; 32], state: HTLCState, - // state == RemoteAnnounced implies pending_forward_state, otherwise it must be None + /// If we're in a Remote* removed state, set if they failed, otherwise None + fail_reason: Option, + /// If we're in LocalRemoved*, set to true if we fulfilled the HTLC, and can claim money + local_removed_fulfilled: bool, + /// state pre-committed Remote* implies pending_forward_state, otherwise it must be None pending_forward_state: Option, } @@ -113,13 +167,23 @@ impl HTLCOutput { } /// See AwaitingRemoteRevoke ChannelState for more info -struct HTLCOutputAwaitingACK { - // always outbound - amount_msat: u64, - cltv_expiry: u32, - payment_hash: [u8; 32], - onion_routing_packet: msgs::OnionPacket, - time_created: Instant, //TODO: Some kind of timeout thing-a-majig +enum HTLCUpdateAwaitingACK { + AddHTLC { + // always outbound + amount_msat: u64, + cltv_expiry: u32, + payment_hash: [u8; 32], + onion_routing_packet: msgs::OnionPacket, + time_created: Instant, //TODO: Some kind of timeout thing-a-majig + }, + ClaimHTLC { + payment_preimage: [u8; 32], + payment_hash: [u8; 32], // Only here for effecient duplicate detection + }, + FailHTLC { + payment_hash: [u8; 32], + err_packet: msgs::OnionErrorPacket, + }, } enum ChannelState { @@ -184,7 +248,7 @@ pub struct Channel { cur_remote_commitment_transaction_number: u64, value_to_self_msat: u64, // Excluding all pending_htlcs, excluding fees pending_htlcs: Vec, - holding_cell_htlcs: Vec, + holding_cell_htlc_updates: Vec, next_local_htlc_id: u64, next_remote_htlc_id: u64, channel_update_count: u32, @@ -321,7 +385,7 @@ impl Channel { cur_remote_commitment_transaction_number: (1 << 48) - 1, value_to_self_msat: channel_value_satoshis * 1000, //TODO: give them something on open? Parameterize it? pending_htlcs: Vec::new(), - holding_cell_htlcs: Vec::new(), + holding_cell_htlc_updates: Vec::new(), next_local_htlc_id: 0, next_remote_htlc_id: 0, channel_update_count: 0, @@ -441,7 +505,7 @@ impl Channel { cur_remote_commitment_transaction_number: (1 << 48) - 1, value_to_self_msat: msg.push_msat, pending_htlcs: Vec::new(), - holding_cell_htlcs: Vec::new(), + holding_cell_htlc_updates: Vec::new(), next_local_htlc_id: 0, next_remote_htlc_id: 0, channel_update_count: 0, @@ -548,9 +612,23 @@ impl Channel { let dust_limit_satoshis = if local { self.our_dust_limit_satoshis } else { self.their_dust_limit_satoshis }; let mut remote_htlc_total_msat = 0; let mut local_htlc_total_msat = 0; + let mut value_to_self_msat_offset = 0; for ref htlc in self.pending_htlcs.iter() { - if htlc.state == HTLCState::Committed || htlc.state == (if generated_by_local { HTLCState::LocalAnnounced } else { HTLCState::RemoteAnnounced }) { + let include = match htlc.state { + HTLCState::RemoteAnnounced => !generated_by_local, + HTLCState::AwaitingRemoteRevokeToAnnounce => !generated_by_local, + HTLCState::AwaitingAnnouncedRemoteRevoke => true, + HTLCState::LocalAnnounced => generated_by_local, + HTLCState::Committed => true, + HTLCState::RemoteRemoved => generated_by_local, + HTLCState::AwaitingRemoteRevokeToRemove => generated_by_local, + HTLCState::AwaitingRemovedRemoteRevoke => false, + HTLCState::LocalRemoved => !generated_by_local, + HTLCState::LocalRemovedAwaitingCommitment => false, + }; + + if include { if htlc.outbound == local { // "offered HTLC output" if htlc.amount_msat / 1000 >= dust_limit_satoshis + (self.feerate_per_kw * HTLC_TIMEOUT_TX_WEIGHT / 1000) { let htlc_in_tx = htlc.get_in_commitment(true); @@ -573,12 +651,29 @@ impl Channel { } else { remote_htlc_total_msat += htlc.amount_msat; } + } else { + match htlc.state { + HTLCState::AwaitingRemoteRevokeToRemove|HTLCState::AwaitingRemovedRemoteRevoke => { + if generated_by_local && htlc.fail_reason.is_none() { + value_to_self_msat_offset -= htlc.amount_msat as i64; + } + }, + HTLCState::LocalRemoved => { + if !generated_by_local && htlc.local_removed_fulfilled { + value_to_self_msat_offset += htlc.amount_msat as i64; + } + }, + HTLCState::LocalRemovedAwaitingCommitment => { + value_to_self_msat_offset += htlc.amount_msat as i64; + }, + _ => {}, + } } } let total_fee: u64 = self.feerate_per_kw * (COMMITMENT_TX_BASE_WEIGHT + (txouts.len() as u64) * COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000; - let value_to_self: i64 = ((self.value_to_self_msat - local_htlc_total_msat) as i64) / 1000 - if self.channel_outbound { total_fee as i64 } else { 0 }; - let value_to_remote: i64 = (((self.channel_value_satoshis * 1000 - self.value_to_self_msat - remote_htlc_total_msat) / 1000) as i64) - if self.channel_outbound { 0 } else { total_fee as i64 }; + let value_to_self: i64 = ((self.value_to_self_msat - local_htlc_total_msat) as i64 + value_to_self_msat_offset) / 1000 - if self.channel_outbound { total_fee as i64 } else { 0 }; + let value_to_remote: i64 = (((self.channel_value_satoshis * 1000 - self.value_to_self_msat - remote_htlc_total_msat) as i64 - value_to_self_msat_offset) / 1000) - if self.channel_outbound { 0 } else { total_fee as i64 }; let value_to_a = if local { value_to_self } else { value_to_remote }; let value_to_b = if local { value_to_remote } else { value_to_self }; @@ -607,12 +702,9 @@ impl Channel { let mut htlcs_used: Vec = Vec::new(); for (idx, out) in txouts.drain(..).enumerate() { outputs.push(out.0); - match out.1 { - Some(out_htlc) => { - htlcs_used.push(out_htlc); - htlcs_used.last_mut().unwrap().transaction_output_index = idx as u32; - }, - None => {} + if let Some(out_htlc) = out.1 { + htlcs_used.push(out_htlc); + htlcs_used.last_mut().unwrap().transaction_output_index = idx as u32; } } @@ -770,7 +862,7 @@ impl Channel { /// Builds the htlc-success or htlc-timeout transaction which spends a given HTLC output /// @local is used only to convert relevant internal structures which refer to remote vs local /// to decide value of outputs and direction of HTLCs. - fn build_htlc_transaction(&self, prev_hash: &Sha256dHash, htlc: &HTLCOutputInCommitment, local: bool, keys: &TxCreationKeys) -> Result { + fn build_htlc_transaction(&self, prev_hash: &Sha256dHash, htlc: &HTLCOutputInCommitment, local: bool, keys: &TxCreationKeys) -> Transaction { let mut txins: Vec = Vec::new(); txins.push(TxIn { prev_hash: prev_hash.clone(), @@ -794,12 +886,12 @@ impl Channel { value: htlc.amount_msat / 1000 - total_fee //TODO: BOLT 3 does not specify if we should add amount_msat before dividing or if we should divide by 1000 before subtracting (as we do here) }); - Ok(Transaction { + Transaction { version: 2, lock_time: if htlc.offered { htlc.cltv_expiry } else { 0 }, input: txins, output: txouts, - }) + } } /// Signs a transaction created by build_htlc_transaction. If the transaction is an @@ -843,7 +935,7 @@ impl Channel { Ok(()) } - pub fn get_update_fulfill_htlc(&mut self, payment_preimage: [u8; 32]) -> Result { + pub fn get_update_fulfill_htlc(&mut self, payment_preimage_arg: [u8; 32]) -> Result, HandleError> { // Either ChannelFunded got set (which means it wont bet unset) or there is no way any // caller thought we could have something claimed (cause we wouldn't have accepted in an // incoming HTLC anyway). If we got to ShutdownComplete, callers aren't allowed to call us, @@ -854,69 +946,141 @@ impl Channel { assert_eq!(self.channel_state & ChannelState::ShutdownComplete as u32, 0); let mut sha = Sha256::new(); - sha.input(&payment_preimage); - let mut payment_hash = [0; 32]; - sha.result(&mut payment_hash); + sha.input(&payment_preimage_arg); + let mut payment_hash_calc = [0; 32]; + sha.result(&mut payment_hash_calc); + + // Now update local state: + if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == (ChannelState::AwaitingRemoteRevoke as u32) { + for pending_update in self.holding_cell_htlc_updates.iter() { + match pending_update { + &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_preimage, .. } => { + if payment_preimage_arg == *payment_preimage { + return Ok(None); + } + }, + &HTLCUpdateAwaitingACK::FailHTLC { ref payment_hash, .. } => { + if payment_hash_calc == *payment_hash { + return Err(HandleError{err: "Unable to find a pending HTLC which matched the given payment preimage", msg: None}); + } + }, + _ => {} + } + } + self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::ClaimHTLC { + payment_preimage: payment_preimage_arg, payment_hash: payment_hash_calc, + }); + return Ok(None); + } let mut htlc_id = 0; let mut htlc_amount_msat = 0; - //TODO: swap_remove since we dont need to maintain ordering here - self.pending_htlcs.retain(|ref htlc| { - if !htlc.outbound && htlc.payment_hash == payment_hash { + for htlc in self.pending_htlcs.iter_mut() { + if !htlc.outbound && htlc.payment_hash == payment_hash_calc { if htlc_id != 0 { panic!("Duplicate HTLC payment_hash, you probably re-used payment preimages, NEVER DO THIS!"); } htlc_id = htlc.htlc_id; htlc_amount_msat += htlc.amount_msat; - false - } else { true } - }); + if htlc.state == HTLCState::Committed { + htlc.state = HTLCState::LocalRemoved; + htlc.local_removed_fulfilled = true; + } else if htlc.state == HTLCState::RemoteAnnounced { + panic!("Somehow forwarded HTLC prior to remote revocation!"); + } else if htlc.state == HTLCState::LocalRemoved || htlc.state == HTLCState::LocalRemovedAwaitingCommitment { + return Err(HandleError{err: "Unable to find a pending HTLC which matched the given payment preimage", msg: None}); + } else { + panic!("Have an inbound HTLC when not awaiting remote revoke that had a garbage state"); + } + } + } if htlc_amount_msat == 0 { return Err(HandleError{err: "Unable to find a pending HTLC which matched the given payment preimage", msg: None}); } - self.channel_monitor.provide_payment_preimage(&payment_preimage); + self.channel_monitor.provide_payment_preimage(&payment_preimage_arg); - //TODO: This is racy af, they may have pending messages in flight to us that will not have - //received this yet! - self.value_to_self_msat += htlc_amount_msat; - Ok(msgs::UpdateFulfillHTLC { + Ok(Some(msgs::UpdateFulfillHTLC { channel_id: self.channel_id(), htlc_id: htlc_id, - payment_preimage: payment_preimage, - }) + payment_preimage: payment_preimage_arg, + })) } - pub fn get_update_fail_htlc(&mut self, payment_hash: &[u8; 32], err_packet: msgs::OnionErrorPacket) -> Result { + pub fn get_update_fulfill_htlc_and_commit(&mut self, payment_preimage: [u8; 32]) -> Result, HandleError> { + match self.get_update_fulfill_htlc(payment_preimage)? { + Some(update_fulfill_htlc) => + Ok(Some((update_fulfill_htlc, self.send_commitment_no_status_check()?))), + None => Ok(None) + } + } + + pub fn get_update_fail_htlc(&mut self, payment_hash_arg: &[u8; 32], err_packet: msgs::OnionErrorPacket) -> Result, HandleError> { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err(HandleError{err: "Was asked to fail an HTLC when channel was not in an operational state", msg: None}); } assert_eq!(self.channel_state & ChannelState::ShutdownComplete as u32, 0); + // Now update local state: + if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == (ChannelState::AwaitingRemoteRevoke as u32) { + for pending_update in self.holding_cell_htlc_updates.iter() { + match pending_update { + &HTLCUpdateAwaitingACK::ClaimHTLC { ref payment_hash, .. } => { + if *payment_hash_arg == *payment_hash { + return Err(HandleError{err: "Unable to find a pending HTLC which matched the given payment preimage", msg: None}); + } + }, + &HTLCUpdateAwaitingACK::FailHTLC { ref payment_hash, .. } => { + if *payment_hash_arg == *payment_hash { + return Ok(None); + } + }, + _ => {} + } + } + self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::FailHTLC { + payment_hash: payment_hash_arg.clone(), + err_packet, + }); + return Ok(None); + } + let mut htlc_id = 0; let mut htlc_amount_msat = 0; - //TODO: swap_remove since we dont need to maintain ordering here - self.pending_htlcs.retain(|ref htlc| { - if !htlc.outbound && htlc.payment_hash == *payment_hash { + for htlc in self.pending_htlcs.iter_mut() { + if !htlc.outbound && htlc.payment_hash == *payment_hash_arg { if htlc_id != 0 { panic!("Duplicate HTLC payment_hash, you probably re-used payment preimages, NEVER DO THIS!"); } htlc_id = htlc.htlc_id; htlc_amount_msat += htlc.amount_msat; - false - } else { true } - }); + if htlc.state == HTLCState::Committed { + htlc.state = HTLCState::LocalRemoved; + } else if htlc.state == HTLCState::RemoteAnnounced { + panic!("Somehow forwarded HTLC prior to remote revocation!"); + } else if htlc.state == HTLCState::LocalRemoved || htlc.state == HTLCState::LocalRemovedAwaitingCommitment { + return Err(HandleError{err: "Unable to find a pending HTLC which matched the given payment preimage", msg: None}); + } else { + panic!("Have an inbound HTLC when not awaiting remote revoke that had a garbage state"); + } + } + } if htlc_amount_msat == 0 { return Err(HandleError{err: "Unable to find a pending HTLC which matched the given payment preimage", msg: None}); } - //TODO: This is racy af, they may have pending messages in flight to us that will not have - //received this yet! - - Ok(msgs::UpdateFailHTLC { + Ok(Some(msgs::UpdateFailHTLC { channel_id: self.channel_id(), htlc_id, reason: err_packet - }) + })) + } + + pub fn get_update_fail_htlc_and_commit(&mut self, payment_hash: &[u8; 32], err_packet: msgs::OnionErrorPacket) -> Result, HandleError> { + match self.get_update_fail_htlc(payment_hash, err_packet)? { + Some(update_fail_htlc) => + Ok(Some((update_fail_htlc, self.send_commitment_no_status_check()?))), + None => Ok(None) + } } // Message handlers: @@ -1075,12 +1239,29 @@ impl Channel { } /// Returns (inbound_htlc_count, outbound_htlc_count, htlc_outbound_value_msat, htlc_inbound_value_msat) - fn get_pending_htlc_stats(&self) -> (u32, u32, u64, u64) { + /// If its for a remote update check, we need to be more lax about checking against messages we + /// sent but they may not have received/processed before they sent this message. Further, for + /// our own sends, we're more conservative and even consider things they've removed against + /// totals, though there is little reason to outside of further avoiding any race condition + /// issues. + fn get_pending_htlc_stats(&self, for_remote_update_check: bool) -> (u32, u32, u64, u64) { let mut inbound_htlc_count: u32 = 0; let mut outbound_htlc_count: u32 = 0; let mut htlc_outbound_value_msat = 0; let mut htlc_inbound_value_msat = 0; for ref htlc in self.pending_htlcs.iter() { + match htlc.state { + HTLCState::RemoteAnnounced => {}, + HTLCState::AwaitingRemoteRevokeToAnnounce => {}, + HTLCState::AwaitingAnnouncedRemoteRevoke => {}, + HTLCState::LocalAnnounced => { if for_remote_update_check { continue; } }, + HTLCState::Committed => {}, + HTLCState::RemoteRemoved => { if for_remote_update_check { continue; } }, + HTLCState::AwaitingRemoteRevokeToRemove => { if for_remote_update_check { continue; } }, + HTLCState::AwaitingRemovedRemoteRevoke => { if for_remote_update_check { continue; } }, + HTLCState::LocalRemoved => {}, + HTLCState::LocalRemovedAwaitingCommitment => { if for_remote_update_check { continue; } }, + } if !htlc.outbound { inbound_htlc_count += 1; htlc_inbound_value_msat += htlc.amount_msat; @@ -1103,7 +1284,7 @@ impl Channel { return Err(HandleError{err: "Remote side tried to send less than our minimum HTLC value", msg: None}); } - let (inbound_htlc_count, _, htlc_outbound_value_msat, htlc_inbound_value_msat) = self.get_pending_htlc_stats(); + let (inbound_htlc_count, _, htlc_outbound_value_msat, htlc_inbound_value_msat) = self.get_pending_htlc_stats(true); if inbound_htlc_count + 1 > OUR_MAX_HTLCS as u32 { return Err(HandleError{err: "Remote tried to push more than our max accepted HTLCs", msg: None}); } @@ -1136,6 +1317,8 @@ impl Channel { payment_hash: msg.payment_hash, cltv_expiry: msg.cltv_expiry, state: HTLCState::RemoteAnnounced, + fail_reason: None, + local_removed_fulfilled: false, pending_forward_state: Some(pending_forward_state), }); @@ -1143,9 +1326,8 @@ impl Channel { } /// Removes an outbound HTLC which has been commitment_signed by the remote end - fn remove_outbound_htlc(&mut self, htlc_id: u64, check_preimage: Option<[u8; 32]>) -> Result { - let mut found_idx = None; - for (idx, ref htlc) in self.pending_htlcs.iter().enumerate() { + fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, check_preimage: Option<[u8; 32]>, fail_reason: Option) -> Result<(), HandleError> { + for mut htlc in self.pending_htlcs.iter_mut() { if htlc.outbound && htlc.htlc_id == htlc_id { match check_preimage { None => {}, @@ -1154,53 +1336,20 @@ impl Channel { return Err(HandleError{err: "Remote tried to fulfill HTLC with an incorrect preimage", msg: None}); } }; - found_idx = Some(idx); - break; - } - } - match found_idx { - None => Err(HandleError{err: "Remote tried to fulfill/fail an HTLC we couldn't find", msg: None}), - Some(idx) => { - Ok(self.pending_htlcs.swap_remove(idx)) - } - } - } - - /// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them - /// fulfilling or failing the last pending HTLC) - fn free_holding_cell_htlcs(&mut self) -> Result, msgs::CommitmentSigned)>, HandleError> { - if self.holding_cell_htlcs.len() != 0 { - let mut new_htlcs = self.holding_cell_htlcs.split_off(0); - let mut update_add_msgs = Vec::with_capacity(new_htlcs.len()); - let mut err = None; - for new_htlc in new_htlcs.drain(..) { - // Note that this *can* fail, though it should be due to rather-rare conditions on - // fee races with adding too many outputs which push our total payments just over - // the limit. In case its less rare than I anticipate, we may want to revisit - // handling this case better and maybe fufilling some of the HTLCs while attempting - // to rebalance channels. - if self.holding_cell_htlcs.len() != 0 { - self.holding_cell_htlcs.push(new_htlc); + if htlc.state == HTLCState::LocalAnnounced { + return Err(HandleError{err: "Remote tried to fulfill HTLC before it had been committed", msg: None}); + } else if htlc.state == HTLCState::Committed { + htlc.state = HTLCState::RemoteRemoved; + htlc.fail_reason = fail_reason; + } else if htlc.state == HTLCState::AwaitingRemoteRevokeToRemove || htlc.state == HTLCState::AwaitingRemovedRemoteRevoke || htlc.state == HTLCState::RemoteRemoved { + return Err(HandleError{err: "Remote tried to fulfill HTLC that they'd already fulfilled", msg: None}); } else { - match self.send_htlc(new_htlc.amount_msat, new_htlc.payment_hash, new_htlc.cltv_expiry, new_htlc.onion_routing_packet.clone()) { - Ok(update_add_msg_option) => update_add_msgs.push(update_add_msg_option.unwrap()), - Err(e) => { - self.holding_cell_htlcs.push(new_htlc); - err = Some(e); - } - } + panic!("Got a non-outbound state on an outbound HTLC"); } + return Ok(()); } - //TODO: Need to examine the type of err - if its a fee issue or similar we may want to - //fail it back the route, if its a temporary issue we can ignore it... - if update_add_msgs.len() > 0 { - Ok(Some((update_add_msgs, self.send_commitment()?))) - } else { - Err(err.unwrap()) - } - } else { - Ok(None) } + Err(HandleError{err: "Remote tried to fulfill/fail an HTLC we couldn't find", msg: None}) } pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<(), HandleError> { @@ -1213,51 +1362,27 @@ impl Channel { let mut payment_hash = [0; 32]; sha.result(&mut payment_hash); - match self.remove_outbound_htlc(msg.htlc_id, Some(payment_hash)) { - Err(e) => return Err(e), - Ok(htlc) => { - //TODO: Double-check that we didn't exceed some limits (or value_to_self went - //negative here?) - self.value_to_self_msat -= htlc.amount_msat; - } - } self.channel_monitor.provide_payment_preimage(&msg.payment_preimage); - Ok(()) + self.mark_outbound_htlc_removed(msg.htlc_id, Some(payment_hash), None) } - pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC) -> Result<[u8; 32], HandleError> { + pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), HandleError> { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err(HandleError{err: "Got add HTLC message when channel was not in an operational state", msg: None}); } - let payment_hash = match self.remove_outbound_htlc(msg.htlc_id, None) { - Err(e) => return Err(e), - Ok(htlc) => { - //TODO: Double-check that we didn't exceed some limits (or value_to_self went - //negative here?) - htlc.payment_hash - } - }; - Ok(payment_hash) + self.mark_outbound_htlc_removed(msg.htlc_id, None, Some(fail_reason)) } - pub fn update_fail_malformed_htlc(&mut self, msg: &msgs::UpdateFailMalformedHTLC) -> Result<[u8; 32], HandleError> { + pub fn update_fail_malformed_htlc(&mut self, msg: &msgs::UpdateFailMalformedHTLC, fail_reason: HTLCFailReason) -> Result<(), HandleError> { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err(HandleError{err: "Got add HTLC message when channel was not in an operational state", msg: None}); } - let payment_hash = match self.remove_outbound_htlc(msg.htlc_id, None) { - Err(e) => return Err(e), - Ok(htlc) => { - //TODO: Double-check that we didn't exceed some limits (or value_to_self went - //negative here?) - htlc.payment_hash - } - }; - Ok(payment_hash) + self.mark_outbound_htlc_removed(msg.htlc_id, None, Some(fail_reason)) } - pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Vec), HandleError> { + pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option), HandleError> { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err(HandleError{err: "Got commitment signed message when channel was not in an operational state", msg: None}); } @@ -1275,7 +1400,7 @@ impl Channel { } for (idx, ref htlc) in local_commitment_tx.1.iter().enumerate() { - let htlc_tx = self.build_htlc_transaction(&local_commitment_txid, htlc, true, &local_keys)?; + let htlc_tx = self.build_htlc_transaction(&local_commitment_txid, htlc, true, &local_keys); let htlc_redeemscript = chan_utils::get_htlc_redeemscript(&htlc, &local_keys); let htlc_sighash = Message::from_slice(&bip143::SighashComponents::new(&htlc_tx).sighash_all(&htlc_tx.input[0], &htlc_redeemscript, htlc.amount_msat / 1000)[..]).unwrap(); secp_call!(self.secp_ctx.verify(&htlc_sighash, &msg.htlc_signatures[idx], &local_keys.b_htlc_key), "Invalid HTLC tx siganture from peer"); @@ -1288,21 +1413,109 @@ impl Channel { // Update state now that we've passed all the can-fail calls... - let mut to_forward_infos = Vec::new(); - for ref mut htlc in self.pending_htlcs.iter_mut() { + let mut need_our_commitment = false; + for htlc in self.pending_htlcs.iter_mut() { if htlc.state == HTLCState::RemoteAnnounced { - htlc.state = HTLCState::Committed; - to_forward_infos.push(htlc.pending_forward_state.take().unwrap()); + htlc.state = HTLCState::AwaitingRemoteRevokeToAnnounce; + need_our_commitment = true; + } else if htlc.state == HTLCState::RemoteRemoved { + htlc.state = HTLCState::AwaitingRemoteRevokeToRemove; + need_our_commitment = true; } } + // Finally delete all the LocalRemovedAwaitingCommitment HTLCs + // We really shouldnt have two passes here, but retain gives a non-mutable ref (Rust bug) + let mut claimed_value_msat = 0; + self.pending_htlcs.retain(|htlc| { + if htlc.state == HTLCState::LocalRemovedAwaitingCommitment { + claimed_value_msat += htlc.amount_msat; + false + } else { true } + }); + self.value_to_self_msat += claimed_value_msat; self.cur_local_commitment_transaction_number -= 1; + let our_commitment_signed = if need_our_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 { + // If we're AwaitingRemoteRevoke we can't send a new commitment here, but that's ok - + // we'll send one right away when we get the revoke_and_ack when we + // free_holding_cell_htlcs(). + Some(self.send_commitment_no_status_check()?) + } else { None }; + Ok((msgs::RevokeAndACK { channel_id: self.channel_id, per_commitment_secret: per_commitment_secret, next_per_commitment_point: next_per_commitment_point, - }, to_forward_infos)) + }, our_commitment_signed)) + } + + /// Used to fulfill holding_cell_htlcs when we get a remote ack (or implicitly get it by them + /// fulfilling or failing the last pending HTLC) + fn free_holding_cell_htlcs(&mut self) -> Result, HandleError> { + if self.holding_cell_htlc_updates.len() != 0 { + let mut htlc_updates = Vec::new(); + mem::swap(&mut htlc_updates, &mut self.holding_cell_htlc_updates); + let mut update_add_htlcs = Vec::with_capacity(htlc_updates.len()); + let mut update_fulfill_htlcs = Vec::with_capacity(htlc_updates.len()); + let mut update_fail_htlcs = Vec::with_capacity(htlc_updates.len()); + let mut err = None; + for htlc_update in htlc_updates.drain(..) { + // Note that this *can* fail, though it should be due to rather-rare conditions on + // fee races with adding too many outputs which push our total payments just over + // the limit. In case its less rare than I anticipate, we may want to revisit + // handling this case better and maybe fufilling some of the HTLCs while attempting + // to rebalance channels. + if err.is_some() { // We're back to AwaitingRemoteRevoke (or are about to fail the channel) + self.holding_cell_htlc_updates.push(htlc_update); + } else { + match &htlc_update { + &HTLCUpdateAwaitingACK::AddHTLC {amount_msat, cltv_expiry, payment_hash, ref onion_routing_packet, ..} => { + match self.send_htlc(amount_msat, payment_hash, cltv_expiry, onion_routing_packet.clone()) { + Ok(update_add_msg_option) => update_add_htlcs.push(update_add_msg_option.unwrap()), + Err(e) => { + err = Some(e); + } + } + }, + &HTLCUpdateAwaitingACK::ClaimHTLC { payment_preimage, .. } => { + match self.get_update_fulfill_htlc(payment_preimage) { + Ok(update_fulfill_msg_option) => update_fulfill_htlcs.push(update_fulfill_msg_option.unwrap()), + Err(e) => { + err = Some(e); + } + } + }, + &HTLCUpdateAwaitingACK::FailHTLC { payment_hash, ref err_packet } => { + match self.get_update_fail_htlc(&payment_hash, err_packet.clone()) { + Ok(update_fail_msg_option) => update_fail_htlcs.push(update_fail_msg_option.unwrap()), + Err(e) => { + err = Some(e); + } + } + }, + } + if err.is_some() { + self.holding_cell_htlc_updates.push(htlc_update); + } + } + } + //TODO: Need to examine the type of err - if its a fee issue or similar we may want to + //fail it back the route, if its a temporary issue we can ignore it... + match err { + None => { + Ok(Some(msgs::CommitmentUpdate { + update_add_htlcs, + update_fulfill_htlcs, + update_fail_htlcs, + commitment_signed: self.send_commitment_no_status_check()? + })) + }, + Some(e) => Err(e) + } + } else { + Ok(None) + } } /// Handles receiving a remote's revoke_and_ack. Note that we may return a new @@ -1310,7 +1523,7 @@ impl Channel { /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail, /// generating an appropriate error *after* the channel state has been updated based on the /// revoke_and_ack message. - pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK) -> Result, msgs::CommitmentSigned)>, HandleError> { + pub fn revoke_and_ack(&mut self, msg: &msgs::RevokeAndACK) -> Result<(Option, Vec, Vec<([u8; 32], HTLCFailReason)>), HandleError> { if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) { return Err(HandleError{err: "Got revoke/ACK message when channel was not in an operational state", msg: None}); } @@ -1326,19 +1539,67 @@ impl Channel { self.channel_state &= !(ChannelState::AwaitingRemoteRevoke as u32); self.their_cur_commitment_point = msg.next_per_commitment_point; self.cur_remote_commitment_transaction_number -= 1; + + let mut to_forward_infos = Vec::new(); + let mut revoked_htlcs = Vec::new(); + let mut require_commitment = false; + let mut value_to_self_msat_diff: i64 = 0; + // We really shouldnt have two passes here, but retain gives a non-mutable ref (Rust bug) + self.pending_htlcs.retain(|htlc| { + if htlc.state == HTLCState::LocalRemoved { + if htlc.local_removed_fulfilled { true } else { false } + } else if htlc.state == HTLCState::AwaitingRemovedRemoteRevoke { + if let Some(reason) = htlc.fail_reason.clone() { // We really want take() here, but, again, non-mut ref :( + revoked_htlcs.push((htlc.payment_hash, reason)); + } else { + // They fulfilled, so we sent them money + value_to_self_msat_diff -= htlc.amount_msat as i64; + } + false + } else { true } + }); for htlc in self.pending_htlcs.iter_mut() { if htlc.state == HTLCState::LocalAnnounced { htlc.state = HTLCState::Committed; + } else if htlc.state == HTLCState::AwaitingRemoteRevokeToAnnounce { + htlc.state = HTLCState::AwaitingAnnouncedRemoteRevoke; + require_commitment = true; + } else if htlc.state == HTLCState::AwaitingAnnouncedRemoteRevoke { + htlc.state = HTLCState::Committed; + to_forward_infos.push(htlc.pending_forward_state.take().unwrap()); + } else if htlc.state == HTLCState::AwaitingRemoteRevokeToRemove { + htlc.state = HTLCState::AwaitingRemovedRemoteRevoke; + require_commitment = true; + } else if htlc.state == HTLCState::LocalRemoved { + assert!(htlc.local_removed_fulfilled); + htlc.state = HTLCState::LocalRemovedAwaitingCommitment; } } + self.value_to_self_msat = (self.value_to_self_msat as i64 + value_to_self_msat_diff) as u64; - self.free_holding_cell_htlcs() + match self.free_holding_cell_htlcs()? { + Some(commitment_update) => { + Ok((Some(commitment_update), to_forward_infos, revoked_htlcs)) + }, + None => { + if require_commitment { + Ok((Some(msgs::CommitmentUpdate { + update_add_htlcs: Vec::new(), + update_fulfill_htlcs: Vec::new(), + update_fail_htlcs: Vec::new(), + commitment_signed: self.send_commitment_no_status_check()? + }), to_forward_infos, revoked_htlcs)) + } else { + Ok((None, to_forward_infos, revoked_htlcs)) + } + } + } } pub fn update_fee(&mut self, fee_estimator: &FeeEstimator, msg: &msgs::UpdateFee) -> Result<(), HandleError> { - if self.channel_outbound { + if self.channel_outbound { return Err(HandleError{err: "Non-funding remote tried to update channel fee", msg: None}); - } + } Channel::check_remote_fee(fee_estimator, msg.feerate_per_kw)?; self.feerate_per_kw = msg.feerate_per_kw as u64; Ok(()) @@ -1398,10 +1659,16 @@ impl Channel { // We can't send our shutdown until we've committed all of our pending HTLCs, but the // remote side is unlikely to accept any new HTLCs, so we go ahead and "free" any holding // cell HTLCs and return them to fail the payment. - let mut dropped_outbound_htlcs = Vec::with_capacity(self.holding_cell_htlcs.len()); - for htlc in self.holding_cell_htlcs.drain(..) { - dropped_outbound_htlcs.push(htlc.payment_hash); - } + let mut dropped_outbound_htlcs = Vec::with_capacity(self.holding_cell_htlc_updates.len()); + self.holding_cell_htlc_updates.retain(|htlc_update| { + match htlc_update { + &HTLCUpdateAwaitingACK::AddHTLC { ref payment_hash, .. } => { + dropped_outbound_htlcs.push(payment_hash.clone()); + false + }, + _ => true + } + }); for htlc in self.pending_htlcs.iter() { if htlc.state == HTLCState::LocalAnnounced { return Ok((None, None, dropped_outbound_htlcs)); @@ -1854,7 +2121,7 @@ impl Channel { return Err(HandleError{err: "Cannot send less than their minimum HTLC value", msg: None}); } - let (_, outbound_htlc_count, htlc_outbound_value_msat, htlc_inbound_value_msat) = self.get_pending_htlc_stats(); + let (_, outbound_htlc_count, htlc_outbound_value_msat, htlc_inbound_value_msat) = self.get_pending_htlc_stats(false); if outbound_htlc_count + 1 > self.their_max_accepted_htlcs as u32 { return Err(HandleError{err: "Cannot push more than their max accepted HTLCs", msg: None}); } @@ -1873,7 +2140,7 @@ impl Channel { // Now update local state: if (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == (ChannelState::AwaitingRemoteRevoke as u32) { //TODO: Check the limits *including* other pending holding cell HTLCs! - self.holding_cell_htlcs.push(HTLCOutputAwaitingACK { + self.holding_cell_htlc_updates.push(HTLCUpdateAwaitingACK::AddHTLC { amount_msat: amount_msat, payment_hash: payment_hash, cltv_expiry: cltv_expiry, @@ -1890,6 +2157,8 @@ impl Channel { payment_hash: payment_hash.clone(), cltv_expiry: cltv_expiry, state: HTLCState::LocalAnnounced, + fail_reason: None, + local_removed_fulfilled: false, pending_forward_state: None }); @@ -1924,9 +2193,23 @@ impl Channel { if !have_updates { return Err(HandleError{err: "Cannot create commitment tx until we have some updates to send", msg: None}); } - + self.send_commitment_no_status_check() + } + /// Only fails in case of bad keys + fn send_commitment_no_status_check(&mut self) -> Result { let funding_script = self.get_funding_redeemscript(); + // We can upgrade the status of some HTLCs that are waiting on a commitment, even if we + // fail to generate this, we still are at least at a position where upgrading their status + // is acceptable. + for htlc in self.pending_htlcs.iter_mut() { + if htlc.state == HTLCState::AwaitingRemoteRevokeToAnnounce { + htlc.state = HTLCState::AwaitingAnnouncedRemoteRevoke; + } else if htlc.state == HTLCState::AwaitingRemoteRevokeToRemove { + htlc.state = HTLCState::AwaitingRemovedRemoteRevoke; + } + } + let remote_keys = self.build_remote_transaction_keys()?; let remote_commitment_tx = self.build_commitment_transaction(self.cur_remote_commitment_transaction_number, &remote_keys, false, true); let remote_commitment_txid = remote_commitment_tx.0.txid(); @@ -1936,7 +2219,7 @@ impl Channel { let mut htlc_sigs = Vec::new(); for ref htlc in remote_commitment_tx.1.iter() { - let htlc_tx = self.build_htlc_transaction(&remote_commitment_txid, htlc, false, &remote_keys)?; + let htlc_tx = self.build_htlc_transaction(&remote_commitment_txid, htlc, false, &remote_keys); let htlc_redeemscript = chan_utils::get_htlc_redeemscript(&htlc, &remote_keys); let htlc_sighash = Message::from_slice(&bip143::SighashComponents::new(&htlc_tx).sighash_all(&htlc_tx.input[0], &htlc_redeemscript, htlc.amount_msat / 1000)[..]).unwrap(); let our_htlc_key = secp_derived_key!(chan_utils::derive_private_key(&self.secp_ctx, &remote_keys.per_commitment_point, &self.local_keys.htlc_base_key)); @@ -1960,7 +2243,7 @@ impl Channel { pub fn send_htlc_and_commit(&mut self, amount_msat: u64, payment_hash: [u8; 32], cltv_expiry: u32, onion_routing_packet: msgs::OnionPacket) -> Result, HandleError> { match self.send_htlc(amount_msat, payment_hash, cltv_expiry, onion_routing_packet)? { Some(update_add_htlc) => - Ok(Some((update_add_htlc, self.send_commitment()?))), + Ok(Some((update_add_htlc, self.send_commitment_no_status_check()?))), None => Ok(None) } } @@ -1990,10 +2273,16 @@ impl Channel { // We can't send our shutdown until we've committed all of our pending HTLCs, but the // remote side is unlikely to accept any new HTLCs, so we go ahead and "free" any holding // cell HTLCs and return them to fail the payment. - let mut dropped_outbound_htlcs = Vec::with_capacity(self.holding_cell_htlcs.len()); - for htlc in self.holding_cell_htlcs.drain(..) { - dropped_outbound_htlcs.push(htlc.payment_hash); - } + let mut dropped_outbound_htlcs = Vec::with_capacity(self.holding_cell_htlc_updates.len()); + self.holding_cell_htlc_updates.retain(|htlc_update| { + match htlc_update { + &HTLCUpdateAwaitingACK::AddHTLC { ref payment_hash, .. } => { + dropped_outbound_htlcs.push(payment_hash.clone()); + false + }, + _ => true + } + }); Ok((msgs::Shutdown { channel_id: self.channel_id, @@ -2091,7 +2380,7 @@ mod tests { let remote_signature = Signature::from_der(&secp_ctx, &hex_bytes($their_sig_hex).unwrap()[..]).unwrap(); let ref htlc = unsigned_tx.1[$htlc_idx]; - let mut htlc_tx = chan.build_htlc_transaction(&unsigned_tx.0.txid(), &htlc, true, &keys).unwrap(); + let mut htlc_tx = chan.build_htlc_transaction(&unsigned_tx.0.txid(), &htlc, true, &keys); let htlc_redeemscript = chan_utils::get_htlc_redeemscript(&htlc, &keys); let htlc_sighash = Message::from_slice(&bip143::SighashComponents::new(&htlc_tx).sighash_all(&htlc_tx.input[0], &htlc_redeemscript, htlc.amount_msat / 1000)[..]).unwrap(); secp_ctx.verify(&htlc_sighash, &remote_signature, &keys.b_htlc_key).unwrap(); @@ -2136,6 +2425,8 @@ mod tests { cltv_expiry: 500, payment_hash: [0; 32], state: HTLCState::Committed, + fail_reason: None, + local_removed_fulfilled: false, pending_forward_state: None, }; let mut sha = Sha256::new(); @@ -2151,6 +2442,8 @@ mod tests { cltv_expiry: 501, payment_hash: [0; 32], state: HTLCState::Committed, + fail_reason: None, + local_removed_fulfilled: false, pending_forward_state: None, }; let mut sha = Sha256::new(); @@ -2166,6 +2459,8 @@ mod tests { cltv_expiry: 502, payment_hash: [0; 32], state: HTLCState::Committed, + fail_reason: None, + local_removed_fulfilled: false, pending_forward_state: None, }; let mut sha = Sha256::new(); @@ -2181,6 +2476,8 @@ mod tests { cltv_expiry: 503, payment_hash: [0; 32], state: HTLCState::Committed, + fail_reason: None, + local_removed_fulfilled: false, pending_forward_state: None, }; let mut sha = Sha256::new(); @@ -2196,6 +2493,8 @@ mod tests { cltv_expiry: 504, payment_hash: [0; 32], state: HTLCState::Committed, + fail_reason: None, + local_removed_fulfilled: false, pending_forward_state: None, }; let mut sha = Sha256::new(); diff --git a/src/ln/channelmanager.rs b/src/ln/channelmanager.rs index b35387ba8..dff6cb19e 100644 --- a/src/ln/channelmanager.rs +++ b/src/ln/channelmanager.rs @@ -37,12 +37,12 @@ mod channel_held_info { /// Stores the info we will need to send when we want to forward an HTLC onwards pub struct PendingForwardHTLCInfo { - onion_packet: Option, - payment_hash: [u8; 32], - short_channel_id: u64, - prev_short_channel_id: u64, - amt_to_forward: u64, - outgoing_cltv_value: u32, + pub(super) onion_packet: Option, + pub(super) payment_hash: [u8; 32], + pub(super) short_channel_id: u64, + pub(super) prev_short_channel_id: u64, + pub(super) amt_to_forward: u64, + pub(super) outgoing_cltv_value: u32, } #[cfg(feature = "fuzztarget")] @@ -59,6 +59,7 @@ mod channel_held_info { } } + #[derive(Clone)] // See Channel::revoke_and_ack for why, tl;dr: Rust bug pub enum HTLCFailReason { ErrorPacket { err: msgs::OnionErrorPacket, @@ -68,6 +69,15 @@ mod channel_held_info { data: Vec, } } + + #[cfg(feature = "fuzztarget")] + impl HTLCFailReason { + pub fn dummy() -> Self { + HTLCFailReason::Reason { + failure_code: 0, data: Vec::new(), + } + } + } } #[cfg(feature = "fuzztarget")] pub use self::channel_held_info::*; @@ -770,14 +780,14 @@ impl ChannelManager { } }; - let (node_id, fail_msg) = { + let (node_id, fail_msgs) = { let chan_id = match channel_state.short_to_id.get(&source_short_channel_id) { Some(chan_id) => chan_id.clone(), None => return false }; let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); - match chan.get_update_fail_htlc(payment_hash, err_packet) { + match chan.get_update_fail_htlc_and_commit(payment_hash, err_packet) { Ok(msg) => (chan.get_their_node_id(), msg), Err(_e) => { //TODO: Do something with e? @@ -786,12 +796,18 @@ impl ChannelManager { } }; - mem::drop(channel_state); - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::SendFailHTLC { - node_id, - msg: fail_msg - }); + match fail_msgs { + Some(msgs) => { + mem::drop(channel_state); + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::Event::SendFailHTLC { + node_id, + msg: msgs.0, + commitment_msg: msgs.1, + }); + }, + None => {}, + } true }, @@ -847,14 +863,14 @@ impl ChannelManager { false }, PendingOutboundHTLC::IntermediaryHopData { source_short_channel_id, .. } => { - let (node_id, fulfill_msg, monitor) = { + let (node_id, fulfill_msgs, monitor) = { let chan_id = match channel_state.short_to_id.get(&source_short_channel_id) { Some(chan_id) => chan_id.clone(), None => return false }; let chan = channel_state.by_id.get_mut(&chan_id).unwrap(); - match chan.get_update_fulfill_htlc(payment_preimage) { + match chan.get_update_fulfill_htlc_and_commit(payment_preimage) { Ok(msg) => (chan.get_their_node_id(), msg, if from_user { Some(chan.channel_monitor()) } else { None }), Err(_e) => { //TODO: Do something with e? @@ -863,13 +879,17 @@ impl ChannelManager { } }; - { - mem::drop(channel_state); - let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::SendFulfillHTLC { - node_id: node_id, - msg: fulfill_msg - }); + mem::drop(channel_state); + match fulfill_msgs { + Some(msgs) => { + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.push(events::Event::SendFulfillHTLC { + node_id: node_id, + msg: msgs.0, + commitment_msg: msgs.1, + }); + }, + None => {}, } //TODO: It may not be possible to handle add_update_monitor fails gracefully, maybe @@ -1361,40 +1381,34 @@ impl ChannelMessageHandler for ChannelManager { fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), HandleError> { let mut channel_state = self.channel_state.lock().unwrap(); - let payment_hash = match channel_state.by_id.get_mut(&msg.channel_id) { + match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { if chan.get_their_node_id() != *their_node_id { return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None}) } - chan.update_fail_htlc(&msg)? + chan.update_fail_htlc(&msg, HTLCFailReason::ErrorPacket { err: msg.reason.clone() }) }, None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None}) - }; - self.fail_htlc_backwards_internal(channel_state, &payment_hash, HTLCFailReason::ErrorPacket { err: msg.reason.clone() }); - Ok(()) + } } fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), HandleError> { let mut channel_state = self.channel_state.lock().unwrap(); - let payment_hash = match channel_state.by_id.get_mut(&msg.channel_id) { + match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { if chan.get_their_node_id() != *their_node_id { return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None}) } - chan.update_fail_malformed_htlc(&msg)? + chan.update_fail_malformed_htlc(&msg, HTLCFailReason::Reason { failure_code: msg.failure_code, data: Vec::new() }) }, None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None}) - }; - self.fail_htlc_backwards_internal(channel_state, &payment_hash, HTLCFailReason::Reason { failure_code: msg.failure_code, data: Vec::new() }); - Ok(()) + } } - fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result { - let mut forward_event = None; + fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) -> Result<(msgs::RevokeAndACK, Option), HandleError> { let (res, monitor) = { let mut channel_state = self.channel_state.lock().unwrap(); - - let ((res, mut forwarding_infos), monitor) = match channel_state.by_id.get_mut(&msg.channel_id) { + match channel_state.by_id.get_mut(&msg.channel_id) { Some(chan) => { if chan.get_their_node_id() != *their_node_id { return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None}) @@ -1402,13 +1416,40 @@ impl ChannelMessageHandler for ChannelManager { (chan.commitment_signed(&msg)?, chan.channel_monitor()) }, None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None}) - }; + } + }; + //TODO: Only if we store HTLC sigs + self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor)?; + + Ok(res) + } + + fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result, HandleError> { + let ((res, mut pending_forwards, mut pending_failures), monitor) = { + let mut channel_state = self.channel_state.lock().unwrap(); + match channel_state.by_id.get_mut(&msg.channel_id) { + Some(chan) => { + if chan.get_their_node_id() != *their_node_id { + return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None}) + } + (chan.revoke_and_ack(&msg)?, chan.channel_monitor()) + }, + None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None}) + } + }; + self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor)?; + for failure in pending_failures.drain(..) { + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), &failure.0, failure.1); + } + let mut forward_event = None; + if !pending_forwards.is_empty() { + let mut channel_state = self.channel_state.lock().unwrap(); if channel_state.forward_htlcs.is_empty() { forward_event = Some(Instant::now() + Duration::from_millis(((rng::rand_f32() * 4.0 + 1.0) * MIN_HTLC_RELAY_HOLDING_CELL_MILLIS as f32) as u64)); channel_state.next_forward = forward_event.unwrap(); } - for forward_info in forwarding_infos.drain(..) { + for forward_info in pending_forwards.drain(..) { match channel_state.forward_htlcs.entry(forward_info.short_channel_id) { hash_map::Entry::Occupied(mut entry) => { entry.get_mut().push(forward_info); @@ -1418,12 +1459,7 @@ impl ChannelMessageHandler for ChannelManager { } } } - - (res, monitor) - }; - //TODO: Only if we store HTLC sigs - self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor)?; - + } match forward_event { Some(time) => { let mut pending_events = self.pending_events.lock().unwrap(); @@ -1437,23 +1473,6 @@ impl ChannelMessageHandler for ChannelManager { Ok(res) } - fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result, msgs::CommitmentSigned)>, HandleError> { - let (res, monitor) = { - let mut channel_state = self.channel_state.lock().unwrap(); - match channel_state.by_id.get_mut(&msg.channel_id) { - Some(chan) => { - if chan.get_their_node_id() != *their_node_id { - return Err(HandleError{err: "Got a message for a channel from the wrong node!", msg: None}) - } - (chan.revoke_and_ack(&msg)?, chan.channel_monitor()) - }, - None => return Err(HandleError{err: "Failed to find corresponding channel", msg: None}) - } - }; - self.monitor.add_update_monitor(monitor.get_funding_txo().unwrap(), monitor)?; - Ok(res) - } - fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) -> Result<(), HandleError> { let mut channel_state = self.channel_state.lock().unwrap(); match channel_state.by_id.get_mut(&msg.channel_id) { @@ -1564,8 +1583,9 @@ mod tests { use rand::{thread_rng,Rng}; - use std::sync::{Arc, Mutex}; + use std::collections::HashMap; use std::default::Default; + use std::sync::{Arc, Mutex}; use std::time::Instant; fn build_test_onion_keys() -> Vec { @@ -1931,9 +1951,17 @@ mod tests { assert_eq!(added_monitors.len(), 1); added_monitors.clear(); } - assert!(prev_node.0.handle_revoke_and_ack(&node.get_our_node_id(), &revoke_and_ack).unwrap().is_none()); + assert!(prev_node.0.handle_revoke_and_ack(&node.get_our_node_id(), &revoke_and_ack.0).unwrap().is_none()); + let prev_revoke_and_ack = prev_node.0.handle_commitment_signed(&node.get_our_node_id(), &revoke_and_ack.1.unwrap()).unwrap(); { let mut added_monitors = prev_node.1.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 2); + added_monitors.clear(); + } + assert!(node.handle_revoke_and_ack(&prev_node.0.get_our_node_id(), &prev_revoke_and_ack.0).unwrap().is_none()); + assert!(prev_revoke_and_ack.1.is_none()); + { + let mut added_monitors = monitor.added_monitors.lock().unwrap(); assert_eq!(added_monitors.len(), 1); added_monitors.clear(); } @@ -1979,42 +2007,58 @@ mod tests { added_monitors.clear(); } - let mut expected_next_node = expected_route.last().unwrap().0.get_our_node_id(); - let mut prev_node = expected_route.last().unwrap().0; - let mut next_msg = None; - for &(node, monitor) in expected_route.iter().rev() { - assert_eq!(expected_next_node, node.get_our_node_id()); - match next_msg { - Some(msg) => { - node.handle_update_fulfill_htlc(&prev_node.get_our_node_id(), &msg).unwrap(); + let mut next_msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)> = None; + macro_rules! update_fulfill_dance { + ($node: expr, $monitor: expr, $prev_node: expr, $prev_monitor: expr) => { + { + $node.handle_update_fulfill_htlc(&$prev_node.get_our_node_id(), &next_msgs.as_ref().unwrap().0).unwrap(); + let revoke_and_commit = $node.handle_commitment_signed(&$prev_node.get_our_node_id(), &next_msgs.as_ref().unwrap().1).unwrap(); { - let mut added_monitors = monitor.added_monitors.lock().unwrap(); + let mut added_monitors = $monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 2); + added_monitors.clear(); + } + assert!($prev_node.handle_revoke_and_ack(&$node.get_our_node_id(), &revoke_and_commit.0).unwrap().is_none()); + let revoke_and_ack = $prev_node.handle_commitment_signed(&$node.get_our_node_id(), &revoke_and_commit.1.unwrap()).unwrap(); + assert!(revoke_and_ack.1.is_none()); + { + let mut added_monitors = $prev_monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 2); + added_monitors.clear(); + } + assert!($node.handle_revoke_and_ack(&$prev_node.get_our_node_id(), &revoke_and_ack.0).unwrap().is_none()); + { + let mut added_monitors = $monitor.added_monitors.lock().unwrap(); assert_eq!(added_monitors.len(), 1); added_monitors.clear(); } - }, None => {} + } + } + } + + let mut expected_next_node = expected_route.last().unwrap().0.get_our_node_id(); + let mut prev_node = (expected_route.last().unwrap().0, expected_route.last().unwrap().1); + for &(node, monitor) in expected_route.iter().rev() { + assert_eq!(expected_next_node, node.get_our_node_id()); + if next_msgs.is_some() { + update_fulfill_dance!(node, monitor, prev_node.0, prev_node.1); } let events = node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); match events[0] { - Event::SendFulfillHTLC { ref node_id, ref msg } => { + Event::SendFulfillHTLC { ref node_id, ref msg, ref commitment_msg } => { expected_next_node = node_id.clone(); - next_msg = Some(msg.clone()); + next_msgs = Some((msg.clone(), commitment_msg.clone())); }, _ => panic!("Unexpected event"), }; - prev_node = node; + prev_node = (node, monitor); } assert_eq!(expected_next_node, origin_node.get_our_node_id()); - origin_node.handle_update_fulfill_htlc(&expected_route.first().unwrap().0.get_our_node_id(), &next_msg.unwrap()).unwrap(); - { - let mut added_monitors = origin_monitor.added_monitors.lock().unwrap(); - assert_eq!(added_monitors.len(), 1); - added_monitors.clear(); - } + update_fulfill_dance!(origin_node, origin_monitor, expected_route.first().unwrap().0, expected_route.first().unwrap().1); let events = origin_node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); @@ -2072,32 +2116,58 @@ mod tests { assert!(expected_route.last().unwrap().0.fail_htlc_backwards(&our_payment_hash)); + let mut next_msgs: Option<(msgs::UpdateFailHTLC, msgs::CommitmentSigned)> = None; + macro_rules! update_fail_dance { + ($node: expr, $monitor: expr, $prev_node: expr, $prev_monitor: expr) => { + { + $node.handle_update_fail_htlc(&$prev_node.get_our_node_id(), &next_msgs.as_ref().unwrap().0).unwrap(); + let revoke_and_commit = $node.handle_commitment_signed(&$prev_node.get_our_node_id(), &next_msgs.as_ref().unwrap().1).unwrap(); + { + let mut added_monitors = $monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 1); + added_monitors.clear(); + } + assert!($prev_node.handle_revoke_and_ack(&$node.get_our_node_id(), &revoke_and_commit.0).unwrap().is_none()); + let revoke_and_ack = $prev_node.handle_commitment_signed(&$node.get_our_node_id(), &revoke_and_commit.1.unwrap()).unwrap(); + assert!(revoke_and_ack.1.is_none()); + { + let mut added_monitors = $prev_monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 2); + added_monitors.clear(); + } + assert!($node.handle_revoke_and_ack(&$prev_node.get_our_node_id(), &revoke_and_ack.0).unwrap().is_none()); + { + let mut added_monitors = $monitor.added_monitors.lock().unwrap(); + assert_eq!(added_monitors.len(), 1); + added_monitors.clear(); + } + } + } + } + let mut expected_next_node = expected_route.last().unwrap().0.get_our_node_id(); - let mut prev_node = expected_route.last().unwrap().0; - let mut next_msg = None; - for &(node, _) in expected_route.iter().rev() { + let mut prev_node = (expected_route.last().unwrap().0, expected_route.last().unwrap().1); + for &(node, monitor) in expected_route.iter().rev() { assert_eq!(expected_next_node, node.get_our_node_id()); - match next_msg { - Some(msg) => { - node.handle_update_fail_htlc(&prev_node.get_our_node_id(), &msg).unwrap(); - }, None => {} + if next_msgs.is_some() { + update_fail_dance!(node, monitor, prev_node.0, prev_node.1); } let events = node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); match events[0] { - Event::SendFailHTLC { ref node_id, ref msg } => { + Event::SendFailHTLC { ref node_id, ref msg, ref commitment_msg } => { expected_next_node = node_id.clone(); - next_msg = Some(msg.clone()); + next_msgs = Some((msg.clone(), commitment_msg.clone())); }, _ => panic!("Unexpected event"), }; - prev_node = node; + prev_node = (node, monitor); } assert_eq!(expected_next_node, origin_node.get_our_node_id()); - origin_node.handle_update_fail_htlc(&expected_route.first().unwrap().0.get_our_node_id(), &next_msg.unwrap()).unwrap(); + update_fail_dance!(origin_node, origin_monitor, expected_route.first().unwrap().0, expected_route.first().unwrap().1); let events = origin_node.get_and_clear_pending_events(); assert_eq!(events.len(), 1); diff --git a/src/ln/msgs.rs b/src/ln/msgs.rs index e85f25ac7..5939c09b1 100644 --- a/src/ln/msgs.rs +++ b/src/ln/msgs.rs @@ -357,6 +357,15 @@ pub struct HandleError { //TODO: rename me pub msg: Option, //TODO: Make this required and rename it } +/// Struct used to return values from revoke_and_ack messages, containing a bunch of commitment +/// transaction updates if they were pending. +pub struct CommitmentUpdate { + pub update_add_htlcs: Vec, + pub update_fulfill_htlcs: Vec, + pub update_fail_htlcs: Vec, + pub commitment_signed: CommitmentSigned, +} + /// A trait to describe an object which can receive channel messages. Messages MAY be called in /// paralell when they originate from different their_node_ids, however they MUST NOT be called in /// paralell when the two calls have the same their_node_id. @@ -377,8 +386,8 @@ pub trait ChannelMessageHandler : events::EventsProvider { fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &UpdateFulfillHTLC) -> Result<(), HandleError>; fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &UpdateFailHTLC) -> Result<(), HandleError>; fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &UpdateFailMalformedHTLC) -> Result<(), HandleError>; - fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &CommitmentSigned) -> Result; - fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &RevokeAndACK) -> Result, CommitmentSigned)>, HandleError>; + fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &CommitmentSigned) -> Result<(RevokeAndACK, Option), HandleError>; + fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &RevokeAndACK) -> Result, HandleError>; fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &UpdateFee) -> Result<(), HandleError>; diff --git a/src/ln/peer_handler.rs b/src/ln/peer_handler.rs index 01081845a..d7b1e9d69 100644 --- a/src/ln/peer_handler.rs +++ b/src/ln/peer_handler.rs @@ -433,18 +433,27 @@ impl PeerManager { 132 => { let msg = try_potential_decodeerror!(msgs::CommitmentSigned::decode(&msg_data[2..])); - let resp = try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg)); - encode_and_send_msg!(resp, 133); + let resps = try_potential_handleerror!(self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg)); + encode_and_send_msg!(resps.0, 133); + if let Some(resp) = resps.1 { + encode_and_send_msg!(resp, 132); + } }, 133 => { let msg = try_potential_decodeerror!(msgs::RevokeAndACK::decode(&msg_data[2..])); let resp_option = try_potential_handleerror!(self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg)); match resp_option { Some(resps) => { - for resp in resps.0 { + for resp in resps.update_add_htlcs { encode_and_send_msg!(resp, 128); } - encode_and_send_msg!(resps.1, 132); + for resp in resps.update_fulfill_htlcs { + encode_and_send_msg!(resp, 130); + } + for resp in resps.update_fail_htlcs { + encode_and_send_msg!(resp, 131); + } + encode_and_send_msg!(resps.commitment_signed, 132); }, None => {}, } @@ -581,19 +590,21 @@ impl PeerManager { Self::do_attempt_write_data(&mut descriptor, peer); continue; }, - Event::SendFulfillHTLC { ref node_id, ref msg } => { + Event::SendFulfillHTLC { ref node_id, ref msg, ref commitment_msg } => { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 130))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_msg, 132))); Self::do_attempt_write_data(&mut descriptor, peer); continue; }, - Event::SendFailHTLC { ref node_id, ref msg } => { + Event::SendFailHTLC { ref node_id, ref msg, ref commitment_msg } => { let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 131))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_msg, 132))); Self::do_attempt_write_data(&mut descriptor, peer); continue; }, diff --git a/src/util/events.rs b/src/util/events.rs index 9cf25ec38..db4769067 100644 --- a/src/util/events.rs +++ b/src/util/events.rs @@ -76,11 +76,13 @@ pub enum Event { SendFulfillHTLC { node_id: PublicKey, msg: msgs::UpdateFulfillHTLC, + commitment_msg: msgs::CommitmentSigned, }, /// Used to indicate that we need to fail an htlc from the peer with the given node_id. SendFailHTLC { node_id: PublicKey, msg: msgs::UpdateFailHTLC, + commitment_msg: msgs::CommitmentSigned, }, /// Used to indicate that a channel_announcement and channel_update should be broadcast to all /// peers (except the peer with node_id either msg.contents.node_id_1 or msg.contents.node_id_2).