X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=11d0b299efef3a35d0d8872d5a016ad807498838;hb=fb3a86f498f971dc01fdd58b166031793ff1bc1a;hp=78fa47e9272cc735dbe709c1d8cc9286d7b26c19;hpb=134d60a3e800b062f85270973c1e0702eeab077c;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 78fa47e9..11d0b299 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -58,10 +58,11 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError}; use crate::ln::outbound_payment; use crate::ln::outbound_payment::{Bolt12PaymentError, OutboundPayments, PaymentAttempts, PendingOutboundPayment, SendAlongPathArgs, StaleExpiration}; use crate::ln::wire::Encode; -use crate::offers::invoice::{BlindedPayInfo, Bolt12Invoice, DEFAULT_RELATIVE_EXPIRY, DerivedSigningPubkey, InvoiceBuilder}; +use crate::offers::invoice::{BlindedPayInfo, Bolt12Invoice, DEFAULT_RELATIVE_EXPIRY, DerivedSigningPubkey, ExplicitSigningPubkey, InvoiceBuilder, UnsignedBolt12Invoice}; use crate::offers::invoice_error::InvoiceError; +use crate::offers::invoice_request::{DerivedPayerId, InvoiceRequestBuilder}; use crate::offers::merkle::SignError; -use crate::offers::offer::{DerivedMetadata, Offer, OfferBuilder}; +use crate::offers::offer::{Offer, OfferBuilder}; use crate::offers::parse::Bolt12SemanticError; use crate::offers::refund::{Refund, RefundBuilder}; use crate::onion_message::messenger::{Destination, MessageRouter, PendingOnionMessage, new_pending_onion_message}; @@ -77,11 +78,17 @@ use crate::util::logger::{Level, Logger, WithContext}; use crate::util::errors::APIError; #[cfg(not(c_bindings))] use { + crate::offers::offer::DerivedMetadata, crate::routing::router::DefaultRouter, crate::routing::gossip::NetworkGraph, crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}, crate::sign::KeysManager, }; +#[cfg(c_bindings)] +use { + crate::offers::offer::OfferWithDerivedMetadataBuilder, + crate::offers::refund::RefundMaybeWithDerivedMetadataBuilder, +}; use alloc::collections::{btree_map, BTreeMap}; @@ -193,6 +200,8 @@ pub enum PendingHTLCRouting { /// For HTLCs received by LDK, these will ultimately bubble back up as /// [`RecipientOnionFields::custom_tlvs`]. custom_tlvs: Vec<(u64, Vec)>, + /// Set if this HTLC is the final hop in a multi-hop blinded path. + requires_blinded_error: bool, }, } @@ -214,6 +223,7 @@ impl PendingHTLCRouting { match self { Self::Forward { blinded: Some(BlindedForward { failure, .. }), .. } => Some(*failure), Self::Receive { requires_blinded_error: true, .. } => Some(BlindedFailure::FromBlindedNode), + Self::ReceiveKeysend { requires_blinded_error: true, .. } => Some(BlindedFailure::FromBlindedNode), _ => None, } } @@ -893,7 +903,7 @@ pub(super) struct PeerState where SP::Target: SignerProvider { /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`]. - is_connected: bool, + pub is_connected: bool, } impl PeerState where SP::Target: SignerProvider { @@ -1171,6 +1181,8 @@ where // | | // | |__`pending_intercepted_htlcs` // | +// |__`decode_update_add_htlcs` +// | // |__`per_peer_state` // | // |__`pending_inbound_payments` @@ -1261,6 +1273,18 @@ where /// See `ChannelManager` struct-level documentation for lock order requirements. pending_intercepted_htlcs: Mutex>, + /// SCID/SCID Alias -> pending `update_add_htlc`s to decode. + /// + /// Note that because we may have an SCID Alias as the key we can have two entries per channel, + /// though in practice we probably won't be receiving HTLCs for a channel both via the alias + /// and via the classic SCID. + /// + /// Note that no consistency guarantees are made about the existence of a channel with the + /// `short_channel_id` here, nor the `channel_id` in `UpdateAddHTLC`! + /// + /// See `ChannelManager` struct-level documentation for lock order requirements. + decode_update_add_htlcs: Mutex>>, + /// The sets of payments which are claimable or currently being claimed. See /// [`ClaimablePayments`]' individual field docs for more info. /// @@ -1404,6 +1428,9 @@ where pending_offers_messages: Mutex>>, + /// Tracks the message events that are to be broadcasted when we are connected to some peer. + pending_broadcast_messages: Mutex>, + entropy_source: ES, node_signer: NS, signer_provider: SP, @@ -1995,7 +2022,7 @@ macro_rules! handle_error { match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => { - let mut msg_events = Vec::with_capacity(2); + let mut msg_event = None; if let Some((shutdown_res, update_option)) = shutdown_finish { let counterparty_node_id = shutdown_res.counterparty_node_id; @@ -2007,7 +2034,8 @@ macro_rules! handle_error { $self.finish_close_channel(shutdown_res); if let Some(update) = update_option { - msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -2017,17 +2045,17 @@ macro_rules! handle_error { if let msgs::ErrorAction::IgnoreError = err.action { } else { - msg_events.push(events::MessageSendEvent::HandleError { + msg_event = Some(events::MessageSendEvent::HandleError { node_id: $counterparty_node_id, action: err.action.clone() }); } - if !msg_events.is_empty() { + if let Some(msg_event) = msg_event { let per_peer_state = $self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) { let mut peer_state = peer_state_mutex.lock().unwrap(); - peer_state.pending_msg_events.append(&mut msg_events); + peer_state.pending_msg_events.push(msg_event); } } @@ -2209,7 +2237,7 @@ macro_rules! handle_monitor_update_completion { let logger = WithChannelContext::from(&$self.logger, &$chan.context); let mut updates = $chan.monitor_updating_restored(&&logger, &$self.node_signer, $self.chain_hash, &$self.default_configuration, - $self.best_block.read().unwrap().height()); + $self.best_block.read().unwrap().height); let counterparty_node_id = $chan.context.get_counterparty_node_id(); let channel_update = if updates.channel_ready.is_some() && $chan.context.is_usable() { // We only send a channel_update in the case where we are just now sending a @@ -2228,9 +2256,9 @@ macro_rules! handle_monitor_update_completion { let update_actions = $peer_state.monitor_update_blocked_actions .remove(&$chan.context.channel_id()).unwrap_or(Vec::new()); - let htlc_forwards = $self.handle_channel_resumption( + let (htlc_forwards, decode_update_add_htlcs) = $self.handle_channel_resumption( &mut $peer_state.pending_msg_events, $chan, updates.raa, - updates.commitment_update, updates.order, updates.accepted_htlcs, + updates.commitment_update, updates.order, updates.accepted_htlcs, updates.pending_update_adds, updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs); if let Some(upd) = channel_update { @@ -2291,6 +2319,9 @@ macro_rules! handle_monitor_update_completion { if let Some(forwards) = htlc_forwards { $self.forward_htlcs(&mut [forwards][..]); } + if let Some(decode) = decode_update_add_htlcs { + $self.push_decode_update_add_htlcs(decode); + } $self.finalize_claims(updates.finalized_claimed_htlcs); for failure in updates.failed_htlcs.drain(..) { let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; @@ -2467,6 +2498,7 @@ where pending_inbound_payments: Mutex::new(new_hash_map()), pending_outbound_payments: OutboundPayments::new(), forward_htlcs: Mutex::new(new_hash_map()), + decode_update_add_htlcs: Mutex::new(new_hash_map()), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: new_hash_map(), pending_claiming_payments: new_hash_map() }), pending_intercepted_htlcs: Mutex::new(new_hash_map()), outpoint_to_peer: Mutex::new(new_hash_map()), @@ -2494,6 +2526,7 @@ where funding_batch_states: Mutex::new(BTreeMap::new()), pending_offers_messages: Mutex::new(Vec::new()), + pending_broadcast_messages: Mutex::new(Vec::new()), entropy_source, node_signer, @@ -2509,7 +2542,7 @@ where } fn create_and_insert_outbound_scid_alias(&self) -> u64 { - let height = self.best_block.read().unwrap().height(); + let height = self.best_block.read().unwrap().height; let mut outbound_scid_alias = 0; let mut i = 0; loop { @@ -2587,7 +2620,7 @@ where let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration }; match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key, their_features, channel_value_satoshis, push_msat, user_channel_id, config, - self.best_block.read().unwrap().height(), outbound_scid_alias, temporary_channel_id) + self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id) { Ok(res) => res, Err(e) => { @@ -2626,7 +2659,7 @@ where // the same channel. let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len()); { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); @@ -2659,7 +2692,7 @@ where // the same channel. let mut res = Vec::with_capacity(self.short_to_chan_info.read().unwrap().len()); { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { let mut peer_state_lock = peer_state_mutex.lock().unwrap(); @@ -2689,7 +2722,7 @@ where /// Gets the list of channels we have with a given counterparty, in random order. pub fn list_channels_with_counterparty(&self, counterparty_node_id: &PublicKey) -> Vec { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { @@ -2992,17 +3025,11 @@ where } }; if let Some(update) = update_opt { - // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if - // not try to broadcast it via whatever peer we have. - let per_peer_state = self.per_peer_state.read().unwrap(); - let a_peer_state_opt = per_peer_state.get(peer_node_id) - .ok_or(per_peer_state.values().next()); - if let Ok(a_peer_state_mutex) = a_peer_state_opt { - let mut a_peer_state = a_peer_state_mutex.lock().unwrap(); - a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } + // If we have some Channel Update to broadcast, we cache it and broadcast it later. + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); } Ok(counterparty_node_id) @@ -3066,6 +3093,163 @@ where } } + fn can_forward_htlc_to_outgoing_channel( + &self, chan: &mut Channel, msg: &msgs::UpdateAddHTLC, next_packet: &NextPacketDetails + ) -> Result<(), (&'static str, u16, Option)> { + if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { + // Note that the behavior here should be identical to the above block - we + // should NOT reveal the existence or non-existence of a private channel if + // we don't allow forwards outbound over them. + return Err(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); + } + if chan.context.get_channel_type().supports_scid_privacy() && next_packet.outgoing_scid != chan.context.outbound_scid_alias() { + // `option_scid_alias` (referred to in LDK as `scid_privacy`) means + // "refuse to forward unless the SCID alias was used", so we pretend + // we don't have the channel here. + return Err(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); + } + + // Note that we could technically not return an error yet here and just hope + // that the connection is reestablished or monitor updated by the time we get + // around to doing the actual forward, but better to fail early if we can and + // hopefully an attacker trying to path-trace payments cannot make this occur + // on a small/per-node/per-channel scale. + if !chan.context.is_live() { // channel_disabled + // If the channel_update we're going to return is disabled (i.e. the + // peer has been disabled for some time), return `channel_disabled`, + // otherwise return `temporary_channel_failure`. + let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok(); + if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { + return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); + } else { + return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); + } + } + if next_packet.outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum + let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok(); + return Err(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); + } + if let Err((err, code)) = chan.htlc_satisfies_config(msg, next_packet.outgoing_amt_msat, next_packet.outgoing_cltv_value) { + let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok(); + return Err((err, code, chan_update_opt)); + } + + Ok(()) + } + + /// Executes a callback `C` that returns some value `X` on the channel found with the given + /// `scid`. `None` is returned when the channel is not found. + fn do_funded_channel_callback) -> X>( + &self, scid: u64, callback: C, + ) -> Option { + let (counterparty_node_id, channel_id) = match self.short_to_chan_info.read().unwrap().get(&scid).cloned() { + None => return None, + Some((cp_id, id)) => (cp_id, id), + }; + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); + if peer_state_mutex_opt.is_none() { + return None; + } + let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.get_mut(&channel_id).and_then( + |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None } + ) { + None => None, + Some(chan) => Some(callback(chan)), + } + } + + fn can_forward_htlc( + &self, msg: &msgs::UpdateAddHTLC, next_packet_details: &NextPacketDetails + ) -> Result<(), (&'static str, u16, Option)> { + match self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel| { + self.can_forward_htlc_to_outgoing_channel(chan, msg, next_packet_details) + }) { + Some(Ok(())) => {}, + Some(Err(e)) => return Err(e), + None => { + // If we couldn't find the channel info for the scid, it may be a phantom or + // intercept forward. + if (self.default_configuration.accept_intercept_htlcs && + fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash)) || + fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, next_packet_details.outgoing_scid, &self.chain_hash) + {} else { + return Err(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); + } + } + } + + let cur_height = self.best_block.read().unwrap().height + 1; + if let Err((err_msg, err_code)) = check_incoming_htlc_cltv( + cur_height, next_packet_details.outgoing_cltv_value, msg.cltv_expiry + ) { + let chan_update_opt = self.do_funded_channel_callback(next_packet_details.outgoing_scid, |chan: &mut Channel| { + self.get_channel_update_for_onion(next_packet_details.outgoing_scid, chan).ok() + }).flatten(); + return Err((err_msg, err_code, chan_update_opt)); + } + + Ok(()) + } + + fn htlc_failure_from_update_add_err( + &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey, err_msg: &'static str, + mut err_code: u16, chan_update: Option, is_intro_node_blinded_forward: bool, + shared_secret: &[u8; 32] + ) -> HTLCFailureMsg { + let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); + if chan_update.is_some() && err_code & 0x1000 == 0x1000 { + let chan_update = chan_update.unwrap(); + if err_code == 0x1000 | 11 || err_code == 0x1000 | 12 { + msg.amount_msat.write(&mut res).expect("Writes cannot fail"); + } + else if err_code == 0x1000 | 13 { + msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); + } + else if err_code == 0x1000 | 20 { + // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 + 0u16.write(&mut res).expect("Writes cannot fail"); + } + (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); + msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); + chan_update.write(&mut res).expect("Writes cannot fail"); + } else if err_code & 0x1000 == 0x1000 { + // If we're trying to return an error that requires a `channel_update` but + // we're forwarding to a phantom or intercept "channel" (i.e. cannot + // generate an update), just use the generic "temporary_node_failure" + // instead. + err_code = 0x2000 | 2; + } + + log_info!( + WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)), + "Failed to accept/forward incoming HTLC: {}", err_msg + ); + // If `msg.blinding_point` is set, we must always fail with malformed. + if msg.blinding_point.is_some() { + return HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + sha256_of_onion: [0; 32], + failure_code: INVALID_ONION_BLINDING, + }); + } + + let (err_code, err_data) = if is_intro_node_blinded_forward { + (INVALID_ONION_BLINDING, &[0; 32][..]) + } else { + (err_code, &res.0[..]) + }; + HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason: HTLCFailReason::reason(err_code, err_data.to_vec()) + .get_encrypted_failure_packet(shared_secret, &None), + }) + } + fn decode_update_add_htlc_onion( &self, msg: &msgs::UpdateAddHTLC, counterparty_node_id: &PublicKey, ) -> Result< @@ -3075,48 +3259,7 @@ where msg, &self.node_signer, &self.logger, &self.secp_ctx )?; - let is_intro_node_forward = match next_hop { - onion_utils::Hop::Forward { - next_hop_data: msgs::InboundOnionPayload::BlindedForward { - intro_node_blinding_point: Some(_), .. - }, .. - } => true, - _ => false, - }; - - macro_rules! return_err { - ($msg: expr, $err_code: expr, $data: expr) => { - { - log_info!( - WithContext::from(&self.logger, Some(*counterparty_node_id), Some(msg.channel_id)), - "Failed to accept/forward incoming HTLC: {}", $msg - ); - // If `msg.blinding_point` is set, we must always fail with malformed. - if msg.blinding_point.is_some() { - return Err(HTLCFailureMsg::Malformed(msgs::UpdateFailMalformedHTLC { - channel_id: msg.channel_id, - htlc_id: msg.htlc_id, - sha256_of_onion: [0; 32], - failure_code: INVALID_ONION_BLINDING, - })); - } - - let (err_code, err_data) = if is_intro_node_forward { - (INVALID_ONION_BLINDING, &[0; 32][..]) - } else { ($err_code, $data) }; - return Err(HTLCFailureMsg::Relay(msgs::UpdateFailHTLC { - channel_id: msg.channel_id, - htlc_id: msg.htlc_id, - reason: HTLCFailReason::reason(err_code, err_data.to_vec()) - .get_encrypted_failure_packet(&shared_secret, &None), - })); - } - } - } - - let NextPacketDetails { - next_packet_pubkey, outgoing_amt_msat, outgoing_scid, outgoing_cltv_value - } = match next_packet_details_opt { + let next_packet_details = match next_packet_details_opt { Some(next_packet_details) => next_packet_details, // it is a receive, so no need for outbound checks None => return Ok((next_hop, shared_secret, None)), @@ -3124,124 +3267,15 @@ where // Perform outbound checks here instead of in [`Self::construct_pending_htlc_info`] because we // can't hold the outbound peer state lock at the same time as the inbound peer state lock. - if let Some((err, mut code, chan_update)) = loop { - let id_option = self.short_to_chan_info.read().unwrap().get(&outgoing_scid).cloned(); - let forwarding_chan_info_opt = match id_option { - None => { // unknown_next_peer - // Note that this is likely a timing oracle for detecting whether an scid is a - // phantom or an intercept. - if (self.default_configuration.accept_intercept_htlcs && - fake_scid::is_valid_intercept(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash)) || - fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, outgoing_scid, &self.chain_hash) - { - None - } else { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - }, - Some((cp_id, id)) => Some((cp_id.clone(), id.clone())), - }; - let chan_update_opt = if let Some((counterparty_node_id, forwarding_id)) = forwarding_chan_info_opt { - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); - if peer_state_mutex_opt.is_none() { - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - } - let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let chan = match peer_state.channel_by_id.get_mut(&forwarding_id).map( - |chan_phase| if let ChannelPhase::Funded(chan) = chan_phase { Some(chan) } else { None } - ).flatten() { - None => { - // Channel was removed. The short_to_chan_info and channel_by_id maps - // have no consistency guarantees. - break Some(("Don't have available channel for forwarding as requested.", 0x4000 | 10, None)); - }, - Some(chan) => chan - }; - if !chan.context.should_announce() && !self.default_configuration.accept_forwards_to_priv_channels { - // Note that the behavior here should be identical to the above block - we - // should NOT reveal the existence or non-existence of a private channel if - // we don't allow forwards outbound over them. - break Some(("Refusing to forward to a private channel based on our config.", 0x4000 | 10, None)); - } - if chan.context.get_channel_type().supports_scid_privacy() && outgoing_scid != chan.context.outbound_scid_alias() { - // `option_scid_alias` (referred to in LDK as `scid_privacy`) means - // "refuse to forward unless the SCID alias was used", so we pretend - // we don't have the channel here. - break Some(("Refusing to forward over real channel SCID as our counterparty requested.", 0x4000 | 10, None)); - } - let chan_update_opt = self.get_channel_update_for_onion(outgoing_scid, chan).ok(); - - // Note that we could technically not return an error yet here and just hope - // that the connection is reestablished or monitor updated by the time we get - // around to doing the actual forward, but better to fail early if we can and - // hopefully an attacker trying to path-trace payments cannot make this occur - // on a small/per-node/per-channel scale. - if !chan.context.is_live() { // channel_disabled - // If the channel_update we're going to return is disabled (i.e. the - // peer has been disabled for some time), return `channel_disabled`, - // otherwise return `temporary_channel_failure`. - if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { - break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); - } else { - break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); - } - } - if outgoing_amt_msat < chan.context.get_counterparty_htlc_minimum_msat() { // amount_below_minimum - break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); - } - if let Err((err, code)) = chan.htlc_satisfies_config(&msg, outgoing_amt_msat, outgoing_cltv_value) { - break Some((err, code, chan_update_opt)); - } - chan_update_opt - } else { - None - }; - - let cur_height = self.best_block.read().unwrap().height() + 1; - - if let Err((err_msg, code)) = check_incoming_htlc_cltv( - cur_height, outgoing_cltv_value, msg.cltv_expiry - ) { - if code & 0x1000 != 0 && chan_update_opt.is_none() { - // We really should set `incorrect_cltv_expiry` here but as we're not - // forwarding over a real channel we can't generate a channel_update - // for it. Instead we just return a generic temporary_node_failure. - break Some((err_msg, 0x2000 | 2, None)) - } - let chan_update_opt = if code & 0x1000 != 0 { chan_update_opt } else { None }; - break Some((err_msg, code, chan_update_opt)); - } + self.can_forward_htlc(&msg, &next_packet_details).map_err(|e| { + let (err_msg, err_code, chan_update_opt) = e; + self.htlc_failure_from_update_add_err( + msg, counterparty_node_id, err_msg, err_code, chan_update_opt, + next_hop.is_intro_node_blinded_forward(), &shared_secret + ) + })?; - break None; - } - { - let mut res = VecWriter(Vec::with_capacity(chan_update.serialized_length() + 2 + 8 + 2)); - if let Some(chan_update) = chan_update { - if code == 0x1000 | 11 || code == 0x1000 | 12 { - msg.amount_msat.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 13 { - msg.cltv_expiry.write(&mut res).expect("Writes cannot fail"); - } - else if code == 0x1000 | 20 { - // TODO: underspecified, follow https://github.com/lightning/bolts/issues/791 - 0u16.write(&mut res).expect("Writes cannot fail"); - } - (chan_update.serialized_length() as u16 + 2).write(&mut res).expect("Writes cannot fail"); - msgs::ChannelUpdate::TYPE.write(&mut res).expect("Writes cannot fail"); - chan_update.write(&mut res).expect("Writes cannot fail"); - } else if code & 0x1000 == 0x1000 { - // If we're trying to return an error that requires a `channel_update` but - // we're forwarding to a phantom or intercept "channel" (i.e. cannot - // generate an update), just use the generic "temporary_node_failure" - // instead. - code = 0x2000 | 2; - } - return_err!(err, code, &res.0[..]); - } - Ok((next_hop, shared_secret, Some(next_packet_pubkey))) + Ok((next_hop, shared_secret, Some(next_packet_details.next_packet_pubkey))) } fn construct_pending_htlc_status<'a>( @@ -3276,7 +3310,7 @@ where match decoded_hop { onion_utils::Hop::Receive(next_hop_data) => { // OUR PAYMENT! - let current_height: u32 = self.best_block.read().unwrap().height(); + let current_height: u32 = self.best_block.read().unwrap().height; match create_recv_pending_htlc_info(next_hop_data, shared_secret, msg.payment_hash, msg.amount_msat, msg.cltv_expiry, None, allow_underpay, msg.skimmed_fee_msat, current_height, self.default_configuration.accept_mpp_keysend) @@ -3536,7 +3570,7 @@ where /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress pub fn send_payment_with_route(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, @@ -3547,7 +3581,7 @@ where /// Similar to [`ChannelManager::send_payment_with_route`], but will automatically find a route based on /// `route_params` and retry failed payment paths based on `retry_strategy`. pub fn send_payment(&self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params, @@ -3558,7 +3592,7 @@ where #[cfg(test)] pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, @@ -3567,7 +3601,7 @@ where #[cfg(test)] pub(crate) fn test_add_new_pending_payment(&self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route: &Route) -> Result, PaymentSendFailure> { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; self.pending_outbound_payments.test_add_new_pending_payment(payment_hash, recipient_onion, payment_id, route, None, &self.entropy_source, best_block_height) } @@ -3577,7 +3611,7 @@ where } pub(super) fn send_payment_for_bolt12_invoice(&self, invoice: &Bolt12Invoice, payment_id: PaymentId) -> Result<(), Bolt12PaymentError> { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments .send_payment_for_bolt12_invoice( @@ -3634,7 +3668,7 @@ where /// /// [`send_payment`]: Self::send_payment pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment_with_route( route, payment_preimage, recipient_onion, payment_id, &self.entropy_source, @@ -3649,7 +3683,7 @@ where /// /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), @@ -3661,7 +3695,7 @@ where /// [`PaymentHash`] of probes based on a static secret and a random [`PaymentId`], which allows /// us to easily discern them from real payments. pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, @@ -3939,7 +3973,7 @@ where })); } { - let height = self.best_block.read().unwrap().height(); + let height = self.best_block.read().unwrap().height; // Transactions are evaluated as final by network mempools if their locktime is strictly // lower than the next block height. However, the modules constituting our Lightning // node might not have perfect sync about their blockchain views. Thus, if the wallet @@ -4078,6 +4112,7 @@ where .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; + for channel_id in channel_ids { if !peer_state.has_channel(channel_id) { return Err(APIError::ChannelUnavailable { @@ -4094,7 +4129,8 @@ where } if let ChannelPhase::Funded(channel) = channel_phase { if let Ok(msg) = self.get_channel_update_for_broadcast(channel) { - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg }); } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) { peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate { node_id: channel.context.get_counterparty_node_id(), @@ -4269,6 +4305,145 @@ where Ok(()) } + fn process_pending_update_add_htlcs(&self) { + let mut decode_update_add_htlcs = new_hash_map(); + mem::swap(&mut decode_update_add_htlcs, &mut self.decode_update_add_htlcs.lock().unwrap()); + + let get_failed_htlc_destination = |outgoing_scid_opt: Option, payment_hash: PaymentHash| { + if let Some(outgoing_scid) = outgoing_scid_opt { + match self.short_to_chan_info.read().unwrap().get(&outgoing_scid) { + Some((outgoing_counterparty_node_id, outgoing_channel_id)) => + HTLCDestination::NextHopChannel { + node_id: Some(*outgoing_counterparty_node_id), + channel_id: *outgoing_channel_id, + }, + None => HTLCDestination::UnknownNextHop { + requested_forward_scid: outgoing_scid, + }, + } + } else { + HTLCDestination::FailedPayment { payment_hash } + } + }; + + 'outer_loop: for (incoming_scid, update_add_htlcs) in decode_update_add_htlcs { + let incoming_channel_details_opt = self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel| { + let counterparty_node_id = chan.context.get_counterparty_node_id(); + let channel_id = chan.context.channel_id(); + let funding_txo = chan.context.get_funding_txo().unwrap(); + let user_channel_id = chan.context.get_user_id(); + let accept_underpaying_htlcs = chan.context.config().accept_underpaying_htlcs; + (counterparty_node_id, channel_id, funding_txo, user_channel_id, accept_underpaying_htlcs) + }); + let ( + incoming_counterparty_node_id, incoming_channel_id, incoming_funding_txo, + incoming_user_channel_id, incoming_accept_underpaying_htlcs + ) = if let Some(incoming_channel_details) = incoming_channel_details_opt { + incoming_channel_details + } else { + // The incoming channel no longer exists, HTLCs should be resolved onchain instead. + continue; + }; + + let mut htlc_forwards = Vec::new(); + let mut htlc_fails = Vec::new(); + for update_add_htlc in &update_add_htlcs { + let (next_hop, shared_secret, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion( + &update_add_htlc, &self.node_signer, &self.logger, &self.secp_ctx + ) { + Ok(decoded_onion) => decoded_onion, + Err(htlc_fail) => { + htlc_fails.push((htlc_fail, HTLCDestination::InvalidOnion)); + continue; + }, + }; + + let is_intro_node_blinded_forward = next_hop.is_intro_node_blinded_forward(); + let outgoing_scid_opt = next_packet_details_opt.as_ref().map(|d| d.outgoing_scid); + + // Process the HTLC on the incoming channel. + match self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel| { + let logger = WithChannelContext::from(&self.logger, &chan.context); + chan.can_accept_incoming_htlc( + update_add_htlc, &self.fee_estimator, &logger, + ) + }) { + Some(Ok(_)) => {}, + Some(Err((err, code))) => { + let outgoing_chan_update_opt = if let Some(outgoing_scid) = outgoing_scid_opt.as_ref() { + self.do_funded_channel_callback(*outgoing_scid, |chan: &mut Channel| { + self.get_channel_update_for_onion(*outgoing_scid, chan).ok() + }).flatten() + } else { + None + }; + let htlc_fail = self.htlc_failure_from_update_add_err( + &update_add_htlc, &incoming_counterparty_node_id, err, code, + outgoing_chan_update_opt, is_intro_node_blinded_forward, &shared_secret, + ); + let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash); + htlc_fails.push((htlc_fail, htlc_destination)); + continue; + }, + // The incoming channel no longer exists, HTLCs should be resolved onchain instead. + None => continue 'outer_loop, + } + + // Now process the HTLC on the outgoing channel if it's a forward. + if let Some(next_packet_details) = next_packet_details_opt.as_ref() { + if let Err((err, code, chan_update_opt)) = self.can_forward_htlc( + &update_add_htlc, next_packet_details + ) { + let htlc_fail = self.htlc_failure_from_update_add_err( + &update_add_htlc, &incoming_counterparty_node_id, err, code, + chan_update_opt, is_intro_node_blinded_forward, &shared_secret, + ); + let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash); + htlc_fails.push((htlc_fail, htlc_destination)); + continue; + } + } + + match self.construct_pending_htlc_status( + &update_add_htlc, &incoming_counterparty_node_id, shared_secret, next_hop, + incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey), + ) { + PendingHTLCStatus::Forward(htlc_forward) => { + htlc_forwards.push((htlc_forward, update_add_htlc.htlc_id)); + }, + PendingHTLCStatus::Fail(htlc_fail) => { + let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash); + htlc_fails.push((htlc_fail, htlc_destination)); + }, + } + } + + // Process all of the forwards and failures for the channel in which the HTLCs were + // proposed to as a batch. + let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id, + incoming_user_channel_id, htlc_forwards.drain(..).collect()); + self.forward_htlcs_without_forward_event(&mut [pending_forwards]); + for (htlc_fail, htlc_destination) in htlc_fails.drain(..) { + let failure = match htlc_fail { + HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC { + htlc_id: fail_htlc.htlc_id, + err_packet: fail_htlc.reason, + }, + HTLCFailureMsg::Malformed(fail_malformed_htlc) => HTLCForwardInfo::FailMalformedHTLC { + htlc_id: fail_malformed_htlc.htlc_id, + sha256_of_onion: fail_malformed_htlc.sha256_of_onion, + failure_code: fail_malformed_htlc.failure_code, + }, + }; + self.forward_htlcs.lock().unwrap().entry(incoming_scid).or_insert(vec![]).push(failure); + self.pending_events.lock().unwrap().push_back((events::Event::HTLCHandlingFailed { + prev_channel_id: incoming_channel_id, + failed_next_destination: htlc_destination, + }, None)); + } + } + } + /// Processes HTLCs which are pending waiting on random forward delay. /// /// Should only really ever be called in response to a PendingHTLCsForwardable event. @@ -4276,6 +4451,8 @@ where pub fn process_pending_htlc_forwards(&self) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + self.process_pending_update_add_htlcs(); + let mut new_events = VecDeque::new(); let mut failed_forwards = Vec::new(); let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new(); @@ -4363,7 +4540,7 @@ where }; match next_hop { onion_utils::Hop::Receive(hop_data) => { - let current_height: u32 = self.best_block.read().unwrap().height(); + let current_height: u32 = self.best_block.read().unwrap().height; match create_recv_pending_htlc_info(hop_data, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, Some(phantom_shared_secret), false, None, @@ -4516,7 +4693,10 @@ where (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data }, Some(payment_data), phantom_shared_secret, onion_fields) }, - PendingHTLCRouting::ReceiveKeysend { payment_data, payment_preimage, payment_metadata, incoming_cltv_expiry, custom_tlvs } => { + PendingHTLCRouting::ReceiveKeysend { + payment_data, payment_preimage, payment_metadata, + incoming_cltv_expiry, custom_tlvs, requires_blinded_error: _ + } => { let onion_fields = RecipientOnionFields { payment_secret: payment_data.as_ref().map(|data| data.payment_secret), payment_metadata, @@ -4560,7 +4740,7 @@ where debug_assert!(!committed_to_claimable); let mut htlc_msat_height_data = $htlc.value.to_be_bytes().to_vec(); htlc_msat_height_data.extend_from_slice( - &self.best_block.read().unwrap().height().to_be_bytes(), + &self.best_block.read().unwrap().height.to_be_bytes(), ); failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id: $htlc.prev_hop.short_channel_id, @@ -4698,7 +4878,7 @@ where } }; if let Some(min_final_cltv_expiry_delta) = min_final_cltv_expiry_delta { - let expected_min_expiry_height = (self.current_best_block().height() + min_final_cltv_expiry_delta as u32) as u64; + let expected_min_expiry_height = (self.current_best_block().height + min_final_cltv_expiry_delta as u32) as u64; if (cltv_expiry as u64) < expected_min_expiry_height { log_trace!(self.logger, "Failing new HTLC with payment_hash {} as its CLTV expiry was too soon (had {}, earliest expected {})", &payment_hash, cltv_expiry, expected_min_expiry_height); @@ -4752,7 +4932,7 @@ where } } - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; self.pending_outbound_payments.check_retry_payments(&self.router, || self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, &self.pending_events, &self.logger, |args| self.send_payment_along_path(args)); @@ -5000,7 +5180,8 @@ where if n >= DISABLE_GOSSIP_TICKS { chan.set_channel_update_status(ChannelUpdateStatus::Disabled); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -5014,7 +5195,8 @@ where if n >= ENABLE_GOSSIP_TICKS { chan.set_channel_update_status(ChannelUpdateStatus::Enabled); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -5223,7 +5405,7 @@ where FailureCode::RequiredNodeFeatureMissing => HTLCFailReason::from_failure_code(failure_code.into()), FailureCode::IncorrectOrUnknownPaymentDetails => { let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec(); - htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height().to_be_bytes()); + htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes()); HTLCFailReason::reason(failure_code.into(), htlc_msat_height_data) }, FailureCode::InvalidOnionPayload(data) => { @@ -5317,9 +5499,14 @@ where } } + fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) { + let push_forward_event = self.fail_htlc_backwards_internal_without_forward_event(source, payment_hash, onion_error, destination); + if push_forward_event { self.push_pending_forwards_ev(); } + } + /// Fails an HTLC backwards to the sender of it to us. /// Note that we do not assume that channels corresponding to failed HTLCs are still available. - fn fail_htlc_backwards_internal(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) { + fn fail_htlc_backwards_internal_without_forward_event(&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason, destination: HTLCDestination) -> bool { // Ensure that no peer state channel storage lock is held when calling this function. // This ensures that future code doesn't introduce a lock-order requirement for // `forward_htlcs` to be locked after the `per_peer_state` peer locks, which calling @@ -5337,12 +5524,12 @@ where // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called // from block_connected which may run during initialization prior to the chain_monitor // being fully configured. See the docs for `ChannelManagerReadArgs` for more. + let mut push_forward_event; match source { HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, .. } => { - if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, + push_forward_event = self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, session_priv, payment_id, self.probing_cookie_secret, &self.secp_ctx, - &self.pending_events, &self.logger) - { self.push_pending_forwards_ev(); } + &self.pending_events, &self.logger); }, HTLCSource::PreviousHopData(HTLCPreviousHopData { ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret, @@ -5376,11 +5563,9 @@ where } }; - let mut push_forward_ev = false; + push_forward_event = self.decode_update_add_htlcs.lock().unwrap().is_empty(); let mut forward_htlcs = self.forward_htlcs.lock().unwrap(); - if forward_htlcs.is_empty() { - push_forward_ev = true; - } + push_forward_event &= forward_htlcs.is_empty(); match forward_htlcs.entry(*short_channel_id) { hash_map::Entry::Occupied(mut entry) => { entry.get_mut().push(failure); @@ -5390,7 +5575,6 @@ where } } mem::drop(forward_htlcs); - if push_forward_ev { self.push_pending_forwards_ev(); } let mut pending_events = self.pending_events.lock().unwrap(); pending_events.push_back((events::Event::HTLCHandlingFailed { prev_channel_id: *channel_id, @@ -5398,6 +5582,7 @@ where }, None)); }, } + push_forward_event } /// Provides a payment preimage in response to [`Event::PaymentClaimable`], generating any @@ -5554,7 +5739,7 @@ where if !valid_mpp { for htlc in sources.drain(..) { let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec(); - htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height().to_be_bytes()); + htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes()); let source = HTLCSource::PreviousHopData(htlc.prev_hop); let reason = HTLCFailReason::reason(0x4000 | 15, htlc_msat_height_data); let receiver = HTLCDestination::FailedPayment { payment_hash }; @@ -5756,8 +5941,6 @@ where let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); #[cfg(debug_assertions)] let claiming_chan_funding_outpoint = hop_data.outpoint; - #[cfg(debug_assertions)] - let claiming_channel_id = hop_data.channel_id; let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat, definitely_duplicate| { let chan_to_release = @@ -5815,7 +5998,7 @@ where BackgroundEvent::MonitorUpdatesComplete { channel_id, .. } => - *channel_id == claiming_channel_id, + *channel_id == prev_channel_id, } }), "{:?}", *background_events); } @@ -5918,24 +6101,31 @@ where fn handle_channel_resumption(&self, pending_msg_events: &mut Vec, channel: &mut Channel, raa: Option, commitment_update: Option, order: RAACommitmentOrder, - pending_forwards: Vec<(PendingHTLCInfo, u64)>, funding_broadcastable: Option, + pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec, + funding_broadcastable: Option, channel_ready: Option, announcement_sigs: Option) - -> Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> { + -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec)>) { let logger = WithChannelContext::from(&self.logger, &channel.context); - log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {}broadcasting funding, {} channel ready, {} announcement", + log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement", &channel.context.channel_id(), if raa.is_some() { "an" } else { "no" }, - if commitment_update.is_some() { "a" } else { "no" }, pending_forwards.len(), + if commitment_update.is_some() { "a" } else { "no" }, + pending_forwards.len(), pending_update_adds.len(), if funding_broadcastable.is_some() { "" } else { "not " }, if channel_ready.is_some() { "sending" } else { "without" }, if announcement_sigs.is_some() { "sending" } else { "without" }); - let mut htlc_forwards = None; - let counterparty_node_id = channel.context.get_counterparty_node_id(); + let short_channel_id = channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()); + + let mut htlc_forwards = None; if !pending_forwards.is_empty() { - htlc_forwards = Some((channel.context.get_short_channel_id().unwrap_or(channel.context.outbound_scid_alias()), - channel.context.get_funding_txo().unwrap(), channel.context.channel_id(), channel.context.get_user_id(), pending_forwards)); + htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(), + channel.context.channel_id(), channel.context.get_user_id(), pending_forwards)); + } + let mut decode_update_add_htlcs = None; + if !pending_update_adds.is_empty() { + decode_update_add_htlcs = Some((short_channel_id, pending_update_adds)); } if let Some(msg) = channel_ready { @@ -5986,7 +6176,7 @@ where emit_channel_ready_event!(pending_events, channel); } - htlc_forwards + (htlc_forwards, decode_update_add_htlcs) } fn channel_monitor_updated(&self, funding_txo: &OutPoint, channel_id: &ChannelId, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) { @@ -6101,73 +6291,82 @@ where // happening and return an error. N.B. that we create channel with an outbound SCID of zero so // that we can delay allocating the SCID until after we're sure that the checks below will // succeed. - let mut channel = match peer_state.inbound_channel_request_by_id.remove(temporary_channel_id) { + let res = match peer_state.inbound_channel_request_by_id.remove(temporary_channel_id) { Some(unaccepted_channel) => { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; InboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, counterparty_node_id.clone(), &self.channel_type_features(), &peer_state.latest_features, &unaccepted_channel.open_channel_msg, user_channel_id, &self.default_configuration, best_block_height, - &self.logger, accept_0conf).map_err(|e| { - let err_str = e.to_string(); - log_error!(logger, "{}", err_str); - - APIError::ChannelUnavailable { err: err_str } - }) - } + &self.logger, accept_0conf).map_err(|err| MsgHandleErrInternal::from_chan_no_close(err, *temporary_channel_id)) + }, _ => { let err_str = "No such channel awaiting to be accepted.".to_owned(); log_error!(logger, "{}", err_str); - Err(APIError::APIMisuseError { err: err_str }) + return Err(APIError::APIMisuseError { err: err_str }); } - }?; + }; - if accept_0conf { - // This should have been correctly configured by the call to InboundV1Channel::new. - debug_assert!(channel.context.minimum_depth().unwrap() == 0); - } else if channel.context.get_channel_type().requires_zero_conf() { - let send_msg_err_event = events::MessageSendEvent::HandleError { - node_id: channel.context.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage{ - msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "No zero confirmation channels accepted".to_owned(), } + match res { + Err(err) => { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + match handle_error!(self, Result::<(), MsgHandleErrInternal>::Err(err), *counterparty_node_id) { + Ok(_) => unreachable!("`handle_error` only returns Err as we've passed in an Err"), + Err(e) => { + return Err(APIError::ChannelUnavailable { err: e.err }); + }, } - }; - peer_state.pending_msg_events.push(send_msg_err_event); - let err_str = "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned(); - log_error!(logger, "{}", err_str); + } + Ok(mut channel) => { + if accept_0conf { + // This should have been correctly configured by the call to InboundV1Channel::new. + debug_assert!(channel.context.minimum_depth().unwrap() == 0); + } else if channel.context.get_channel_type().requires_zero_conf() { + let send_msg_err_event = events::MessageSendEvent::HandleError { + node_id: channel.context.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage{ + msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "No zero confirmation channels accepted".to_owned(), } + } + }; + peer_state.pending_msg_events.push(send_msg_err_event); + let err_str = "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned(); + log_error!(logger, "{}", err_str); - return Err(APIError::APIMisuseError { err: err_str }); - } else { - // If this peer already has some channels, a new channel won't increase our number of peers - // with unfunded channels, so as long as we aren't over the maximum number of unfunded - // channels per-peer we can accept channels from a peer with existing ones. - if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS { - let send_msg_err_event = events::MessageSendEvent::HandleError { - node_id: channel.context.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage{ - msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), } - } - }; - peer_state.pending_msg_events.push(send_msg_err_event); - let err_str = "Too many peers with unfunded channels, refusing to accept new ones".to_owned(); - log_error!(logger, "{}", err_str); + return Err(APIError::APIMisuseError { err: err_str }); + } else { + // If this peer already has some channels, a new channel won't increase our number of peers + // with unfunded channels, so as long as we aren't over the maximum number of unfunded + // channels per-peer we can accept channels from a peer with existing ones. + if is_only_peer_channel && peers_without_funded_channels >= MAX_UNFUNDED_CHANNEL_PEERS { + let send_msg_err_event = events::MessageSendEvent::HandleError { + node_id: channel.context.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage{ + msg: msgs::ErrorMessage { channel_id: temporary_channel_id.clone(), data: "Have too many peers with unfunded channels, not accepting new ones".to_owned(), } + } + }; + peer_state.pending_msg_events.push(send_msg_err_event); + let err_str = "Too many peers with unfunded channels, refusing to accept new ones".to_owned(); + log_error!(logger, "{}", err_str); - return Err(APIError::APIMisuseError { err: err_str }); - } - } + return Err(APIError::APIMisuseError { err: err_str }); + } + } - // Now that we know we have a channel, assign an outbound SCID alias. - let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); - channel.context.set_outbound_scid_alias(outbound_scid_alias); + // Now that we know we have a channel, assign an outbound SCID alias. + let outbound_scid_alias = self.create_and_insert_outbound_scid_alias(); + channel.context.set_outbound_scid_alias(outbound_scid_alias); - peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { - node_id: channel.context.get_counterparty_node_id(), - msg: channel.accept_inbound_channel(), - }); + peer_state.pending_msg_events.push(events::MessageSendEvent::SendAcceptChannel { + node_id: channel.context.get_counterparty_node_id(), + msg: channel.accept_inbound_channel(), + }); - peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel)); + peer_state.channel_by_id.insert(temporary_channel_id.clone(), ChannelPhase::UnfundedInboundV1(channel)); - Ok(()) + Ok(()) + }, + } } /// Gets the number of peers which match the given filter and do not have any funded, outbound, @@ -6178,7 +6377,7 @@ where fn peers_without_funded_channels(&self, maybe_count_peer: Filter) -> usize where Filter: Fn(&PeerState) -> bool { let mut peers_without_funded_channels = 0; - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; { let peer_state_lock = self.per_peer_state.read().unwrap(); for (_, peer_mtx) in peer_state_lock.iter() { @@ -6280,7 +6479,7 @@ where msg.common_fields.temporary_channel_id.clone())); } - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; if Self::unfunded_channel_count(peer_state, best_block_height) >= MAX_UNFUNDED_CHANS_PER_PEER { return Err(MsgHandleErrInternal::send_err_msg_no_close( format!("Refusing more than {} unfunded channels.", MAX_UNFUNDED_CHANS_PER_PEER), @@ -6723,9 +6922,8 @@ where } if let Some(ChannelPhase::Funded(chan)) = chan_option { if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -6762,7 +6960,7 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan_phase_entry) => { if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { - let pending_forward_info = match decoded_hop_res { + let mut pending_forward_info = match decoded_hop_res { Ok((next_hop, shared_secret, next_packet_pk_opt)) => self.construct_pending_htlc_status( msg, counterparty_node_id, shared_secret, next_hop, @@ -6770,44 +6968,45 @@ where ), Err(e) => PendingHTLCStatus::Fail(e) }; - let create_pending_htlc_status = |chan: &Channel, pending_forward_info: PendingHTLCStatus, error_code: u16| { + let logger = WithChannelContext::from(&self.logger, &chan.context); + // If the update_add is completely bogus, the call will Err and we will close, + // but if we've sent a shutdown and they haven't acknowledged it yet, we just + // want to reject the new HTLC and fail it backwards instead of forwarding. + if let Err((_, error_code)) = chan.can_accept_incoming_htlc(&msg, &self.fee_estimator, &logger) { if msg.blinding_point.is_some() { - return PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed( - msgs::UpdateFailMalformedHTLC { - channel_id: msg.channel_id, - htlc_id: msg.htlc_id, - sha256_of_onion: [0; 32], - failure_code: INVALID_ONION_BLINDING, - } - )) - } - // If the update_add is completely bogus, the call will Err and we will close, - // but if we've sent a shutdown and they haven't acknowledged it yet, we just - // want to reject the new HTLC and fail it backwards instead of forwarding. - match pending_forward_info { - PendingHTLCStatus::Forward(PendingHTLCInfo { - ref incoming_shared_secret, ref routing, .. - }) => { - let reason = if routing.blinded_failure().is_some() { - HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32]) - } else if (error_code & 0x1000) != 0 { - let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan); - HTLCFailReason::reason(real_code, error_data) - } else { - HTLCFailReason::from_failure_code(error_code) - }.get_encrypted_failure_packet(incoming_shared_secret, &None); - let msg = msgs::UpdateFailHTLC { + pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Malformed( + msgs::UpdateFailMalformedHTLC { channel_id: msg.channel_id, htlc_id: msg.htlc_id, - reason - }; - PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg)) - }, - _ => pending_forward_info + sha256_of_onion: [0; 32], + failure_code: INVALID_ONION_BLINDING, + } + )) + } else { + match pending_forward_info { + PendingHTLCStatus::Forward(PendingHTLCInfo { + ref incoming_shared_secret, ref routing, .. + }) => { + let reason = if routing.blinded_failure().is_some() { + HTLCFailReason::reason(INVALID_ONION_BLINDING, vec![0; 32]) + } else if (error_code & 0x1000) != 0 { + let (real_code, error_data) = self.get_htlc_inbound_temp_fail_err_and_data(error_code, chan); + HTLCFailReason::reason(real_code, error_data) + } else { + HTLCFailReason::from_failure_code(error_code) + }.get_encrypted_failure_packet(incoming_shared_secret, &None); + let msg = msgs::UpdateFailHTLC { + channel_id: msg.channel_id, + htlc_id: msg.htlc_id, + reason + }; + pending_forward_info = PendingHTLCStatus::Fail(HTLCFailureMsg::Relay(msg)); + }, + _ => {}, + } } - }; - let logger = WithChannelContext::from(&self.logger, &chan.context); - try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info, create_pending_htlc_status, &self.fee_estimator, &&logger), chan_phase_entry); + } + try_chan_phase_entry!(self, chan.update_add_htlc(&msg, pending_forward_info), chan_phase_entry); } else { return try_chan_phase_entry!(self, Err(ChannelError::Close( "Got an update_add_htlc message for an unfunded channel!".into())), chan_phase_entry); @@ -6951,10 +7150,28 @@ where } } + fn push_decode_update_add_htlcs(&self, mut update_add_htlcs: (u64, Vec)) { + let mut push_forward_event = self.forward_htlcs.lock().unwrap().is_empty(); + let mut decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); + push_forward_event &= decode_update_add_htlcs.is_empty(); + let scid = update_add_htlcs.0; + match decode_update_add_htlcs.entry(scid) { + hash_map::Entry::Occupied(mut e) => { e.get_mut().append(&mut update_add_htlcs.1); }, + hash_map::Entry::Vacant(e) => { e.insert(update_add_htlcs.1); }, + } + if push_forward_event { self.push_pending_forwards_ev(); } + } + #[inline] fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) { + let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards); + if push_forward_event { self.push_pending_forwards_ev() } + } + + #[inline] + fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool { + let mut push_forward_event = false; for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards { - let mut push_forward_event = false; let mut new_intercept_events = VecDeque::new(); let mut failed_intercept_forwards = Vec::new(); if !pending_forwards.is_empty() { @@ -6967,6 +7184,7 @@ where // Pull this now to avoid introducing a lock order with `forward_htlcs`. let is_our_scid = self.short_to_chan_info.read().unwrap().contains_key(&scid); + let decode_update_add_htlcs_empty = self.decode_update_add_htlcs.lock().unwrap().is_empty(); let mut forward_htlcs = self.forward_htlcs.lock().unwrap(); let forward_htlcs_empty = forward_htlcs.is_empty(); match forward_htlcs.entry(scid) { @@ -7015,9 +7233,7 @@ where } else { // We don't want to generate a PendingHTLCsForwardable event if only intercepted // payments are being processed. - if forward_htlcs_empty { - push_forward_event = true; - } + push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty; entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info }))); } @@ -7027,15 +7243,15 @@ where } for (htlc_source, payment_hash, failure_reason, destination) in failed_intercept_forwards.drain(..) { - self.fail_htlc_backwards_internal(&htlc_source, &payment_hash, &failure_reason, destination); + push_forward_event |= self.fail_htlc_backwards_internal_without_forward_event(&htlc_source, &payment_hash, &failure_reason, destination); } if !new_intercept_events.is_empty() { let mut events = self.pending_events.lock().unwrap(); events.append(&mut new_intercept_events); } - if push_forward_event { self.push_pending_forwards_ev() } } + push_forward_event } fn push_pending_forwards_ev(&self) { @@ -7175,7 +7391,7 @@ where peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelAnnouncement { msg: try_chan_phase_entry!(self, chan.announcement_signatures( - &self.node_signer, self.chain_hash, self.best_block.read().unwrap().height(), + &self.node_signer, self.chain_hash, self.best_block.read().unwrap().height, msg, &self.default_configuration ), chan_phase_entry), // Note that announcement_signatures fails if the channel cannot be announced, @@ -7245,7 +7461,6 @@ where } fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result { - let htlc_forwards; let need_lnd_workaround = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -7288,9 +7503,11 @@ where } } let need_lnd_workaround = chan.context.workaround_lnd_bug_4006.take(); - htlc_forwards = self.handle_channel_resumption( + let (htlc_forwards, decode_update_add_htlcs) = self.handle_channel_resumption( &mut peer_state.pending_msg_events, chan, responses.raa, responses.commitment_update, responses.order, - Vec::new(), None, responses.channel_ready, responses.announcement_sigs); + Vec::new(), Vec::new(), None, responses.channel_ready, responses.announcement_sigs); + debug_assert!(htlc_forwards.is_none()); + debug_assert!(decode_update_add_htlcs.is_none()); if let Some(upd) = channel_update { peer_state.pending_msg_events.push(upd); } @@ -7336,16 +7553,10 @@ where } }; - let mut persist = NotifyOption::SkipPersistHandleEvents; - if let Some(forwards) = htlc_forwards { - self.forward_htlcs(&mut [forwards][..]); - persist = NotifyOption::DoPersist; - } - if let Some(channel_ready_msg) = need_lnd_workaround { self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?; } - Ok(persist) + Ok(NotifyOption::SkipPersistHandleEvents) } /// Process pending events from the [`chain::Watch`], returning whether any events were processed. @@ -7372,7 +7583,7 @@ where self.fail_htlc_backwards_internal(&htlc_update.source, &htlc_update.payment_hash, &reason, receiver); } }, - MonitorEvent::HolderForceClosed(_funding_outpoint) => { + MonitorEvent::HolderForceClosed(_) | MonitorEvent::HolderForceClosedWithInfo { .. } => { let counterparty_node_id_opt = match counterparty_node_id { Some(cp_id) => Some(cp_id), None => { @@ -7390,16 +7601,22 @@ where let pending_msg_events = &mut peer_state.pending_msg_events; if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) { if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) { - failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed)); + let reason = if let MonitorEvent::HolderForceClosedWithInfo { reason, .. } = monitor_event { + reason + } else { + ClosureReason::HolderForceClosed + }; + failed_channels.push(chan.context.force_shutdown(false, reason.clone())); if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } pending_msg_events.push(events::MessageSendEvent::HandleError { node_id: chan.context.get_counterparty_node_id(), action: msgs::ErrorAction::DisconnectPeer { - msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: "Channel force-closed".to_owned() }) + msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: reason.to_string() }) }, }); } @@ -7577,7 +7794,8 @@ where // We're done with this channel. We got a closing_signed and sent back // a closing_signed with a closing transaction to broadcast. if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -7638,7 +7856,9 @@ where self.finish_close_channel(failure); } } +} +macro_rules! create_offer_builder { ($self: ident, $builder: ty) => { /// Creates an [`OfferBuilder`] such that the [`Offer`] it builds is recognized by the /// [`ChannelManager`] when handling [`InvoiceRequest`] messages for the offer. The offer will /// not have an expiration unless otherwise set on the builder. @@ -7667,23 +7887,25 @@ where /// [`Offer`]: crate::offers::offer::Offer /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest pub fn create_offer_builder( - &self, description: String - ) -> Result, Bolt12SemanticError> { - let node_id = self.get_our_node_id(); - let expanded_key = &self.inbound_payment_key; - let entropy = &*self.entropy_source; - let secp_ctx = &self.secp_ctx; - - let path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?; + &$self, description: String + ) -> Result<$builder, Bolt12SemanticError> { + let node_id = $self.get_our_node_id(); + let expanded_key = &$self.inbound_payment_key; + let entropy = &*$self.entropy_source; + let secp_ctx = &$self.secp_ctx; + + let path = $self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?; let builder = OfferBuilder::deriving_signing_pubkey( description, node_id, expanded_key, entropy, secp_ctx ) - .chain_hash(self.chain_hash) + .chain_hash($self.chain_hash) .path(path); - Ok(builder) + Ok(builder.into()) } +} } +macro_rules! create_refund_builder { ($self: ident, $builder: ty) => { /// Creates a [`RefundBuilder`] such that the [`Refund`] it builds is recognized by the /// [`ChannelManager`] when handling [`Bolt12Invoice`] messages for the refund. /// @@ -7733,31 +7955,55 @@ where /// [`Bolt12Invoice::payment_paths`]: crate::offers::invoice::Bolt12Invoice::payment_paths /// [Avoiding Duplicate Payments]: #avoiding-duplicate-payments pub fn create_refund_builder( - &self, description: String, amount_msats: u64, absolute_expiry: Duration, + &$self, description: String, amount_msats: u64, absolute_expiry: Duration, payment_id: PaymentId, retry_strategy: Retry, max_total_routing_fee_msat: Option - ) -> Result, Bolt12SemanticError> { - let node_id = self.get_our_node_id(); - let expanded_key = &self.inbound_payment_key; - let entropy = &*self.entropy_source; - let secp_ctx = &self.secp_ctx; + ) -> Result<$builder, Bolt12SemanticError> { + let node_id = $self.get_our_node_id(); + let expanded_key = &$self.inbound_payment_key; + let entropy = &*$self.entropy_source; + let secp_ctx = &$self.secp_ctx; - let path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?; + let path = $self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?; let builder = RefundBuilder::deriving_payer_id( description, node_id, expanded_key, entropy, secp_ctx, amount_msats, payment_id )? - .chain_hash(self.chain_hash) + .chain_hash($self.chain_hash) .absolute_expiry(absolute_expiry) .path(path); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop($self); + let expiration = StaleExpiration::AbsoluteTimeout(absolute_expiry); - self.pending_outbound_payments + $self.pending_outbound_payments .add_new_awaiting_invoice( payment_id, expiration, retry_strategy, max_total_routing_fee_msat, ) .map_err(|_| Bolt12SemanticError::DuplicatePaymentId)?; - Ok(builder) + Ok(builder.into()) } +} } + +impl ChannelManager +where + M::Target: chain::Watch<::EcdsaSigner>, + T::Target: BroadcasterInterface, + ES::Target: EntropySource, + NS::Target: NodeSigner, + SP::Target: SignerProvider, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, +{ + #[cfg(not(c_bindings))] + create_offer_builder!(self, OfferBuilder); + #[cfg(not(c_bindings))] + create_refund_builder!(self, RefundBuilder); + + #[cfg(c_bindings)] + create_offer_builder!(self, OfferWithDerivedMetadataBuilder); + #[cfg(c_bindings)] + create_refund_builder!(self, RefundMaybeWithDerivedMetadataBuilder); /// Pays for an [`Offer`] using the given parameters by creating an [`InvoiceRequest`] and /// enqueuing it to be sent via an onion message. [`ChannelManager`] will pay the actual @@ -7802,6 +8048,7 @@ where /// Errors if: /// - a duplicate `payment_id` is provided given the caveats in the aforementioned link, /// - the provided parameters are invalid for the offer, + /// - the offer is for an unsupported chain, or /// - the parameterized [`Router`] is unable to create a blinded reply path for the invoice /// request. /// @@ -7821,9 +8068,11 @@ where let entropy = &*self.entropy_source; let secp_ctx = &self.secp_ctx; - let builder = offer + let builder: InvoiceRequestBuilder = offer .request_invoice_deriving_payer_id(expanded_key, entropy, secp_ctx, payment_id)? - .chain_hash(self.chain_hash)?; + .into(); + let builder = builder.chain_hash(self.chain_hash)?; + let builder = match quantity { None => builder, Some(quantity) => builder.quantity(quantity)?, @@ -7839,6 +8088,8 @@ where let invoice_request = builder.build_and_sign()?; let reply_path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?; + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let expiration = StaleExpiration::TimerTicks(1); self.pending_outbound_payments .add_new_awaiting_invoice( @@ -7888,8 +8139,10 @@ where /// /// # Errors /// - /// Errors if the parameterized [`Router`] is unable to create a blinded payment path or reply - /// path for the invoice. + /// Errors if: + /// - the refund is for an unsupported chain, or + /// - the parameterized [`Router`] is unable to create a blinded payment path or reply path for + /// the invoice. /// /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice pub fn request_refund_payment(&self, refund: &Refund) -> Result<(), Bolt12SemanticError> { @@ -7900,6 +8153,12 @@ where let amount_msats = refund.amount_msats(); let relative_expiry = DEFAULT_RELATIVE_EXPIRY.as_secs() as u32; + if refund.chain() != self.chain_hash { + return Err(Bolt12SemanticError::UnsupportedChain); + } + + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + match self.create_inbound_payment(Some(amount_msats), relative_expiry, None) { Ok((payment_hash, payment_secret)) => { let payment_paths = self.create_blinded_payment_paths(amount_msats, payment_secret) @@ -7917,6 +8176,7 @@ where let builder = refund.respond_using_derived_keys_no_std( payment_paths, payment_hash, created_at, expanded_key, entropy )?; + let builder: InvoiceBuilder = builder.into(); let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?; let reply_path = self.create_blinded_path() .map_err(|_| Bolt12SemanticError::MissingPaths)?; @@ -8073,7 +8333,7 @@ where let first_hops = self.list_usable_channels(); let payee_node_id = self.get_our_node_id(); - let max_cltv_expiry = self.best_block.read().unwrap().height() + CLTV_FAR_FAR_AWAY + let max_cltv_expiry = self.best_block.read().unwrap().height + CLTV_FAR_FAR_AWAY + LATENCY_GRACE_PERIOD_BLOCKS; let payee_tlvs = ReceiveTlvs { payment_secret, @@ -8092,7 +8352,7 @@ where /// /// [phantom node payments]: crate::sign::PhantomKeysManager pub fn get_phantom_scid(&self) -> u64 { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let short_to_chan_info = self.short_to_chan_info.read().unwrap(); loop { let scid_candidate = fake_scid::Namespace::Phantom.get_fake_scid(best_block_height, &self.chain_hash, &self.fake_scid_rand_bytes, &self.entropy_source); @@ -8122,7 +8382,7 @@ where /// Note that this method is not guaranteed to return unique values, you may need to call it a few /// times to get a unique scid. pub fn get_intercept_scid(&self) -> u64 { - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; let short_to_chan_info = self.short_to_chan_info.read().unwrap(); loop { let scid_candidate = fake_scid::Namespace::Intercept.get_fake_scid(best_block_height, &self.chain_hash, &self.fake_scid_rand_bytes, &self.entropy_source); @@ -8295,7 +8555,7 @@ where /// will randomly be placed first or last in the returned array. /// /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate` - /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among + /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among /// the `MessageSendEvent`s to the specific peer they were generated under. fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); @@ -8315,6 +8575,7 @@ where result = NotifyOption::DoPersist; } + let mut is_any_peer_connected = false; let mut pending_events = Vec::new(); let per_peer_state = self.per_peer_state.read().unwrap(); for (_cp_id, peer_state_mutex) in per_peer_state.iter() { @@ -8323,6 +8584,15 @@ where if peer_state.pending_msg_events.len() > 0 { pending_events.append(&mut peer_state.pending_msg_events); } + if peer_state.is_connected { + is_any_peer_connected = true + } + } + + // Ensure that we are connected to some peers before getting broadcast messages. + if is_any_peer_connected { + let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap(); + pending_events.append(&mut broadcast_msgs); } if !pending_events.is_empty() { @@ -8370,9 +8640,9 @@ where fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) { { let best_block = self.best_block.read().unwrap(); - assert_eq!(best_block.block_hash(), header.prev_blockhash, + assert_eq!(best_block.block_hash, header.prev_blockhash, "Blocks must be connected in chain-order - the connected header must build on the last connected header"); - assert_eq!(best_block.height(), height - 1, + assert_eq!(best_block.height, height - 1, "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); } @@ -8387,9 +8657,9 @@ where let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); - assert_eq!(best_block.block_hash(), header.block_hash(), + assert_eq!(best_block.block_hash, header.block_hash(), "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); - assert_eq!(best_block.height(), height, + assert_eq!(best_block.height, height, "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); *best_block = BestBlock::new(header.prev_blockhash, new_height) } @@ -8423,7 +8693,7 @@ where self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.chain_hash, &self.node_signer, &self.default_configuration, &&WithChannelContext::from(&self.logger, &channel.context)) .map(|(a, b)| (a, Vec::new(), b))); - let last_best_block_height = self.best_block.read().unwrap().height(); + let last_best_block_height = self.best_block.read().unwrap().height; if height < last_best_block_height { let timestamp = self.highest_seen_timestamp.load(Ordering::Acquire); self.do_chain_event(Some(last_best_block_height), |channel| channel.best_block_updated(last_best_block_height, timestamp as u32, self.chain_hash, &self.node_signer, &self.default_configuration, &&WithChannelContext::from(&self.logger, &channel.context))); @@ -8527,6 +8797,7 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; let pending_msg_events = &mut peer_state.pending_msg_events; + peer_state.channel_by_id.retain(|_, phase| { match phase { // Retain unfunded channels. @@ -8602,7 +8873,8 @@ where let reason_message = format!("{}", reason); failed_channels.push(channel.context.force_shutdown(true, reason)); if let Ok(update) = self.get_channel_update_for_broadcast(&channel) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap(); + pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg: update }); } @@ -9059,7 +9331,12 @@ where // Gossip &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, - &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + // [`ChannelManager::pending_broadcast_events`] holds the [`BroadcastChannelUpdate`] + // This check here is to ensure exhaustivity. + &events::MessageSendEvent::BroadcastChannelUpdate { .. } => { + debug_assert!(false, "This event shouldn't have been here"); + false + }, &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, @@ -9123,7 +9400,7 @@ where let mut peer_state = e.get().lock().unwrap(); peer_state.latest_features = init_msg.features.clone(); - let best_block_height = self.best_block.read().unwrap().height(); + let best_block_height = self.best_block.read().unwrap().height; if inbound_peer_limited && Self::unfunded_channel_count(&*peer_state, best_block_height) == peer_state.channel_by_id.len() @@ -9198,8 +9475,6 @@ where } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - match &msg.data as &str { "cannot co-op close channel w/ active htlcs"| "link failed to shutdown" => @@ -9212,34 +9487,45 @@ where // We're not going to bother handling this in a sensible way, instead simply // repeating the Shutdown message on repeat until morale improves. if !msg.channel_id.is_zero() { - let per_peer_state = self.per_peer_state.read().unwrap(); - let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); - if peer_state_mutex_opt.is_none() { return; } - let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); - if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get(&msg.channel_id) { - if let Some(msg) = chan.get_outbound_shutdown() { - peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { - node_id: *counterparty_node_id, - msg, - }); - } - peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError { - node_id: *counterparty_node_id, - action: msgs::ErrorAction::SendWarningMessage { - msg: msgs::WarningMessage { - channel_id: msg.channel_id, - data: "You appear to be exhibiting LND bug 6039, we'll keep sending you shutdown messages until you handle them correctly".to_owned() - }, - log_level: Level::Trace, + PersistenceNotifierGuard::optionally_notify( + self, + || -> NotifyOption { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if peer_state_mutex_opt.is_none() { return NotifyOption::SkipPersistNoEvents; } + let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + if let Some(ChannelPhase::Funded(chan)) = peer_state.channel_by_id.get(&msg.channel_id) { + if let Some(msg) = chan.get_outbound_shutdown() { + peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown { + node_id: *counterparty_node_id, + msg, + }); + } + peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: *counterparty_node_id, + action: msgs::ErrorAction::SendWarningMessage { + msg: msgs::WarningMessage { + channel_id: msg.channel_id, + data: "You appear to be exhibiting LND bug 6039, we'll keep sending you shutdown messages until you handle them correctly".to_owned() + }, + log_level: Level::Trace, + } + }); + // This can happen in a fairly tight loop, so we absolutely cannot trigger + // a `ChannelManager` write here. + return NotifyOption::SkipPersistHandleEvents; } - }); - } + NotifyOption::SkipPersistNoEvents + } + ); } return; } _ => {} } + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + if msg.channel_id.is_zero() { let channel_ids: Vec = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -9429,6 +9715,8 @@ where let builder = invoice_request.respond_using_derived_keys_no_std( payment_paths, payment_hash, created_at ); + let builder: Result, _> = + builder.map(|b| b.into()); match builder.and_then(|b| b.allow_mpp().build_and_sign(secp_ctx)) { Ok(invoice) => Some(OffersMessage::Invoice(invoice)), Err(error) => Some(OffersMessage::InvoiceError(error.into())), @@ -9440,18 +9728,25 @@ where let builder = invoice_request.respond_with_no_std( payment_paths, payment_hash, created_at ); + let builder: Result, _> = + builder.map(|b| b.into()); let response = builder.and_then(|builder| builder.allow_mpp().build()) .map_err(|e| OffersMessage::InvoiceError(e.into())) - .and_then(|invoice| - match invoice.sign(|invoice| self.node_signer.sign_bolt12_invoice(invoice)) { + .and_then(|invoice| { + #[cfg(c_bindings)] + let mut invoice = invoice; + match invoice.sign(|invoice: &UnsignedBolt12Invoice| + self.node_signer.sign_bolt12_invoice(invoice) + ) { Ok(invoice) => Ok(OffersMessage::Invoice(invoice)), - Err(SignError::Signing(())) => Err(OffersMessage::InvoiceError( + Err(SignError::Signing) => Err(OffersMessage::InvoiceError( InvoiceError::from_string("Failed signing invoice".to_string()) )), Err(SignError::Verification(_)) => Err(OffersMessage::InvoiceError( InvoiceError::from_string("Failed invoice signature verification".to_string()) )), - }); + } + }); match response { Ok(invoice) => Some(invoice), Err(error) => Some(error), @@ -9713,6 +10008,7 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting, }, (2, ReceiveKeysend) => { (0, payment_preimage, required), + (1, requires_blinded_error, (default_value, false)), (2, incoming_cltv_expiry, required), (3, payment_metadata, option), (4, payment_data, option), // Added in 0.0.116 @@ -10065,8 +10361,8 @@ where self.chain_hash.write(writer)?; { let best_block = self.best_block.read().unwrap(); - best_block.height().write(writer)?; - best_block.block_hash().write(writer)?; + best_block.height.write(writer)?; + best_block.block_hash.write(writer)?; } let mut serializable_peer_count: u64 = 0; @@ -10112,6 +10408,12 @@ where } } + let mut decode_update_add_htlcs_opt = None; + let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap(); + if !decode_update_add_htlcs.is_empty() { + decode_update_add_htlcs_opt = Some(decode_update_add_htlcs); + } + let per_peer_state = self.per_peer_state.write().unwrap(); let pending_inbound_payments = self.pending_inbound_payments.lock().unwrap(); @@ -10263,6 +10565,7 @@ where (10, in_flight_monitor_updates, option), (11, self.probing_cookie_secret, required), (13, htlc_onion_fields, optional_vec), + (14, decode_update_add_htlcs_opt, option), }); Ok(()) @@ -10728,6 +11031,7 @@ where let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; let mut in_flight_monitor_updates: Option>> = None; + let mut decode_update_add_htlcs: Option>> = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -10741,7 +11045,9 @@ where (10, in_flight_monitor_updates, option), (11, probing_cookie_secret, option), (13, claimable_htlc_onion_fields, optional_vec), + (14, decode_update_add_htlcs, option), }); + let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map()); if fake_scid_rand_bytes.is_none() { fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes()); } @@ -10841,7 +11147,7 @@ where } } if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id { - // If the channel is ahead of the monitor, return InvalidValue: + // If the channel is ahead of the monitor, return DangerousValue: log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!"); log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight", chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id); @@ -10850,7 +11156,7 @@ where log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); - return Err(DecodeError::InvalidValue); + return Err(DecodeError::DangerousValue); } } else { // We shouldn't have persisted (or read) any unfunded channel types so none should have been @@ -10961,6 +11267,18 @@ where // still have an entry for this HTLC in `forward_htlcs` or // `pending_intercepted_htlcs`, we were apparently not persisted after // the monitor was when forwarding the payment. + decode_update_add_htlcs.retain(|scid, update_add_htlcs| { + update_add_htlcs.retain(|update_add_htlc| { + let matches = *scid == prev_hop_data.short_channel_id && + update_add_htlc.htlc_id == prev_hop_data.htlc_id; + if matches { + log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}", + &htlc.payment_hash, &monitor.channel_id()); + } + !matches + }); + !update_add_htlcs.is_empty() + }); forward_htlcs.retain(|_, forwards| { forwards.retain(|forward| { if let HTLCForwardInfo::AddHTLC(htlc_info) = forward { @@ -11042,7 +11360,7 @@ where } } - if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() { + if !forward_htlcs.is_empty() || !decode_update_add_htlcs.is_empty() || pending_outbounds.needs_abandon() { // If we have pending HTLCs to forward, assume we either dropped a // `PendingHTLCsForwardable` or the user received it but never processed it as they // shut down before the timer hit. Either way, set the time_forwardable to a small @@ -11276,6 +11594,7 @@ where pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()), forward_htlcs: Mutex::new(forward_htlcs), + decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }), outbound_scid_aliases: Mutex::new(outbound_scid_aliases), outpoint_to_peer: Mutex::new(outpoint_to_peer), @@ -11304,6 +11623,8 @@ where pending_offers_messages: Mutex::new(Vec::new()), + pending_broadcast_messages: Mutex::new(Vec::new()), + entropy_source: args.entropy_source, node_signer: args.node_signer, signer_provider: args.signer_provider, @@ -11835,6 +12156,61 @@ mod tests { } } + #[test] + fn test_channel_update_cached() { + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let chan = create_announced_chan_between_nodes(&nodes, 0, 1); + + nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap(); + check_added_monitors!(nodes[0], 1); + check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000); + + // Confirm that the channel_update was not sent immediately to node[1] but was cached. + let node_1_events = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(node_1_events.len(), 0); + + { + // Assert that ChannelUpdate message has been added to node[0] pending broadcast messages + let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap(); + assert_eq!(pending_broadcast_messages.len(), 1); + } + + // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer + nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id()); + nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + + nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id()); + nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + + let node_0_events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(node_0_events.len(), 0); + + // Now we reconnect to a peer + nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init { + features: nodes[2].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); + nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init { + features: nodes[0].node.init_features(), networks: None, remote_network_address: None + }, false).unwrap(); + + // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages + let node_0_events = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(node_0_events.len(), 1); + match &node_0_events[0] { + MessageSendEvent::BroadcastChannelUpdate { .. } => (), + _ => panic!("Unexpected event"), + } + { + // Assert that ChannelUpdate message has been cleared from nodes[0] pending broadcast messages + let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap(); + assert_eq!(pending_broadcast_messages.len(), 0); + } + } + #[test] fn test_drop_disconnected_peers_when_removing_channels() { let chanmon_cfgs = create_chanmon_cfgs(2); @@ -12350,7 +12726,7 @@ mod tests { }; // Check that if the amount we received + the penultimate hop extra fee is less than the sender // intended amount, we fail the payment. - let current_height: u32 = node[0].node.best_block.read().unwrap().height(); + let current_height: u32 = node[0].node.best_block.read().unwrap().height; if let Err(crate::ln::channelmanager::InboundHTLCErr { err_code, .. }) = create_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), sender_intended_amt_msat - extra_fee_msat - 1, 42, None, true, Some(extra_fee_msat), @@ -12370,7 +12746,7 @@ mod tests { }), custom_tlvs: Vec::new(), }; - let current_height: u32 = node[0].node.best_block.read().unwrap().height(); + let current_height: u32 = node[0].node.best_block.read().unwrap().height; assert!(create_recv_pending_htlc_info(hop_data, [0; 32], PaymentHash([0; 32]), sender_intended_amt_msat - extra_fee_msat, 42, None, true, Some(extra_fee_msat), current_height, node[0].node.default_configuration.accept_mpp_keysend).is_ok()); @@ -12383,7 +12759,7 @@ mod tests { let node_chanmgr = create_node_chanmgrs(1, &node_cfg, &[None]); let node = create_network(1, &node_cfg, &node_chanmgr); - let current_height: u32 = node[0].node.best_block.read().unwrap().height(); + let current_height: u32 = node[0].node.best_block.read().unwrap().height; let result = create_recv_pending_htlc_info(msgs::InboundOnionPayload::Receive { sender_intended_htlc_amt_msat: 100, cltv_expiry_height: 22, @@ -12809,7 +13185,7 @@ pub mod bench { assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]); - let block = create_dummy_block(BestBlock::from_network(network).block_hash(), 42, vec![tx]); + let block = create_dummy_block(BestBlock::from_network(network).block_hash, 42, vec![tx]); Listen::block_connected(&node_a, &block, 1); Listen::block_connected(&node_b, &block, 1);