fn can_forward_htlc_to_outgoing_channel(
&self, chan: &mut Channel<SP>, msg: &msgs::UpdateAddHTLC, next_packet: &NextPacketDetails
- ) -> Result<Option<msgs::ChannelUpdate>, (&'static str, u16, Option<msgs::ChannelUpdate>)> {
+ ) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels {
// Note that the behavior here should be identical to the above block - we
// should NOT reveal the existence or non-existence of a private channel if
// we don't have the channel here.
return Err(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None));
}
- let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
// Note that we could technically not return an error yet here and just hope
// that the connection is reestablished or monitor updated by the time we get
// If the channel_update we're going to return is disabled (i.e. the
// peer has been disabled for some time), return `channel_disabled`,
// otherwise return `temporary_channel_failure`.
+ let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
} else {
}
}
if next_packet.outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum
+ let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
return Err(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt));
}
if let Err((err, code)) = chan.htlc_satisfies_config(msg, next_packet.outgoing_amt_msat, next_packet.outgoing_cltv_value) {
+ let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
return Err((err, code, chan_update_opt));
}
- Ok(chan_update_opt)
+ Ok(())
+ }
+
+ /// Executes a callback `C` that returns some value `X` on the channel found with the given
+ /// `scid`. `None` is returned when the channel is not found.
+ fn do_funded_channel_callback<X, C: Fn(&mut Channel<SP>) -> X>(
+ &self, scid: u64, callback: C,
+ ) -> Option<X> {
+ let (counterparty_node_id, channel_id) = match self.short_to_chan_info.read().unwrap().get(&scid).cloned() {
+ None => return None,
+ Some((cp_id, id)) => (cp_id, id),
+ };
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
+ if peer_state_mutex_opt.is_none() {
+ return None;
+ }
+ let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
+ let peer_state = &mut *peer_state_lock;
+ match peer_state.channel_by_id.get_mut(&channel_id).and_then(
+ |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
+ ) {
+ None => None,
+ Some(chan) => Some(callback(chan)),
+ }
+ }
+
+ fn can_forward_htlc(
+ &self, msg: &msgs::UpdateAddHTLC, next_packet_details: &NextPacketDetails
+ ) -> Result<(), (&'static str, u16, Option<msgs::ChannelUpdate>)> {
+ match self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
+ self.can_forward_htlc_to_outgoing_channel(chan, msg, next_packet_details)
+ }) {
+ Some(Ok(())) => {},
+ Some(Err(e)) => return Err(e),
+ None => {
+ // If we couldn't find the channel info for the scid, it may be a phantom or
+ // intercept forward.
+ if (self.default_configuration.accept_intercept_htlcs &&
+ fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)) ||
+ fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)
+ {} else {
+ return Err(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
+ }
+ }
+ }
+
+ let cur_height = self.best_block.read().unwrap().height + 1;
+ if let Err((err_msg, err_code)) = check_incoming_htlc_cltv(
+ cur_height, next_packet_details.outgoing_cltv_value, msg.cltv_expiry
+ ) {
+ let chan_update_opt = self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel<SP>| {
+ self.get_channel_update_for_onion(next_packet_details.outgoing_scid, chan).ok()
+ }).flatten();
+ return Err((err_msg, err_code, chan_update_opt));
+ }
+
+ Ok(())
}
fn htlc_failure_from_update_add_err(
shared_secret: &[u8; 32]
) -> HTLCFailureMsg {
let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2));
- if let Some(chan_update) = chan_update {
+ if chan_update.is_some() && err_code & 0x1000 == 0x1000 {
+ let chan_update = chan_update.unwrap();
if err_code == 0x1000 | 11 || err_code == 0x1000 | 12 {
msg.amount_msat.write(&mut res).expect("Writes cannot fail");
}
// Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we
// can't hold the outbound peer state lock at the same time as the inbound peer state lock.
- if let Some((err, code, chan_update)) = loop {
- let id_option = self.short_to_chan_info.read().unwrap().get(&next_packet_details.outgoing_scid).cloned();
- let forwarding_chan_info_opt = match id_option {
- None => { // unknown_next_peer
- // Note that this is likely a timing oracle for detecting whether an scid is a
- // phantom or an intercept.
- if (self.default_configuration.accept_intercept_htlcs &&
- fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)) ||
- fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)
- {
- None
- } else {
- break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
- }
- },
- Some((cp_id, id)) => Some((cp_id.clone(), id.clone())),
- };
- let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt {
- let per_peer_state = self.per_peer_state.read().unwrap();
- let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
- if peer_state_mutex_opt.is_none() {
- break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
- }
- let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
- let peer_state = &mut *peer_state_lock;
- let chan = match peer_state.channel_by_id.get_mut(&forwarding_id).map(
- |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None }
- ).flatten() {
- None => {
- // Channel was removed. The short_to_chan_info and channel_by_id maps
- // have no consistency guarantees.
- break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None));
- },
- Some(chan) => chan
- };
- match self.can_forward_htlc_to_outgoing_channel(chan, msg, &next_packet_details) {
- Ok(chan_update_opt) => chan_update_opt,
- Err(e) => break Some(e),
- }
- } else {
- None
- };
-
- let cur_height = self.best_block.read().unwrap().height + 1;
-
- if let Err((err_msg, code)) = check_incoming_htlc_cltv(
- cur_height, next_packet_details.outgoing_cltv_value, msg.cltv_expiry
- ) {
- if code & 0x1000 != 0 && chan_update_opt.is_none() {
- // We really should set `incorrect_cltv_expiry` here but as we're not
- // forwarding over a real channel we can't generate a channel_update
- // for it. Instead we just return a generic temporary_node_failure.
- break Some((err_msg, 0x2000 | 2, None))
- }
- let chan_update_opt = if code & 0x1000 != 0 { chan_update_opt } else { None };
- break Some((err_msg, code, chan_update_opt));
- }
+ self.can_forward_htlc(&msg, &next_packet_details).map_err(|e| {
+ let (err_msg, err_code, chan_update_opt) = e;
+ self.htlc_failure_from_update_add_err(
+ msg, counterparty_node_id, err_msg, err_code, chan_update_opt,
+ next_hop.is_intro_node_blinded_forward(), &shared_secret
+ )
+ })?;
- break None;
- }
- {
- return Err(self.htlc_failure_from_update_add_err(
- msg, counterparty_node_id, err, code, chan_update, next_hop.is_intro_node_blinded_forward(), &shared_secret
- ));
- }
Ok((next_hop, shared_secret, Some(next_packet_details.next_packet_pubkey)))
}
Ok(())
}
+ fn process_pending_update_add_htlcs(&self) {
+ let mut decode_update_add_htlcs = new_hash_map();
+ mem::swap(&mut decode_update_add_htlcs, &mut self.decode_update_add_htlcs.lock().unwrap());
+
+ let get_failed_htlc_destination = |outgoing_scid_opt: Option<u64>, payment_hash: PaymentHash| {
+ if let Some(outgoing_scid) = outgoing_scid_opt {
+ match self.short_to_chan_info.read().unwrap().get(&outgoing_scid) {
+ Some((outgoing_counterparty_node_id, outgoing_channel_id)) =>
+ HTLCDestination::NextHopChannel {
+ node_id: Some(*outgoing_counterparty_node_id),
+ channel_id: *outgoing_channel_id,
+ },
+ None => HTLCDestination::UnknownNextHop {
+ requested_forward_scid: outgoing_scid,
+ },
+ }
+ } else {
+ HTLCDestination::FailedPayment { payment_hash }
+ }
+ };
+
+ 'outer_loop: for (incoming_scid, update_add_htlcs) in decode_update_add_htlcs {
+ let incoming_channel_details_opt = self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+ let counterparty_node_id = chan.context.get_counterparty_node_id();
+ let channel_id = chan.context.channel_id();
+ let funding_txo = chan.context.get_funding_txo().unwrap();
+ let user_channel_id = chan.context.get_user_id();
+ let accept_underpaying_htlcs = chan.context.config().accept_underpaying_htlcs;
+ (counterparty_node_id, channel_id, funding_txo, user_channel_id, accept_underpaying_htlcs)
+ });
+ let (
+ incoming_counterparty_node_id, incoming_channel_id, incoming_funding_txo,
+ incoming_user_channel_id, incoming_accept_underpaying_htlcs
+ ) = if let Some(incoming_channel_details) = incoming_channel_details_opt {
+ incoming_channel_details
+ } else {
+ // The incoming channel no longer exists, HTLCs should be resolved onchain instead.
+ continue;
+ };
+
+ let mut htlc_forwards = Vec::new();
+ let mut htlc_fails = Vec::new();
+ for update_add_htlc in &update_add_htlcs {
+ let (next_hop, shared_secret, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion(
+ &update_add_htlc, &self.node_signer, &self.logger, &self.secp_ctx
+ ) {
+ Ok(decoded_onion) => decoded_onion,
+ Err(htlc_fail) => {
+ htlc_fails.push((htlc_fail, HTLCDestination::InvalidOnion));
+ continue;
+ },
+ };
+
+ let is_intro_node_blinded_forward = next_hop.is_intro_node_blinded_forward();
+ let outgoing_scid_opt = next_packet_details_opt.as_ref().map(|d| d.outgoing_scid);
+
+ // Process the HTLC on the incoming channel.
+ match self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+ let logger = WithChannelContext::from(&self.logger, &chan.context);
+ chan.can_accept_incoming_htlc(
+ update_add_htlc, &self.fee_estimator, &logger,
+ )
+ }) {
+ Some(Ok(_)) => {},
+ Some(Err((err, code))) => {
+ let outgoing_chan_update_opt = if let Some(outgoing_scid) = outgoing_scid_opt.as_ref() {
+ self.do_funded_channel_callback(*outgoing_scid, |chan: &mut Channel<SP>| {
+ self.get_channel_update_for_onion(*outgoing_scid, chan).ok()
+ }).flatten()
+ } else {
+ None
+ };
+ let htlc_fail = self.htlc_failure_from_update_add_err(
+ &update_add_htlc, &incoming_counterparty_node_id, err, code,
+ outgoing_chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+ );
+ let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+ htlc_fails.push((htlc_fail, htlc_destination));
+ continue;
+ },
+ // The incoming channel no longer exists, HTLCs should be resolved onchain instead.
+ None => continue 'outer_loop,
+ }
+
+ // Now process the HTLC on the outgoing channel if it's a forward.
+ if let Some(next_packet_details) = next_packet_details_opt.as_ref() {
+ if let Err((err, code, chan_update_opt)) = self.can_forward_htlc(
+ &update_add_htlc, next_packet_details
+ ) {
+ let htlc_fail = self.htlc_failure_from_update_add_err(
+ &update_add_htlc, &incoming_counterparty_node_id, err, code,
+ chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+ );
+ let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+ htlc_fails.push((htlc_fail, htlc_destination));
+ continue;
+ }
+ }
+
+ match self.construct_pending_htlc_status(
+ &update_add_htlc, &incoming_counterparty_node_id, shared_secret, next_hop,
+ incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey),
+ ) {
+ PendingHTLCStatus::Forward(htlc_forward) => {
+ htlc_forwards.push((htlc_forward, update_add_htlc.htlc_id));
+ },
+ PendingHTLCStatus::Fail(htlc_fail) => {
+ let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+ htlc_fails.push((htlc_fail, htlc_destination));
+ },
+ }
+ }
+
+ // Process all of the forwards and failures for the channel in which the HTLCs were
+ // proposed to as a batch.
+ let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
+ incoming_user_channel_id, htlc_forwards.drain(..).collect());
+ self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
+ for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
+ let failure = match htlc_fail {
+ HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC {
+ htlc_id: fail_htlc.htlc_id,
+ err_packet: fail_htlc.reason,
+ },
+ HTLCFailureMsg::Malformed(fail_malformed_htlc) => HTLCForwardInfo::FailMalformedHTLC {
+ htlc_id: fail_malformed_htlc.htlc_id,
+ sha256_of_onion: fail_malformed_htlc.sha256_of_onion,
+ failure_code: fail_malformed_htlc.failure_code,
+ },
+ };
+ self.forward_htlcs.lock().unwrap().entry(incoming_scid).or_insert(vec![]).push(failure);
+ self.pending_events.lock().unwrap().push_back((events::Event::HTLCHandlingFailed {
+ prev_channel_id: incoming_channel_id,
+ failed_next_destination: htlc_destination,
+ }, None));
+ }
+ }
+ }
+
/// Processes HTLCs which are pending waiting on random forward delay.
///
/// Should only really ever be called in response to a PendingHTLCsForwardable event.
pub fn process_pending_htlc_forwards(&self) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
+ self.process_pending_update_add_htlcs();
+
let mut new_events = VecDeque::new();
let mut failed_forwards = Vec::new();
let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
}
}
+ fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
+ let push_forward_event = self.fail_htlc_backwards_internal_without_forward_event(source, payment_hash, onion_error, destination);
+ if push_forward_event { self.push_pending_forwards_ev(); }
+ }
+
/// Fails an HTLC backwards to the sender of it to us.
/// Note that we do not assume that channels corresponding to failed HTLCs are still available.
- fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) {
+ fn fail_htlc_backwards_internal_without_forward_event(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) -> bool {
// Ensure that no peer state channel storage lock is held when calling this function.
// This ensures that future code doesn't introduce a lock-order requirement for
// `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling
// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
// from block_connected which may run during initialization prior to the chain_monitor
// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
+ let mut push_forward_event;
match source {
HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, .. } => {
- if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
+ push_forward_event = self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
session_priv, payment_id, self.probing_cookie_secret, &self.secp_ctx,
- &self.pending_events, &self.logger)
- { self.push_pending_forwards_ev(); }
+ &self.pending_events, &self.logger);
},
HTLCSource::PreviousHopData(HTLCPreviousHopData {
ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret,
}
};
- let mut push_forward_ev = false;
+ push_forward_event = self.decode_update_add_htlcs.lock().unwrap().is_empty();
let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
- if forward_htlcs.is_empty() {
- push_forward_ev = true;
- }
+ push_forward_event &= forward_htlcs.is_empty();
match forward_htlcs.entry(*short_channel_id) {
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().push(failure);
}
}
mem::drop(forward_htlcs);
- if push_forward_ev { self.push_pending_forwards_ev(); }
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push_back((events::Event::HTLCHandlingFailed {
prev_channel_id: *channel_id,
}, None));
},
}
+ push_forward_event
}
/// Provides a payment preimage in response to [`Event::PaymentClaimable`], generating any
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan_phase_entry) => {
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
- let pending_forward_info = match decoded_hop_res {
+ let mut pending_forward_info = match decoded_hop_res {
Ok((next_hop, shared_secret, next_packet_pk_opt)) =>
self.construct_pending_htlc_status(
msg, counterparty_node_id, shared_secret, next_hop,
),
Err(e) => PendingHTLCStatus::Fail(e)
};
- let create_pending_htlc_status = |chan: &Channel<SP>, pending_forward_info: PendingHTLCStatus, error_code: u16| {
+ let logger = WithChannelContext::from(&self.logger, &chan.context);
+ // If the update_add is completely bogus, the call will Err and we will close,
+ // but if we've sent a shutdown and they haven't acknowledged it yet, we just
+ // want to reject the new HTLC and fail it backwards instead of forwarding.
+ if let Err((_, error_code)) = chan.can_accept_incoming_htlc(&msg, &self.fee_estimator, &logger) {
if msg.blinding_point.is_some() {
- return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
- msgs::UpdateFailMalformedHTLC {
- channel_id: msg.channel_id,
- htlc_id: msg.htlc_id,
- sha256_of_onion: [0; 32],
- failure_code: INVALID_ONION_BLINDING,
- }
- ))
- }
- // If the update_add is completely bogus, the call will Err and we will close,
- // but if we've sent a shutdown and they haven't acknowledged it yet, we just
- // want to reject the new HTLC and fail it backwards instead of forwarding.
- match pending_forward_info {
- PendingHTLCStatus::Forward(PendingHTLCInfo {
- ref incoming_shared_secret, ref routing, ..
- }) => {
- let reason = if routing.blinded_failure().is_some() {
- HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
- } else if (error_code & 0x1000) != 0 {
- let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
- HTLCFailReason::reason(real_code, error_data)
- } else {
- HTLCFailReason::from_failure_code(error_code)
- }.get_encrypted_failure_packet(incoming_shared_secret, &None);
- let msg = msgs::UpdateFailHTLC {
+ pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed(
+ msgs::UpdateFailMalformedHTLC {
channel_id: msg.channel_id,
htlc_id: msg.htlc_id,
- reason
- };
- PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg))
- },
- _ => pending_forward_info
+ sha256_of_onion: [0; 32],
+ failure_code: INVALID_ONION_BLINDING,
+ }
+ ))
+ } else {
+ match pending_forward_info {
+ PendingHTLCStatus::Forward(PendingHTLCInfo {
+ ref incoming_shared_secret, ref routing, ..
+ }) => {
+ let reason = if routing.blinded_failure().is_some() {
+ HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32])
+ } else if (error_code & 0x1000) != 0 {
+ let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan);
+ HTLCFailReason::reason(real_code, error_data)
+ } else {
+ HTLCFailReason::from_failure_code(error_code)
+ }.get_encrypted_failure_packet(incoming_shared_secret, &None);
+ let msg = msgs::UpdateFailHTLC {
+ channel_id: msg.channel_id,
+ htlc_id: msg.htlc_id,
+ reason
+ };
+ pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg));
+ },
+ _ => {},
+ }
}
- };
- let logger = WithChannelContext::from(&self.logger, &chan.context);
- try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &&logger), chan_phase_entry);
+ }
+ try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info), chan_phase_entry);
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got an update_add_htlc message for an unfunded channel!".into())), chan_phase_entry);
}
fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec<msgs::UpdateAddHTLC>)) {
+ let mut push_forward_event = self.forward_htlcs.lock().unwrap().is_empty();
let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+ push_forward_event &= decode_update_add_htlcs.is_empty();
let scid = update_add_htlcs.0;
match decode_update_add_htlcs.entry(scid) {
hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); },
hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); },
}
+ if push_forward_event { self.push_pending_forwards_ev(); }
}
#[inline]
fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
+ let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
+ if push_forward_event { self.push_pending_forwards_ev() }
+ }
+
+ #[inline]
+ fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
+ let mut push_forward_event = false;
for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
- let mut push_forward_event = false;
let mut new_intercept_events = VecDeque::new();
let mut failed_intercept_forwards = Vec::new();
if !pending_forwards.is_empty() {
// Pull this now to avoid introducing a lock order with `forward_htlcs`.
let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid);
+ let decode_update_add_htlcs_empty = self.decode_update_add_htlcs.lock().unwrap().is_empty();
let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
let forward_htlcs_empty = forward_htlcs.is_empty();
match forward_htlcs.entry(scid) {
} else {
// We don't want to generate a PendingHTLCsForwardable event if only intercepted
// payments are being processed.
- if forward_htlcs_empty {
- push_forward_event = true;
- }
+ push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
}
}
for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards.drain(..) {
- self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination);
+ push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(&htlc_source, &payment_hash, &failure_reason, destination);
}
if !new_intercept_events.is_empty() {
let mut events = self.pending_events.lock().unwrap();
events.append(&mut new_intercept_events);
}
- if push_forward_event { self.push_pending_forwards_ev() }
}
+ push_forward_event
}
fn push_pending_forwards_ev(&self) {
(13, claimable_htlc_onion_fields, optional_vec),
(14, decode_update_add_htlcs, option),
});
- let decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+ let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
if fake_scid_rand_bytes.is_none() {
fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
}
// still have an entry for this HTLC in `forward_htlcs` or
// `pending_intercepted_htlcs`, we were apparently not persisted after
// the monitor was when forwarding the payment.
+ decode_update_add_htlcs.retain(|scid, update_add_htlcs| {
+ update_add_htlcs.retain(|update_add_htlc| {
+ let matches = *scid == prev_hop_data.short_channel_id &&
+ update_add_htlc.htlc_id == prev_hop_data.htlc_id;
+ if matches {
+ log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}",
+ &htlc.payment_hash, &monitor.channel_id());
+ }
+ !matches
+ });
+ !update_add_htlcs.is_empty()
+ });
forward_htlcs.retain(|_, forwards| {
forwards.retain(|forward| {
if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
}
}
- if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
+ if !forward_htlcs.is_empty() || !decode_update_add_htlcs.is_empty() || pending_outbounds.needs_abandon() {
// If we have pending HTLCs to forward, assume we either dropped a
// `PendingHTLCsForwardable` or the user received it but never processed it as they
// shut down before the timer hit. Either way, set the time_forwardable to a small