X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=28513ebbc9ec921abc4bc539a98ff1d288fe0386;hb=fe6564816a6275e45dcd91dd0320c30ebb1f38de;hp=6426f0925f31f7bb1aafda8c827354fed67b1ad6;hpb=282b52f7bd874190d3e4b311b8b2a5b870285331;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6426f092..28513ebb 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -200,6 +200,8 @@ pub enum PendingHTLCRouting { /// For HTLCs received by LDK, these will ultimately bubble back up as /// [`RecipientOnionFields::custom_tlvs`]. custom_tlvs: Vec<(u64, Vec)>, + /// Set if this HTLC is the final hop in a multi-hop blinded path. + requires_blinded_error: bool, }, } @@ -221,6 +223,7 @@ impl PendingHTLCRouting { match self { Self::Forward { blinded: Some(BlindedForward { failure, .. }), .. } => Some(*failure), Self::Receive { requires_blinded_error: true, .. } => Some(BlindedFailure::FromBlindedNode), + Self::ReceiveKeysend { requires_blinded_error: true, .. } => Some(BlindedFailure::FromBlindedNode), _ => None, } } @@ -1178,6 +1181,8 @@ where // | | // | |__`pending_intercepted_htlcs` // | +// |__`decode_update_add_htlcs` +// | // |__`per_peer_state` // | // |__`pending_inbound_payments` @@ -1268,6 +1273,18 @@ where /// See `ChannelManager` struct-level documentation for lock order requirements. pending_intercepted_htlcs: Mutex>, + /// SCID/SCID Alias -> pending `update_add_htlc`s to decode. + /// + /// Note that because we may have an SCID Alias as the key we can have two entries per channel, + /// though in practice we probably won't be receiving HTLCs for a channel both via the alias + /// and via the classic SCID. + /// + /// Note that no consistency guarantees are made about the existence of a channel with the + /// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`! + /// + /// See `ChannelManager` struct-level documentation for lock order requirements. + decode_update_add_htlcs: Mutex>>, + /// The sets of payments which are claimable or currently being claimed. See /// [`ClaimablePayments`]' individual field docs for more info. /// @@ -2235,9 +2252,9 @@ macro_rules! handle_monitor_update_completion { let update_actions = $peer_state.monitor_update_blocked_actions .remove(&$chan.context.channel_id()).unwrap_or(Vec::new()); - let htlc_forwards = $self.handle_channel_resumption( + let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption( &mut $peer_state.pending_msg_events, $chan, updates.raa, - updates.commitment_update, updates.order, updates.accepted_htlcs, + updates.commitment_update, updates.order, updates.accepted_htlcs, updates.pending_update_adds, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); if let Some(upd) = channel_update { @@ -2298,6 +2315,9 @@ macro_rules! handle_monitor_update_completion { if let Some(forwards) = htlc_forwards { $self.forward_htlcs(&mut [forwards][..]); } + if let Some(decode) = decode_update_add_htlcs { + $self.push_decode_update_add_htlcs(decode); + } $self.finalize_claims(updates.finalized_claimed_htlcs); for failure in updates.failed_htlcs.drain(..) { let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; @@ -2474,6 +2494,7 @@ where pending_inbound_payments: Mutex::new(new_hash_map()), pending_outbound_payments: OutboundPayments::new(), forward_htlcs: Mutex::new(new_hash_map()), + decode_update_add_htlcs: Mutex::new(new_hash_map()), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }), pending_intercepted_htlcs: Mutex::new(new_hash_map()), outpoint_to_peer: Mutex::new(new_hash_map()), @@ -3073,6 +3094,163 @@ where } } + fn can_forward_htlc_to_outgoing_channel( + &self, chan: &mut Channel, msg: &msgs::UpdateAddHTLC, next_packet: &NextPacketDetails + ) -> Result<(), (&'static str, u16, Option)> { + if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { + // Note that the behavior here should be identical to the above block - we + // should NOT reveal the existence or non-existence of a private channel if + // we don't allow forwards outbound over them. + return Err(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); + } + if chan.context.get_channel_type().supports_scid_privacy() && next_packet.outgoing_scid != chan.context.outbound_scid_alias() { + // `option_scid_alias` (referred to in LDK as `scid_privacy`) means + // "refuse to forward unless the SCID alias was used", so we pretend + // we don't have the channel here. + return Err(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); + } + + // Note that we could technically not return an error yet here and just hope + // that the connection is reestablished or monitor updated by the time we get + // around to doing the actual forward, but better to fail early if we can and + // hopefully an attacker trying to path-trace payments cannot make this occur + // on a small/per-node/per-channel scale. + if !chan.context.is_live() { // channel_disabled + // If the channel_update we're going to return is disabled (i.e. the + // peer has been disabled for some time), return `channel_disabled`, + // otherwise return `temporary_channel_failure`. + let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok(); + if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { + return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); + } else { + return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); + } + } + if next_packet.outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum + let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok(); + return Err(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); + } + if let Err((err, code)) = chan.htlc_satisfies_config(msg, next_packet.outgoing_amt_msat, next_packet.outgoing_cltv_value) { + let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok(); + return Err((err, code, chan_update_opt)); + } + + Ok(()) + } + + /// Executes a callback `C` that returns some value `X` on the channel found with the given + /// `scid`. `None` is returned when the channel is not found. + fn do_funded_channel_callback) -> X>( + &self, scid: u64, callback: C, + ) -> Option { + let (counterparty_node_id, channel_id) = match self.short_to_chan_info.read().unwrap().get(&scid).cloned() { + None => return None, + Some((cp_id, id)) => (cp_id, id), + }; + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { + return None; + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.get_mut(&channel_id).and_then( + |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None } + ) { + None => None, + Some(chan) => Some(callback(chan)), + } + } + + fn can_forward_htlc( + &self, msg: &msgs::UpdateAddHTLC, next_packet_details: &NextPacketDetails + ) -> Result<(), (&'static str, u16, Option)> { + match self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel| { + self.can_forward_htlc_to_outgoing_channel(chan, msg, next_packet_details) + }) { + Some(Ok(())) => {}, + Some(Err(e)) => return Err(e), + None => { + // If we couldn't find the channel info for the scid, it may be a phantom or + // intercept forward. + if (self.default_configuration.accept_intercept_htlcs && + fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)) || + fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash) + {} else { + return Err(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + } + } + } + + let cur_height = self.best_block.read().unwrap().height + 1; + if let Err((err_msg, err_code)) = check_incoming_htlc_cltv( + cur_height, next_packet_details.outgoing_cltv_value, msg.cltv_expiry + ) { + let chan_update_opt = self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel| { + self.get_channel_update_for_onion(next_packet_details.outgoing_scid, chan).ok() + }).flatten(); + return Err((err_msg, err_code, chan_update_opt)); + } + + Ok(()) + } + + fn htlc_failure_from_update_add_err( + &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey, err_msg: &'static str, + mut err_code: u16, chan_update: Option, is_intro_node_blinded_forward: bool, + shared_secret: &[u8; 32] + ) -> HTLCFailureMsg { + let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); + if chan_update.is_some() && err_code & 0x1000 == 0x1000 { + let chan_update = chan_update.unwrap(); + if err_code == 0x1000 | 11 || err_code == 0x1000 | 12 { + msg.amount_msat.write(&mut res).expect("Writes cannot fail"); + } + else if err_code == 0x1000 | 13 { + msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); + } + else if err_code == 0x1000 | 20 { + // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 + 0u16.write(&mut res).expect("Writes cannot fail"); + } + (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); + msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); + chan_update.write(&mut res).expect("Writes cannot fail"); + } else if err_code & 0x1000 == 0x1000 { + // If we're trying to return an error that requires a `channel_update` but + // we're forwarding to a phantom or intercept "channel" (i.e. cannot + // generate an update), just use the generic "temporary_node_failure" + // instead. + err_code = 0x2000 | 2; + } + + log_info!( + WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)), + "Failed to accept/forward incoming HTLC: {}", err_msg + ); + // If `msg.blinding_point` is set, we must always fail with malformed. + if msg.blinding_point.is_some() { + return HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + sha256_of_onion: [0; 32], + failure_code: INVALID_ONION_BLINDING, + }); + } + + let (err_code, err_data) = if is_intro_node_blinded_forward { + (INVALID_ONION_BLINDING, &[0; 32][..]) + } else { + (err_code, &res.0[..]) + }; + HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason: HTLCFailReason::reason(err_code, err_data.to_vec()) + .get_encrypted_failure_packet(shared_secret, &None), + }) + } + fn decode_update_add_htlc_onion( &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey, ) -> Result< @@ -3082,48 +3260,7 @@ where msg, &self.node_signer, &self.logger, &self.secp_ctx )?; - let is_intro_node_forward = match next_hop { - onion_utils::Hop::Forward { - next_hop_data: msgs::InboundOnionPayload::BlindedForward { - intro_node_blinding_point: Some(_), .. - }, .. - } => true, - _ => false, - }; - - macro_rules! return_err { - ($msg: expr, $err_code: expr, $data: expr) => { - { - log_info!( - WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)), - "Failed to accept/forward incoming HTLC: {}", $msg - ); - // If `msg.blinding_point` is set, we must always fail with malformed. - if msg.blinding_point.is_some() { - return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { - channel_id: msg.channel_id, - htlc_id: msg.htlc_id, - sha256_of_onion: [0; 32], - failure_code: INVALID_ONION_BLINDING, - })); - } - - let (err_code, err_data) = if is_intro_node_forward { - (INVALID_ONION_BLINDING, &[0; 32][..]) - } else { ($err_code, $data) }; - return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { - channel_id: msg.channel_id, - htlc_id: msg.htlc_id, - reason: HTLCFailReason::reason(err_code, err_data.to_vec()) - .get_encrypted_failure_packet(&shared_secret, &None), - })); - } - } - } - - let NextPacketDetails { - next_packet_pubkey, outgoing_amt_msat, outgoing_scid, outgoing_cltv_value - } = match next_packet_details_opt { + let next_packet_details = match next_packet_details_opt { Some(next_packet_details) => next_packet_details, // it is a receive, so no need for outbound checks None => return Ok((next_hop, shared_secret, None)), @@ -3131,124 +3268,15 @@ where // Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we // can't hold the outbound peer state lock at the same time as the inbound peer state lock. - if let Some((err, mut code, chan_update)) = loop { - let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned(); - let forwarding_chan_info_opt = match id_option { - None => { // unknown_next_peer - // Note that this is likely a timing oracle for detecting whether an scid is a - // phantom or an intercept. - if (self.default_configuration.accept_intercept_htlcs && - fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)) || - fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash) - { - None - } else { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - }, - Some((cp_id, id)) => Some((cp_id.clone(), id.clone())), - }; - let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if peer_state_mutex_opt.is_none() { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let chan = match peer_state.channel_by_id.get_mut(&forwarding_id).map( - |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None } - ).flatten() { - None => { - // Channel was removed. The short_to_chan_info and channel_by_id maps - // have no consistency guarantees. - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - }, - Some(chan) => chan - }; - if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { - // Note that the behavior here should be identical to the above block - we - // should NOT reveal the existence or non-existence of a private channel if - // we don't allow forwards outbound over them. - break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); - } - if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() { - // `option_scid_alias` (referred to in LDK as `scid_privacy`) means - // "refuse to forward unless the SCID alias was used", so we pretend - // we don't have the channel here. - break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); - } - let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok(); - - // Note that we could technically not return an error yet here and just hope - // that the connection is reestablished or monitor updated by the time we get - // around to doing the actual forward, but better to fail early if we can and - // hopefully an attacker trying to path-trace payments cannot make this occur - // on a small/per-node/per-channel scale. - if !chan.context.is_live() { // channel_disabled - // If the channel_update we're going to return is disabled (i.e. the - // peer has been disabled for some time), return `channel_disabled`, - // otherwise return `temporary_channel_failure`. - if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { - break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); - } else { - break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); - } - } - if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum - break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); - } - if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) { - break Some((err, code, chan_update_opt)); - } - chan_update_opt - } else { - None - }; - - let cur_height = self.best_block.read().unwrap().height + 1; - - if let Err((err_msg, code)) = check_incoming_htlc_cltv( - cur_height, outgoing_cltv_value, msg.cltv_expiry - ) { - if code & 0x1000 != 0 && chan_update_opt.is_none() { - // We really should set `incorrect_cltv_expiry` here but as we're not - // forwarding over a real channel we can't generate a channel_update - // for it. Instead we just return a generic temporary_node_failure. - break Some((err_msg, 0x2000 | 2, None)) - } - let chan_update_opt = if code & 0x1000 != 0 { chan_update_opt } else { None }; - break Some((err_msg, code, chan_update_opt)); - } + self.can_forward_htlc(&msg, &next_packet_details).map_err(|e| { + let (err_msg, err_code, chan_update_opt) = e; + self.htlc_failure_from_update_add_err( + msg, counterparty_node_id, err_msg, err_code, chan_update_opt, + next_hop.is_intro_node_blinded_forward(), &shared_secret + ) + })?; - break None; - } - { - let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); - if let Some(chan_update) = chan_update { - if code == 0x1000 | 11 || code == 0x1000 | 12 { - msg.amount_msat.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 13 { - msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 20 { - // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 - 0u16.write(&mut res).expect("Writes cannot fail"); - } - (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); - msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); - chan_update.write(&mut res).expect("Writes cannot fail"); - } else if code & 0x1000 == 0x1000 { - // If we're trying to return an error that requires a `channel_update` but - // we're forwarding to a phantom or intercept "channel" (i.e. cannot - // generate an update), just use the generic "temporary_node_failure" - // instead. - code = 0x2000 | 2; - } - return_err!(err, code, &res.0[..]); - } - Ok((next_hop, shared_secret, Some(next_packet_pubkey))) + Ok((next_hop, shared_secret, Some(next_packet_details.next_packet_pubkey))) } fn construct_pending_htlc_status<'a>( @@ -4276,6 +4304,145 @@ where Ok(()) } + fn process_pending_update_add_htlcs(&self) { + let mut decode_update_add_htlcs = new_hash_map(); + mem::swap(&mut decode_update_add_htlcs, &mut self.decode_update_add_htlcs.lock().unwrap()); + + let get_failed_htlc_destination = |outgoing_scid_opt: Option, payment_hash: PaymentHash| { + if let Some(outgoing_scid) = outgoing_scid_opt { + match self.short_to_chan_info.read().unwrap().get(&outgoing_scid) { + Some((outgoing_counterparty_node_id, outgoing_channel_id)) => + HTLCDestination::NextHopChannel { + node_id: Some(*outgoing_counterparty_node_id), + channel_id: *outgoing_channel_id, + }, + None => HTLCDestination::UnknownNextHop { + requested_forward_scid: outgoing_scid, + }, + } + } else { + HTLCDestination::FailedPayment { payment_hash } + } + }; + + 'outer_loop: for (incoming_scid, update_add_htlcs) in decode_update_add_htlcs { + let incoming_channel_details_opt = self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel| { + let counterparty_node_id = chan.context.get_counterparty_node_id(); + let channel_id = chan.context.channel_id(); + let funding_txo = chan.context.get_funding_txo().unwrap(); + let user_channel_id = chan.context.get_user_id(); + let accept_underpaying_htlcs = chan.context.config().accept_underpaying_htlcs; + (counterparty_node_id, channel_id, funding_txo, user_channel_id, accept_underpaying_htlcs) + }); + let ( + incoming_counterparty_node_id, incoming_channel_id, incoming_funding_txo, + incoming_user_channel_id, incoming_accept_underpaying_htlcs + ) = if let Some(incoming_channel_details) = incoming_channel_details_opt { + incoming_channel_details + } else { + // The incoming channel no longer exists, HTLCs should be resolved onchain instead. + continue; + }; + + let mut htlc_forwards = Vec::new(); + let mut htlc_fails = Vec::new(); + for update_add_htlc in &update_add_htlcs { + let (next_hop, shared_secret, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion( + &update_add_htlc, &self.node_signer, &self.logger, &self.secp_ctx + ) { + Ok(decoded_onion) => decoded_onion, + Err(htlc_fail) => { + htlc_fails.push((htlc_fail, HTLCDestination::InvalidOnion)); + continue; + }, + }; + + let is_intro_node_blinded_forward = next_hop.is_intro_node_blinded_forward(); + let outgoing_scid_opt = next_packet_details_opt.as_ref().map(|d| d.outgoing_scid); + + // Process the HTLC on the incoming channel. + match self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel| { + let logger = WithChannelContext::from(&self.logger, &chan.context); + chan.can_accept_incoming_htlc( + update_add_htlc, &self.fee_estimator, &logger, + ) + }) { + Some(Ok(_)) => {}, + Some(Err((err, code))) => { + let outgoing_chan_update_opt = if let Some(outgoing_scid) = outgoing_scid_opt.as_ref() { + self.do_funded_channel_callback(*outgoing_scid, |chan: &mut Channel| { + self.get_channel_update_for_onion(*outgoing_scid, chan).ok() + }).flatten() + } else { + None + }; + let htlc_fail = self.htlc_failure_from_update_add_err( + &update_add_htlc, &incoming_counterparty_node_id, err, code, + outgoing_chan_update_opt, is_intro_node_blinded_forward, &shared_secret, + ); + let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash); + htlc_fails.push((htlc_fail, htlc_destination)); + continue; + }, + // The incoming channel no longer exists, HTLCs should be resolved onchain instead. + None => continue 'outer_loop, + } + + // Now process the HTLC on the outgoing channel if it's a forward. + if let Some(next_packet_details) = next_packet_details_opt.as_ref() { + if let Err((err, code, chan_update_opt)) = self.can_forward_htlc( + &update_add_htlc, next_packet_details + ) { + let htlc_fail = self.htlc_failure_from_update_add_err( + &update_add_htlc, &incoming_counterparty_node_id, err, code, + chan_update_opt, is_intro_node_blinded_forward, &shared_secret, + ); + let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash); + htlc_fails.push((htlc_fail, htlc_destination)); + continue; + } + } + + match self.construct_pending_htlc_status( + &update_add_htlc, &incoming_counterparty_node_id, shared_secret, next_hop, + incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey), + ) { + PendingHTLCStatus::Forward(htlc_forward) => { + htlc_forwards.push((htlc_forward, update_add_htlc.htlc_id)); + }, + PendingHTLCStatus::Fail(htlc_fail) => { + let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash); + htlc_fails.push((htlc_fail, htlc_destination)); + }, + } + } + + // Process all of the forwards and failures for the channel in which the HTLCs were + // proposed to as a batch. + let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id, + incoming_user_channel_id, htlc_forwards.drain(..).collect()); + self.forward_htlcs_without_forward_event(&mut [pending_forwards]); + for (htlc_fail, htlc_destination) in htlc_fails.drain(..) { + let failure = match htlc_fail { + HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC { + htlc_id: fail_htlc.htlc_id, + err_packet: fail_htlc.reason, + }, + HTLCFailureMsg::Malformed(fail_malformed_htlc) => HTLCForwardInfo::FailMalformedHTLC { + htlc_id: fail_malformed_htlc.htlc_id, + sha256_of_onion: fail_malformed_htlc.sha256_of_onion, + failure_code: fail_malformed_htlc.failure_code, + }, + }; + self.forward_htlcs.lock().unwrap().entry(incoming_scid).or_insert(vec![]).push(failure); + self.pending_events.lock().unwrap().push_back((events::Event::HTLCHandlingFailed { + prev_channel_id: incoming_channel_id, + failed_next_destination: htlc_destination, + }, None)); + } + } + } + /// Processes HTLCs which are pending waiting on random forward delay. /// /// Should only really ever be called in response to a PendingHTLCsForwardable event. @@ -4283,6 +4450,8 @@ where pub fn process_pending_htlc_forwards(&self) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + self.process_pending_update_add_htlcs(); + let mut new_events = VecDeque::new(); let mut failed_forwards = Vec::new(); let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new(); @@ -4523,7 +4692,10 @@ where (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data }, Some(payment_data), phantom_shared_secret, onion_fields) }, - PendingHTLCRouting::ReceiveKeysend { payment_data, payment_preimage, payment_metadata, incoming_cltv_expiry, custom_tlvs } => { + PendingHTLCRouting::ReceiveKeysend { + payment_data, payment_preimage, payment_metadata, + incoming_cltv_expiry, custom_tlvs, requires_blinded_error: _ + } => { let onion_fields = RecipientOnionFields { payment_secret: payment_data.as_ref().map(|data| data.payment_secret), payment_metadata, @@ -5324,9 +5496,14 @@ where } } + fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) { + let push_forward_event = self.fail_htlc_backwards_internal_without_forward_event(source, payment_hash, onion_error, destination); + if push_forward_event { self.push_pending_forwards_ev(); } + } + /// Fails an HTLC backwards to the sender of it to us. /// Note that we do not assume that channels corresponding to failed HTLCs are still available. - fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) { + fn fail_htlc_backwards_internal_without_forward_event(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) -> bool { // Ensure that no peer state channel storage lock is held when calling this function. // This ensures that future code doesn't introduce a lock-order requirement for // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling @@ -5344,12 +5521,12 @@ where // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called // from block_connected which may run during initialization prior to the chain_monitor // being fully configured. See the docs for `ChannelManagerReadArgs` for more. + let mut push_forward_event; match source { HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, .. } => { - if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, + push_forward_event = self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, session_priv, payment_id, self.probing_cookie_secret, &self.secp_ctx, - &self.pending_events, &self.logger) - { self.push_pending_forwards_ev(); } + &self.pending_events, &self.logger); }, HTLCSource::PreviousHopData(HTLCPreviousHopData { ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret, @@ -5383,11 +5560,9 @@ where } }; - let mut push_forward_ev = false; + push_forward_event = self.decode_update_add_htlcs.lock().unwrap().is_empty(); let mut forward_htlcs = self.forward_htlcs.lock().unwrap(); - if forward_htlcs.is_empty() { - push_forward_ev = true; - } + push_forward_event &= forward_htlcs.is_empty(); match forward_htlcs.entry(*short_channel_id) { hash_map::Entry::Occupied(mut entry) => { entry.get_mut().push(failure); @@ -5397,7 +5572,6 @@ where } } mem::drop(forward_htlcs); - if push_forward_ev { self.push_pending_forwards_ev(); } let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push_back((events::Event::HTLCHandlingFailed { prev_channel_id: *channel_id, @@ -5405,6 +5579,7 @@ where }, None)); }, } + push_forward_event } /// Provides a payment preimage in response to [`Event::PaymentClaimable`], generating any @@ -5740,7 +5915,7 @@ where fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, skimmed_fee_msat: Option, from_onchain: bool, startup_replay: bool, next_channel_counterparty_node_id: Option, - next_channel_outpoint: OutPoint, next_channel_id: ChannelId, + next_channel_outpoint: OutPoint, next_channel_id: ChannelId, next_user_channel_id: Option, ) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => { @@ -5759,11 +5934,10 @@ where }, HTLCSource::PreviousHopData(hop_data) => { let prev_channel_id = hop_data.channel_id; + let prev_user_channel_id = hop_data.user_channel_id; let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); #[cfg(debug_assertions)] let claiming_chan_funding_outpoint = hop_data.outpoint; - #[cfg(debug_assertions)] - let claiming_channel_id = hop_data.channel_id; let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat, definitely_duplicate| { let chan_to_release = @@ -5821,7 +5995,7 @@ where BackgroundEvent::MonitorUpdatesComplete { channel_id, .. } => - *channel_id == claiming_channel_id, + *channel_id == prev_channel_id, } }), "{:?}", *background_events); } @@ -5845,12 +6019,14 @@ where "skimmed_fee_msat must always be included in total_fee_earned_msat"); Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { event: events::Event::PaymentForwarded { - total_fee_earned_msat, - claim_from_onchain_tx: from_onchain, prev_channel_id: Some(prev_channel_id), next_channel_id: Some(next_channel_id), - outbound_amount_forwarded_msat: forwarded_htlc_value_msat, + prev_user_channel_id, + next_user_channel_id, + total_fee_earned_msat, skimmed_fee_msat, + claim_from_onchain_tx: from_onchain, + outbound_amount_forwarded_msat: forwarded_htlc_value_msat, }, downstream_counterparty_and_funding_outpoint: chan_to_release, }) @@ -5922,24 +6098,31 @@ where fn handle_channel_resumption(&self, pending_msg_events: &mut Vec, channel: &mut Channel, raa: Option, commitment_update: Option, order: RAACommitmentOrder, - pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option, + pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec, + funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option) - -> Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> { + -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec)>) { let logger = WithChannelContext::from(&self.logger, &channel.context); - log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement", + log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement", &channel.context.channel_id(), if raa.is_some() { "an" } else { "no" }, - if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(), + if commitment_update.is_some() { "a" } else { "no" }, + pending_forwards.len(), pending_update_adds.len(), if funding_broadcastable.is_some() { "" } else { "not " }, if channel_ready.is_some() { "sending" } else { "without" }, if announcement_sigs.is_some() { "sending" } else { "without" }); - let mut htlc_forwards = None; - let counterparty_node_id = channel.context.get_counterparty_node_id(); + let short_channel_id = channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()); + + let mut htlc_forwards = None; if !pending_forwards.is_empty() { - htlc_forwards = Some((channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()), - channel.context.get_funding_txo().unwrap(), channel.context.channel_id(), channel.context.get_user_id(), pending_forwards)); + htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(), + channel.context.channel_id(), channel.context.get_user_id(), pending_forwards)); + } + let mut decode_update_add_htlcs = None; + if !pending_update_adds.is_empty() { + decode_update_add_htlcs = Some((short_channel_id, pending_update_adds)); } if let Some(msg) = channel_ready { @@ -5990,7 +6173,7 @@ where emit_channel_ready_event!(pending_events, channel); } - htlc_forwards + (htlc_forwards, decode_update_add_htlcs) } fn channel_monitor_updated(&self, funding_txo: &OutPoint, channel_id: &ChannelId, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) { @@ -6105,73 +6288,82 @@ where // happening and return an error. N.B. that we create channel with an outbound SCID of zero so // that we can delay allocating the SCID until after we're sure that the checks below will // succeed. - let mut channel = match peer_state.inbound_channel_request_by_id.remove(temporary_channel_id) { + let res = match peer_state.inbound_channel_request_by_id.remove(temporary_channel_id) { Some(unaccepted_channel) => { let best_block_height = self.best_block.read().unwrap().height; InboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, &unaccepted_channel.open_channel_msg, user_channel_id, &self.default_configuration, best_block_height, - &self.logger, accept_0conf).map_err(|e| { - let err_str = e.to_string(); - log_error!(logger, "{}", err_str); - - APIError::ChannelUnavailable { err: err_str } - }) - } + &self.logger, accept_0conf).map_err(|err| MsgHandleErrInternal::from_chan_no_close(err, *temporary_channel_id)) + }, _ => { let err_str = "No such channel awaiting to be accepted.".to_owned(); log_error!(logger, "{}", err_str); - Err(APIError::APIMisuseError { err: err_str }) + return Err(APIError::APIMisuseError { err: err_str }); } - }?; + }; - if accept_0conf { - // This should have been correctly configured by the call to InboundV1Channel::new. - debug_assert!(channel.context.minimum_depth().unwrap() == 0); - } else if channel.context.get_channel_type().requires_zero_conf() { - let send_msg_err_event = events::MessageSendEvent::HandleError { - node_id: channel.context.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage{ - msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "No zero confirmation channels accepted".to_owned(), } + match res { + Err(err) => { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + match handle_error!(self, Result::<(), MsgHandleErrInternal>::Err(err), *counterparty_node_id) { + Ok(_) => unreachable!("`handle_error` only returns Err as we've passed in an Err"), + Err(e) => { + return Err(APIError::ChannelUnavailable { err: e.err }); + }, } - }; - peer_state.pending_msg_events.push(send_msg_err_event); - let err_str = "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned(); - log_error!(logger, "{}", err_str); + } + Ok(mut channel) => { + if accept_0conf { + // This should have been correctly configured by the call to InboundV1Channel::new. + debug_assert!(channel.context.minimum_depth().unwrap() == 0); + } else if channel.context.get_channel_type().requires_zero_conf() { + let send_msg_err_event = events::MessageSendEvent::HandleError { + node_id: channel.context.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage{ + msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "No zero confirmation channels accepted".to_owned(), } + } + }; + peer_state.pending_msg_events.push(send_msg_err_event); + let err_str = "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned(); + log_error!(logger, "{}", err_str); - return Err(APIError::APIMisuseError { err: err_str }); - } else { - // If this peer already has some channels, a new channel won't increase our number of peers - // with unfunded channels, so as long as we aren't over the maximum number of unfunded - // channels per-peer we can accept channels from a peer with existing ones. - if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS { - let send_msg_err_event = events::MessageSendEvent::HandleError { - node_id: channel.context.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage{ - msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), } - } - }; - peer_state.pending_msg_events.push(send_msg_err_event); - let err_str = "Too many peers with unfunded channels, refusing to accept new ones".to_owned(); - log_error!(logger, "{}", err_str); + return Err(APIError::APIMisuseError { err: err_str }); + } else { + // If this peer already has some channels, a new channel won't increase our number of peers + // with unfunded channels, so as long as we aren't over the maximum number of unfunded + // channels per-peer we can accept channels from a peer with existing ones. + if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS { + let send_msg_err_event = events::MessageSendEvent::HandleError { + node_id: channel.context.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage{ + msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), } + } + }; + peer_state.pending_msg_events.push(send_msg_err_event); + let err_str = "Too many peers with unfunded channels, refusing to accept new ones".to_owned(); + log_error!(logger, "{}", err_str); - return Err(APIError::APIMisuseError { err: err_str }); - } - } + return Err(APIError::APIMisuseError { err: err_str }); + } + } - // Now that we know we have a channel, assign an outbound SCID alias. - let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); - channel.context.set_outbound_scid_alias(outbound_scid_alias); + // Now that we know we have a channel, assign an outbound SCID alias. + let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); + channel.context.set_outbound_scid_alias(outbound_scid_alias); - peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { - node_id: channel.context.get_counterparty_node_id(), - msg: channel.accept_inbound_channel(), - }); + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { + node_id: channel.context.get_counterparty_node_id(), + msg: channel.accept_inbound_channel(), + }); - peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel)); + peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel)); - Ok(()) + Ok(()) + }, + } } /// Gets the number of peers which match the given filter and do not have any funded, outbound, @@ -6766,7 +6958,7 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan_phase_entry) => { if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { - let pending_forward_info = match decoded_hop_res { + let mut pending_forward_info = match decoded_hop_res { Ok((next_hop, shared_secret, next_packet_pk_opt)) => self.construct_pending_htlc_status( msg, counterparty_node_id, shared_secret, next_hop, @@ -6774,44 +6966,45 @@ where ), Err(e) => PendingHTLCStatus::Fail(e) }; - let create_pending_htlc_status = |chan: &Channel, pending_forward_info: PendingHTLCStatus, error_code: u16| { + let logger = WithChannelContext::from(&self.logger, &chan.context); + // If the update_add is completely bogus, the call will Err and we will close, + // but if we've sent a shutdown and they haven't acknowledged it yet, we just + // want to reject the new HTLC and fail it backwards instead of forwarding. + if let Err((_, error_code)) = chan.can_accept_incoming_htlc(&msg, &self.fee_estimator, &logger) { if msg.blinding_point.is_some() { - return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed( - msgs::UpdateFailMalformedHTLC { - channel_id: msg.channel_id, - htlc_id: msg.htlc_id, - sha256_of_onion: [0; 32], - failure_code: INVALID_ONION_BLINDING, - } - )) - } - // If the update_add is completely bogus, the call will Err and we will close, - // but if we've sent a shutdown and they haven't acknowledged it yet, we just - // want to reject the new HTLC and fail it backwards instead of forwarding. - match pending_forward_info { - PendingHTLCStatus::Forward(PendingHTLCInfo { - ref incoming_shared_secret, ref routing, .. - }) => { - let reason = if routing.blinded_failure().is_some() { - HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32]) - } else if (error_code & 0x1000) != 0 { - let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan); - HTLCFailReason::reason(real_code, error_data) - } else { - HTLCFailReason::from_failure_code(error_code) - }.get_encrypted_failure_packet(incoming_shared_secret, &None); - let msg = msgs::UpdateFailHTLC { + pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed( + msgs::UpdateFailMalformedHTLC { channel_id: msg.channel_id, htlc_id: msg.htlc_id, - reason - }; - PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg)) - }, - _ => pending_forward_info + sha256_of_onion: [0; 32], + failure_code: INVALID_ONION_BLINDING, + } + )) + } else { + match pending_forward_info { + PendingHTLCStatus::Forward(PendingHTLCInfo { + ref incoming_shared_secret, ref routing, .. + }) => { + let reason = if routing.blinded_failure().is_some() { + HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32]) + } else if (error_code & 0x1000) != 0 { + let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan); + HTLCFailReason::reason(real_code, error_data) + } else { + HTLCFailReason::from_failure_code(error_code) + }.get_encrypted_failure_packet(incoming_shared_secret, &None); + let msg = msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason + }; + pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg)); + }, + _ => {}, + } } - }; - let logger = WithChannelContext::from(&self.logger, &chan.context); - try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &&logger), chan_phase_entry); + } + try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info), chan_phase_entry); } else { return try_chan_phase_entry!(self, Err(ChannelError::Close( "Got an update_add_htlc message for an unfunded channel!".into())), chan_phase_entry); @@ -6824,6 +7017,7 @@ where fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> { let funding_txo; + let next_user_channel_id; let (htlc_source, forwarded_htlc_value, skimmed_fee_msat) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -6853,6 +7047,7 @@ where // outbound HTLC is claimed. This is guaranteed to all complete before we // process the RAA as messages are processed from single peers serially. funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded"); + next_user_channel_id = chan.context.get_user_id(); res } else { return try_chan_phase_entry!(self, Err(ChannelError::Close( @@ -6864,7 +7059,7 @@ where }; self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), skimmed_fee_msat, false, false, Some(*counterparty_node_id), - funding_txo, msg.channel_id + funding_txo, msg.channel_id, Some(next_user_channel_id), ); Ok(()) @@ -6953,10 +7148,28 @@ where } } + fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec)) { + let mut push_forward_event = self.forward_htlcs.lock().unwrap().is_empty(); + let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); + push_forward_event &= decode_update_add_htlcs.is_empty(); + let scid = update_add_htlcs.0; + match decode_update_add_htlcs.entry(scid) { + hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); }, + hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); }, + } + if push_forward_event { self.push_pending_forwards_ev(); } + } + #[inline] fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) { + let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards); + if push_forward_event { self.push_pending_forwards_ev() } + } + + #[inline] + fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool { + let mut push_forward_event = false; for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards { - let mut push_forward_event = false; let mut new_intercept_events = VecDeque::new(); let mut failed_intercept_forwards = Vec::new(); if !pending_forwards.is_empty() { @@ -6969,6 +7182,7 @@ where // Pull this now to avoid introducing a lock order with `forward_htlcs`. let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid); + let decode_update_add_htlcs_empty = self.decode_update_add_htlcs.lock().unwrap().is_empty(); let mut forward_htlcs = self.forward_htlcs.lock().unwrap(); let forward_htlcs_empty = forward_htlcs.is_empty(); match forward_htlcs.entry(scid) { @@ -7017,9 +7231,7 @@ where } else { // We don't want to generate a PendingHTLCsForwardable event if only intercepted // payments are being processed. - if forward_htlcs_empty { - push_forward_event = true; - } + push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty; entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info }))); } @@ -7029,15 +7241,15 @@ where } for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards.drain(..) { - self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination); + push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(&htlc_source, &payment_hash, &failure_reason, destination); } if !new_intercept_events.is_empty() { let mut events = self.pending_events.lock().unwrap(); events.append(&mut new_intercept_events); } - if push_forward_event { self.push_pending_forwards_ev() } } + push_forward_event } fn push_pending_forwards_ev(&self) { @@ -7247,7 +7459,6 @@ where } fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result { - let htlc_forwards; let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -7290,9 +7501,11 @@ where } } let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take(); - htlc_forwards = self.handle_channel_resumption( + let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption( &mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order, - Vec::new(), None, responses.channel_ready, responses.announcement_sigs); + Vec::new(), Vec::new(), None, responses.channel_ready, responses.announcement_sigs); + debug_assert!(htlc_forwards.is_none()); + debug_assert!(decode_update_add_htlcs.is_none()); if let Some(upd) = channel_update { peer_state.pending_msg_events.push(upd); } @@ -7338,16 +7551,10 @@ where } }; - let mut persist = NotifyOption::SkipPersistHandleEvents; - if let Some(forwards) = htlc_forwards { - self.forward_htlcs(&mut [forwards][..]); - persist = NotifyOption::DoPersist; - } - if let Some(channel_ready_msg) = need_lnd_workaround { self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?; } - Ok(persist) + Ok(NotifyOption::SkipPersistHandleEvents) } /// Process pending events from the [`chain::Watch`], returning whether any events were processed. @@ -7366,7 +7573,7 @@ where log_trace!(logger, "Claiming HTLC with preimage {} from our monitor", preimage); self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), None, true, - false, counterparty_node_id, funding_outpoint, channel_id); + false, counterparty_node_id, funding_outpoint, channel_id, None); } else { log_trace!(logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash); let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id }; @@ -7374,7 +7581,7 @@ where self.fail_htlc_backwards_internal(&htlc_update.source, &htlc_update.payment_hash, &reason, receiver); } }, - MonitorEvent::HolderForceClosed(_funding_outpoint) => { + MonitorEvent::HolderForceClosed(_) | MonitorEvent::HolderForceClosedWithInfo { .. } => { let counterparty_node_id_opt = match counterparty_node_id { Some(cp_id) => Some(cp_id), None => { @@ -7392,7 +7599,12 @@ where let pending_msg_events = &mut peer_state.pending_msg_events; if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) { if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) { - failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed)); + let reason = if let MonitorEvent::HolderForceClosedWithInfo { reason, .. } = monitor_event { + reason + } else { + ClosureReason::HolderForceClosed + }; + failed_channels.push(chan.context.force_shutdown(false, reason.clone())); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update @@ -7401,7 +7613,7 @@ where pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.context.get_counterparty_node_id(), action: msgs::ErrorAction::DisconnectPeer { - msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: "Channel force-closed".to_owned() }) + msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: reason.to_string() }) }, }); } @@ -9775,6 +9987,7 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting, }, (2, ReceiveKeysend) => { (0, payment_preimage, required), + (1, requires_blinded_error, (default_value, false)), (2, incoming_cltv_expiry, required), (3, payment_metadata, option), (4, payment_data, option), // Added in 0.0.116 @@ -10174,6 +10387,12 @@ where } } + let mut decode_update_add_htlcs_opt = None; + let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); + if !decode_update_add_htlcs.is_empty() { + decode_update_add_htlcs_opt = Some(decode_update_add_htlcs); + } + let per_peer_state = self.per_peer_state.write().unwrap(); let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap(); @@ -10325,6 +10544,7 @@ where (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), + (14, decode_update_add_htlcs_opt, option), }); Ok(()) @@ -10790,6 +11010,7 @@ where let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; let mut in_flight_monitor_updates: Option>> = None; + let mut decode_update_add_htlcs: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -10803,7 +11024,9 @@ where (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), + (14, decode_update_add_htlcs, option), }); + let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map()); if fake_scid_rand_bytes.is_none() { fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes()); } @@ -11023,6 +11246,18 @@ where // still have an entry for this HTLC in `forward_htlcs` or // `pending_intercepted_htlcs`, we were apparently not persisted after // the monitor was when forwarding the payment. + decode_update_add_htlcs.retain(|scid, update_add_htlcs| { + update_add_htlcs.retain(|update_add_htlc| { + let matches = *scid == prev_hop_data.short_channel_id && + update_add_htlc.htlc_id == prev_hop_data.htlc_id; + if matches { + log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}", + &htlc.payment_hash, &monitor.channel_id()); + } + !matches + }); + !update_add_htlcs.is_empty() + }); forward_htlcs.retain(|_, forwards| { forwards.retain(|forward| { if let HTLCForwardInfo::AddHTLC(htlc_info) = forward { @@ -11104,7 +11339,7 @@ where } } - if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() { + if !forward_htlcs.is_empty() || !decode_update_add_htlcs.is_empty() || pending_outbounds.needs_abandon() { // If we have pending HTLCs to forward, assume we either dropped a // `PendingHTLCsForwardable` or the user received it but never processed it as they // shut down before the timer hit. Either way, set the time_forwardable to a small @@ -11338,6 +11573,7 @@ where pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()), forward_htlcs: Mutex::new(forward_htlcs), + decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }), outbound_scid_aliases: Mutex::new(outbound_scid_aliases), outpoint_to_peer: Mutex::new(outpoint_to_peer), @@ -11386,7 +11622,9 @@ where // don't remember in the `ChannelMonitor` where we got a preimage from, but if the // channel is closed we just assume that it probably came from an on-chain claim. channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), None, - downstream_closed, true, downstream_node_id, downstream_funding, downstream_channel_id); + downstream_closed, true, downstream_node_id, downstream_funding, + downstream_channel_id, None + ); } //TODO: Broadcast channel update for closed channels, but only after we've made a