X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=3fa8fa5ba947080d52cb888f505f40b568adb2a1;hb=2c5bd1c56fd5aeb7cf9b049584cf24bd1852c15d;hp=845a922ee562f3a52a4b80c99ea635e784998074;hpb=c1e6a74e0b8309bc85e683824f832ac539806dc5;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 845a922e..3fa8fa5b 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -36,7 +36,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, Fee use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::events; -use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; +use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason}; // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret}; @@ -106,11 +106,13 @@ pub(super) enum PendingHTLCRouting { }, Receive { payment_data: msgs::FinalOnionHopData, + payment_metadata: Option>, incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed phantom_shared_secret: Option<[u8; 32]>, }, ReceiveKeysend { payment_preimage: PaymentPreimage, + payment_metadata: Option>, incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed }, } @@ -472,6 +474,7 @@ impl_writeable_tlv_based!(ClaimingPayment, { struct ClaimablePayment { purpose: events::PaymentPurpose, + onion_fields: Option, htlcs: Vec, } @@ -623,6 +626,61 @@ pub type SimpleArcChannelManager = ChannelManager< /// This is not exported to bindings users as Arcs don't make sense in bindings pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>>, &'g L>; +/// A trivial trait which describes any [`ChannelManager`] used in testing. 
+#[cfg(any(test, feature = "_test_utils"))] +pub trait AChannelManager { + type Watch: chain::Watch; + type M: Deref; + type Broadcaster: BroadcasterInterface; + type T: Deref; + type EntropySource: EntropySource; + type ES: Deref; + type NodeSigner: NodeSigner; + type NS: Deref; + type Signer: WriteableEcdsaChannelSigner; + type SignerProvider: SignerProvider; + type SP: Deref; + type FeeEstimator: FeeEstimator; + type F: Deref; + type Router: Router; + type R: Deref; + type Logger: Logger; + type L: Deref; + fn get_cm(&self) -> &ChannelManager; +} +#[cfg(any(test, feature = "_test_utils"))] +impl AChannelManager +for ChannelManager +where + M::Target: chain::Watch<::Signer> + Sized, + T::Target: BroadcasterInterface + Sized, + ES::Target: EntropySource + Sized, + NS::Target: NodeSigner + Sized, + SP::Target: SignerProvider + Sized, + F::Target: FeeEstimator + Sized, + R::Target: Router + Sized, + L::Target: Logger + Sized, +{ + type Watch = M::Target; + type M = M; + type Broadcaster = T::Target; + type T = T; + type EntropySource = ES::Target; + type ES = ES; + type NodeSigner = NS::Target; + type NS = NS; + type Signer = ::Signer; + type SignerProvider = SP::Target; + type SP = SP; + type FeeEstimator = F::Target; + type F = F; + type Router = R::Target; + type R = R; + type Logger = L::Target; + type L = L; + fn get_cm(&self) -> &ChannelManager { self } +} + /// Manager which keeps track of a number of channels and sends messages to the appropriate /// channel, also tracking HTLC preimages and forwarding onion packets appropriately. /// @@ -1020,6 +1078,14 @@ pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3; /// [`OutboundPayments::remove_stale_resolved_payments`]. pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7; +/// The number of ticks of [`ChannelManager::timer_tick_occurred`] where a peer is disconnected +/// until we mark the channel disabled and gossip the update. +pub(crate) const DISABLE_GOSSIP_TICKS: u8 = 10; + +/// The number of ticks of [`ChannelManager::timer_tick_occurred`] where a peer is connected until +/// we mark the channel enabled and gossip the update. +pub(crate) const ENABLE_GOSSIP_TICKS: u8 = 5; + /// The maximum number of unfunded channels we can have per-peer before we start rejecting new /// (inbound) ones. The number of peers with unfunded channels is limited separately in /// [`MAX_UNFUNDED_CHANNEL_PEERS`]. @@ -1359,15 +1425,15 @@ pub struct PhantomRouteHints { } macro_rules! handle_error { - ($self: ident, $internal: expr, $counterparty_node_id: expr) => { + ($self: ident, $internal: expr, $counterparty_node_id: expr) => { { + // In testing, ensure there are no deadlocks where the lock is already held upon + // entering the macro. + debug_assert_ne!($self.pending_events.held_by_thread(), LockHeldState::HeldByThread); + debug_assert_ne!($self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread); + match $internal { Ok(msg) => Ok(msg), Err(MsgHandleErrInternal { err, chan_id, shutdown_finish }) => { - // In testing, ensure there are no deadlocks where the lock is already held upon - // entering the macro. - debug_assert_ne!($self.pending_events.held_by_thread(), LockHeldState::HeldByThread); - debug_assert_ne!($self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread); - let mut msg_events = Vec::with_capacity(2); if let Some((shutdown_res, update_option)) = shutdown_finish { @@ -1406,7 +1472,7 @@ macro_rules! handle_error { Err(err) }, } - } + } } } macro_rules! update_maps_on_chan_removal { @@ -1593,7 +1659,7 @@ macro_rules! 
handle_new_monitor_update { ($self: ident, $update_res: expr, $update_id: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, MANUALLY_REMOVING, $remove: expr) => { { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. - debug_assert!($self.id_to_peer.try_lock().is_ok()); + debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); match $update_res { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", @@ -1628,6 +1694,36 @@ macro_rules! handle_new_monitor_update { } } +macro_rules! process_events_body { + ($self: expr, $event_to_handle: expr, $handle_event: expr) => { + // We'll acquire our total consistency lock until the returned future completes so that + // we can be sure no other persists happen while processing events. + let _read_guard = $self.total_consistency_lock.read().unwrap(); + + let mut result = NotifyOption::SkipPersist; + + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if $self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + + let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } + + for event in pending_events { + $event_to_handle = event; + $handle_event; + } + + if result == NotifyOption::DoPersist { + $self.persistence_notifier.notify(); + } + } +} + impl ChannelManager where M::Target: chain::Watch<::Signer>, @@ -2168,7 +2264,7 @@ where msg: "Got non final data with an HMAC of 0", }); }, - msgs::OnionHopDataFormat::FinalNode { payment_data, keysend_preimage, .. } => { // TODO: expose the payment_metadata to the user + msgs::OnionHopDataFormat::FinalNode { payment_data, keysend_preimage, payment_metadata } => { if payment_data.is_some() && keysend_preimage.is_some() { return Err(ReceiveError { err_code: 0x4000|22, @@ -2178,6 +2274,7 @@ where } else if let Some(data) = payment_data { PendingHTLCRouting::Receive { payment_data: data, + payment_metadata, incoming_cltv_expiry: hop_data.outgoing_cltv_value, phantom_shared_secret, } @@ -2198,6 +2295,7 @@ where PendingHTLCRouting::ReceiveKeysend { payment_preimage, + payment_metadata, incoming_cltv_expiry: hop_data.outgoing_cltv_value, } } else { @@ -2377,7 +2475,14 @@ where // hopefully an attacker trying to path-trace payments cannot make this occur // on a small/per-node/per-channel scale. if !chan.is_live() { // channel_disabled - break Some(("Forwarding channel is not in a ready state.", 0x1000 | 20, chan_update_opt)); + // If the channel_update we're going to return is disabled (i.e. the + // peer has been disabled for some time), return `channel_disabled`, + // otherwise return `temporary_channel_failure`. 
+ if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { + break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); + } else { + break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); + } } if *outgoing_amt_msat < chan.get_counterparty_htlc_minimum_msat() { // amount_below_minimum break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); @@ -2502,11 +2607,18 @@ where log_trace!(self.logger, "Generating channel update for channel {}", log_bytes!(chan.channel_id())); let were_node_one = self.our_network_pubkey.serialize()[..] < chan.get_counterparty_node_id().serialize()[..]; + let enabled = chan.is_usable() && match chan.channel_update_status() { + ChannelUpdateStatus::Enabled => true, + ChannelUpdateStatus::DisabledStaged(_) => true, + ChannelUpdateStatus::Disabled => false, + ChannelUpdateStatus::EnabledStaged(_) => false, + }; + let unsigned = msgs::UnsignedChannelUpdate { chain_hash: self.genesis_hash, short_channel_id, timestamp: chan.get_update_time_counter(), - flags: (!were_node_one) as u8 | ((!chan.is_live() as u8) << 1), + flags: (!were_node_one) as u8 | ((!enabled as u8) << 1), cltv_expiry_delta: chan.get_cltv_expiry_delta(), htlc_minimum_msat: chan.get_counterparty_htlc_minimum_msat(), htlc_maximum_msat: chan.get_announced_htlc_max_msat(), @@ -2697,6 +2809,11 @@ where self.pending_outbound_payments.test_add_new_pending_payment(payment_hash, recipient_onion, payment_id, route, None, &self.entropy_source, best_block_height) } + #[cfg(test)] + pub(crate) fn test_set_payment_metadata(&self, payment_id: PaymentId, new_payment_metadata: Option>) { + self.pending_outbound_payments.test_set_payment_metadata(payment_id, new_payment_metadata); + } + /// Signals that no further retries for the given payment should occur. 
Useful if you have a /// pending outbound payment with retries remaining, but wish to stop retrying the payment before @@ -2716,7 +2833,7 @@ where /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn abandon_payment(&self, payment_id: PaymentId) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - self.pending_outbound_payments.abandon_payment(payment_id, &self.pending_events); + self.pending_outbound_payments.abandon_payment(payment_id, PaymentFailureReason::UserAbandoned, &self.pending_events); } /// Send a spontaneous payment, which is a payment that does not require the recipient to have @@ -2791,29 +2908,34 @@ where let mut peer_state_lock = peer_state_mutex.lock().unwrap(); let peer_state = &mut *peer_state_lock; - let (chan, msg) = { - let (res, chan) = { - match peer_state.channel_by_id.remove(temporary_channel_id) { - Some(mut chan) => { - let funding_txo = find_funding_output(&chan, &funding_transaction)?; - - (chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger) - .map_err(|e| if let ChannelError::Close(msg) = e { - MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.get_user_id(), chan.force_shutdown(true), None) - } else { unreachable!(); }) - , chan) + let (msg, chan) = match peer_state.channel_by_id.remove(temporary_channel_id) { + Some(mut chan) => { + let funding_txo = find_funding_output(&chan, &funding_transaction)?; + + let funding_res = chan.get_outbound_funding_created(funding_transaction, funding_txo, &self.logger) + .map_err(|e| if let ChannelError::Close(msg) = e { + MsgHandleErrInternal::from_finish_shutdown(msg, chan.channel_id(), chan.get_user_id(), chan.force_shutdown(true), None) + } else { unreachable!(); }); + match funding_res { + Ok(funding_msg) => (funding_msg, chan), + Err(_) => { + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + + let _ = handle_error!(self, funding_res, chan.get_counterparty_node_id()); + return Err(APIError::ChannelUnavailable { + err: "Signer refused to sign the initial commitment transaction".to_owned() + }); }, - None => { return Err(APIError::ChannelUnavailable { err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*temporary_channel_id), counterparty_node_id) }) }, } - }; - match handle_error!(self, res, chan.get_counterparty_node_id()) { - Ok(funding_msg) => { - (chan, funding_msg) - }, - Err(_) => { return Err(APIError::ChannelUnavailable { - err: "Signer refused to sign the initial commitment transaction".to_owned() - }) }, - } + }, + None => { + return Err(APIError::ChannelUnavailable { + err: format!( + "Channel with id {} not found for the passed counterparty node_id {}", + log_bytes!(*temporary_channel_id), counterparty_node_id), + }) + }, }; peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated { @@ -3276,7 +3398,7 @@ where } } } else { - for forward_info in pending_forwards.drain(..) { + 'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) { match forward_info { HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, @@ -3284,13 +3406,19 @@ where routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, .. 
} }) => { - let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret) = match routing { - PendingHTLCRouting::Receive { payment_data, incoming_cltv_expiry, phantom_shared_secret } => { + let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret, mut onion_fields) = match routing { + PendingHTLCRouting::Receive { payment_data, payment_metadata, incoming_cltv_expiry, phantom_shared_secret } => { let _legacy_hop_data = Some(payment_data.clone()); - (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data }, Some(payment_data), phantom_shared_secret) + let onion_fields = + RecipientOnionFields { payment_secret: Some(payment_data.payment_secret), payment_metadata }; + (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data }, + Some(payment_data), phantom_shared_secret, onion_fields) + }, + PendingHTLCRouting::ReceiveKeysend { payment_preimage, payment_metadata, incoming_cltv_expiry } => { + let onion_fields = RecipientOnionFields { payment_secret: None, payment_metadata }; + (incoming_cltv_expiry, OnionPayload::Spontaneous(payment_preimage), + None, None, onion_fields) }, - PendingHTLCRouting::ReceiveKeysend { payment_preimage, incoming_cltv_expiry } => - (incoming_cltv_expiry, OnionPayload::Spontaneous(payment_preimage), None, None), _ => { panic!("short_channel_id == 0 should imply any pending_forward entries are of type Receive"); } @@ -3334,6 +3462,7 @@ where HTLCFailReason::reason(0x4000 | 15, htlc_msat_height_data), HTLCDestination::FailedPayment { payment_hash: $payment_hash }, )); + continue 'next_forwardable_htlc; } } let phantom_shared_secret = claimable_htlc.prev_hop.phantom_shared_secret; @@ -3355,7 +3484,6 @@ where let mut claimable_payments = self.claimable_payments.lock().unwrap(); if claimable_payments.pending_claiming_payments.contains_key(&payment_hash) { fail_htlc!(claimable_htlc, payment_hash); - continue } let ref mut claimable_payment = claimable_payments.claimable_payments .entry(payment_hash) @@ -3363,15 +3491,21 @@ where .or_insert_with(|| { committed_to_claimable = true; ClaimablePayment { - purpose: purpose(), htlcs: Vec::new() + purpose: purpose(), htlcs: Vec::new(), onion_fields: None, } }); + if let Some(earlier_fields) = &mut claimable_payment.onion_fields { + if earlier_fields.check_merge(&mut onion_fields).is_err() { + fail_htlc!(claimable_htlc, payment_hash); + } + } else { + claimable_payment.onion_fields = Some(onion_fields); + } let ref mut htlcs = &mut claimable_payment.htlcs; if htlcs.len() == 1 { if let OnionPayload::Spontaneous(_) = htlcs[0].onion_payload { log_trace!(self.logger, "Failing new HTLC with payment_hash {} as we already had an existing keysend HTLC with the same payment hash", log_bytes!(payment_hash.0)); fail_htlc!(claimable_htlc, payment_hash); - continue } } let mut total_value = claimable_htlc.sender_intended_value; @@ -3415,6 +3549,7 @@ where via_channel_id: Some(prev_channel_id), via_user_channel_id: Some(prev_user_channel_id), claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER), + onion_fields: claimable_payment.onion_fields.clone(), }); payment_claimable_generated = true; } else { @@ -3447,7 +3582,6 @@ where Err(()) => { log_trace!(self.logger, "Failing new HTLC with payment_hash {} as payment verification failed", log_bytes!(payment_hash.0)); fail_htlc!(claimable_htlc, payment_hash); - continue } }; if let Some(min_final_cltv_expiry_delta) = min_final_cltv_expiry_delta { @@ -3456,7 +3590,6 @@ where log_trace!(self.logger, "Failing new HTLC with payment_hash {} as its CLTV expiry was 
too soon (had {}, earliest expected {})", log_bytes!(payment_hash.0), cltv_expiry, expected_min_expiry_height); fail_htlc!(claimable_htlc, payment_hash); - continue; } } check_total_value!(payment_data, payment_preimage); @@ -3465,7 +3598,6 @@ where let mut claimable_payments = self.claimable_payments.lock().unwrap(); if claimable_payments.pending_claiming_payments.contains_key(&payment_hash) { fail_htlc!(claimable_htlc, payment_hash); - continue } match claimable_payments.claimable_payments.entry(payment_hash) { hash_map::Entry::Vacant(e) => { @@ -3475,6 +3607,7 @@ where let purpose = events::PaymentPurpose::SpontaneousPayment(preimage); e.insert(ClaimablePayment { purpose: purpose.clone(), + onion_fields: Some(onion_fields.clone()), htlcs: vec![claimable_htlc], }); let prev_channel_id = prev_funding_outpoint.to_channel_id(); @@ -3486,6 +3619,7 @@ where via_channel_id: Some(prev_channel_id), via_user_channel_id: Some(prev_user_channel_id), claim_deadline, + onion_fields: Some(onion_fields), }); }, hash_map::Entry::Occupied(_) => { @@ -3500,7 +3634,6 @@ where if payment_data.is_none() { log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} because we already have an inbound payment with the same payment hash", log_bytes!(payment_hash.0)); fail_htlc!(claimable_htlc, payment_hash); - continue }; let payment_data = payment_data.unwrap(); if inbound_payment.get().payment_secret != payment_data.payment_secret { @@ -3671,27 +3804,39 @@ where } match chan.channel_update_status() { - ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged), - ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged), - ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), - ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), - ChannelUpdateStatus::DisabledStaged if !chan.is_live() => { - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(0)), + ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(0)), + ChannelUpdateStatus::DisabledStaged(_) if chan.is_live() + => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), + ChannelUpdateStatus::EnabledStaged(_) if !chan.is_live() + => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), + ChannelUpdateStatus::DisabledStaged(mut n) if !chan.is_live() => { + n += 1; + if n >= DISABLE_GOSSIP_TICKS { + chan.set_channel_update_status(ChannelUpdateStatus::Disabled); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + } else { + chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(n)); } - should_persist = NotifyOption::DoPersist; - chan.set_channel_update_status(ChannelUpdateStatus::Disabled); }, - ChannelUpdateStatus::EnabledStaged if chan.is_live() => { - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + 
ChannelUpdateStatus::EnabledStaged(mut n) if chan.is_live() => { + n += 1; + if n >= ENABLE_GOSSIP_TICKS { + chan.set_channel_update_status(ChannelUpdateStatus::Enabled); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + } else { + chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(n)); } - should_persist = NotifyOption::DoPersist; - chan.set_channel_update_status(ChannelUpdateStatus::Enabled); }, _ => {}, } @@ -5743,30 +5888,8 @@ where pub async fn process_pending_events_async Future>( &self, handler: H ) { - // We'll acquire our total consistency lock until the returned future completes so that - // we can be sure no other persists happen while processing events. - let _read_guard = self.total_consistency_lock.read().unwrap(); - - let mut result = NotifyOption::SkipPersist; - - // TODO: This behavior should be documented. It's unintuitive that we query - // ChannelMonitors when clearing other events. - if self.process_pending_monitor_events() { - result = NotifyOption::DoPersist; - } - - let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); - if !pending_events.is_empty() { - result = NotifyOption::DoPersist; - } - - for event in pending_events { - handler(event).await; - } - - if result == NotifyOption::DoPersist { - self.persistence_notifier.notify(); - } + let mut ev; + process_events_body!(self, ev, { handler(ev).await }); } } @@ -5848,26 +5971,8 @@ where /// An [`EventHandler`] may safely call back to the provider in order to handle an event. /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock. fn process_pending_events(&self, handler: H) where H::Target: EventHandler { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut result = NotifyOption::SkipPersist; - - // TODO: This behavior should be documented. It's unintuitive that we query - // ChannelMonitors when clearing other events. 
- if self.process_pending_monitor_events() { - result = NotifyOption::DoPersist; - } - - let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); - if !pending_events.is_empty() { - result = NotifyOption::DoPersist; - } - - for event in pending_events { - handler.handle_event(event); - } - - result - }); + let mut ev; + process_events_body!(self, ev, handler.handle_event(ev)); } } @@ -6720,10 +6825,12 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting, (0, payment_data, required), (1, phantom_shared_secret, option), (2, incoming_cltv_expiry, required), + (3, payment_metadata, option), }, (2, ReceiveKeysend) => { (0, payment_preimage, required), (2, incoming_cltv_expiry, required), + (3, payment_metadata, option), }, ;); @@ -7056,6 +7163,7 @@ where let pending_outbound_payments = self.pending_outbound_payments.pending_outbound_payments.lock().unwrap(); let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new(); + let mut htlc_onion_fields: Vec<&_> = Vec::new(); (claimable_payments.claimable_payments.len() as u64).write(writer)?; for (payment_hash, payment) in claimable_payments.claimable_payments.iter() { payment_hash.write(writer)?; @@ -7064,6 +7172,7 @@ where htlc.write(writer)?; } htlc_purposes.push(&payment.purpose); + htlc_onion_fields.push(&payment.onion_fields); } let mut monitor_update_blocked_actions_per_peer = None; @@ -7178,6 +7287,7 @@ where (7, self.fake_scid_rand_bytes, required), (9, htlc_purposes, vec_type), (11, self.probing_cookie_secret, required), + (13, htlc_onion_fields, optional_vec), }); Ok(()) @@ -7556,6 +7666,7 @@ where let mut fake_scid_rand_bytes: Option<[u8; 32]> = None; let mut probing_cookie_secret: Option<[u8; 32]> = None; let mut claimable_htlc_purposes = None; + let mut claimable_htlc_onion_fields = None; let mut pending_claiming_payments = Some(HashMap::new()); let mut monitor_update_blocked_actions_per_peer = Some(Vec::new()); read_tlv_fields!(reader, { @@ -7568,6 +7679,7 @@ where (7, fake_scid_rand_bytes, option), (9, claimable_htlc_purposes, vec_type), (11, probing_cookie_secret, option), + (13, claimable_htlc_onion_fields, optional_vec), }); if fake_scid_rand_bytes.is_none() { fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes()); @@ -7717,15 +7829,29 @@ where let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material); let mut claimable_payments = HashMap::with_capacity(claimable_htlcs_list.len()); - if let Some(mut purposes) = claimable_htlc_purposes { + if let Some(purposes) = claimable_htlc_purposes { if purposes.len() != claimable_htlcs_list.len() { return Err(DecodeError::InvalidValue); } - for (purpose, (payment_hash, htlcs)) in purposes.drain(..).zip(claimable_htlcs_list.drain(..)) { - let existing_payment = claimable_payments.insert(payment_hash, ClaimablePayment { - purpose, htlcs, - }); - if existing_payment.is_some() { return Err(DecodeError::InvalidValue); } + if let Some(onion_fields) = claimable_htlc_onion_fields { + if onion_fields.len() != claimable_htlcs_list.len() { + return Err(DecodeError::InvalidValue); + } + for (purpose, (onion, (payment_hash, htlcs))) in + purposes.into_iter().zip(onion_fields.into_iter().zip(claimable_htlcs_list.into_iter())) + { + let existing_payment = claimable_payments.insert(payment_hash, ClaimablePayment { + purpose, htlcs, onion_fields: onion, + }); + if existing_payment.is_some() { return Err(DecodeError::InvalidValue); } + } + } else { + for (purpose, (payment_hash, htlcs)) in 
purposes.into_iter().zip(claimable_htlcs_list.into_iter()) { + let existing_payment = claimable_payments.insert(payment_hash, ClaimablePayment { + purpose, htlcs, onion_fields: None, + }); + if existing_payment.is_some() { return Err(DecodeError::InvalidValue); } + } } } else { // LDK versions prior to 0.0.107 did not write a `pending_htlc_purposes`, but do @@ -7756,7 +7882,7 @@ where events::PaymentPurpose::SpontaneousPayment(*payment_preimage), }; claimable_payments.insert(payment_hash, ClaimablePayment { - purpose, htlcs, + purpose, htlcs, onion_fields: None, }); } } @@ -8878,14 +9004,23 @@ pub mod bench { use test::Bencher; - struct NodeHolder<'a, P: Persist> { - node: &'a ChannelManager< - &'a ChainMonitor, - &'a test_utils::TestBroadcaster, &'a KeysManager, &'a KeysManager, &'a KeysManager, - &'a test_utils::TestFeeEstimator, &'a test_utils::TestRouter<'a>, - &'a test_utils::TestLogger>, + type Manager<'a, P> = ChannelManager< + &'a ChainMonitor, + &'a test_utils::TestBroadcaster, &'a KeysManager, &'a KeysManager, &'a KeysManager, + &'a test_utils::TestFeeEstimator, &'a test_utils::TestRouter<'a>, + &'a test_utils::TestLogger>; + + struct ANodeHolder<'a, P: Persist> { + node: &'a Manager<'a, P>, + } + impl<'a, P: Persist> NodeHolder for ANodeHolder<'a, P> { + type CM = Manager<'a, P>; + #[inline] + fn node(&self) -> &Manager<'a, P> { self.node } + #[inline] + fn chain_monitor(&self) -> Option<&test_utils::TestChainMonitor> { None } } #[cfg(test)] @@ -8916,7 +9051,7 @@ pub mod bench { network, best_block: BestBlock::from_network(network), }); - let node_a_holder = NodeHolder { node: &node_a }; + let node_a_holder = ANodeHolder { node: &node_a }; let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b); @@ -8926,7 +9061,7 @@ pub mod bench { network, best_block: BestBlock::from_network(network), }); - let node_b_holder = NodeHolder { node: &node_b }; + let node_b_holder = ANodeHolder { node: &node_b }; node_a.peer_connected(&node_b.get_our_node_id(), &Init { features: node_b.init_features(), remote_network_address: None }, true).unwrap(); node_b.peer_connected(&node_a.get_our_node_id(), &Init { features: node_a.init_features(), remote_network_address: None }, false).unwrap(); @@ -9022,15 +9157,15 @@ pub mod bench { let payment_event = SendEvent::from_event($node_a.get_and_clear_pending_msg_events().pop().unwrap()); $node_b.handle_update_add_htlc(&$node_a.get_our_node_id(), &payment_event.msgs[0]); $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &payment_event.commitment_msg); - let (raa, cs) = do_get_revoke_commit_msgs!(NodeHolder { node: &$node_b }, &$node_a.get_our_node_id()); + let (raa, cs) = get_revoke_commit_msgs(&ANodeHolder { node: &$node_b }, &$node_a.get_our_node_id()); $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &raa); $node_a.handle_commitment_signed(&$node_b.get_our_node_id(), &cs); - $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_a }, MessageSendEvent::SendRevokeAndACK, $node_b.get_our_node_id())); + $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &get_event_msg!(ANodeHolder { node: &$node_a }, MessageSendEvent::SendRevokeAndACK, $node_b.get_our_node_id())); - expect_pending_htlcs_forwardable!(NodeHolder { node: &$node_b }); - expect_payment_claimable!(NodeHolder { node: &$node_b }, payment_hash, payment_secret, 10_000); + expect_pending_htlcs_forwardable!(ANodeHolder { 
node: &$node_b }); + expect_payment_claimable!(ANodeHolder { node: &$node_b }, payment_hash, payment_secret, 10_000); $node_b.claim_funds(payment_preimage); - expect_payment_claimed!(NodeHolder { node: &$node_b }, payment_hash, 10_000); + expect_payment_claimed!(ANodeHolder { node: &$node_b }, payment_hash, 10_000); match $node_b.get_and_clear_pending_msg_events().pop().unwrap() { MessageSendEvent::UpdateHTLCs { node_id, updates } => { @@ -9041,12 +9176,12 @@ pub mod bench { _ => panic!("Failed to generate claim event"), } - let (raa, cs) = do_get_revoke_commit_msgs!(NodeHolder { node: &$node_a }, &$node_b.get_our_node_id()); + let (raa, cs) = get_revoke_commit_msgs(&ANodeHolder { node: &$node_a }, &$node_b.get_our_node_id()); $node_b.handle_revoke_and_ack(&$node_a.get_our_node_id(), &raa); $node_b.handle_commitment_signed(&$node_a.get_our_node_id(), &cs); - $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &get_event_msg!(NodeHolder { node: &$node_b }, MessageSendEvent::SendRevokeAndACK, $node_a.get_our_node_id())); + $node_a.handle_revoke_and_ack(&$node_b.get_our_node_id(), &get_event_msg!(ANodeHolder { node: &$node_b }, MessageSendEvent::SendRevokeAndACK, $node_a.get_our_node_id())); - expect_payment_sent!(NodeHolder { node: &$node_a }, payment_preimage); + expect_payment_sent!(ANodeHolder { node: &$node_a }, payment_preimage); } }
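
The new `AChannelManager` trait exists because `ChannelManager` takes eight generic parameters, so test utilities that want to accept "any channel manager" would otherwise have to spell out the full parameter list at every use site. The trait mirrors each generic parameter and its `Deref` target as a pair of associated types and hands the concrete manager back through `get_cm()`. The same pattern, shown on a toy type with a single parameter instead of eight (all names below are stand-ins, not LDK API):

use std::ops::Deref;

trait Logger { fn log(&self, msg: &str); }

// Toy stand-in for ChannelManager: one generic `Deref` parameter.
struct Manager<L: Deref> where L::Target: Logger {
    logger: L,
}

// Mirrors the generic parameter (`L`) and its target (`Logger`) as
// associated types, the way the diff does for all eight parameters.
trait AManager {
    type Logger: Logger;
    type L: Deref<Target = Self::Logger>;
    fn get_cm(&self) -> &Manager<Self::L>;
}

impl<L: Deref> AManager for Manager<L> where L::Target: Logger + Sized {
    type Logger = L::Target;
    type L = L;
    fn get_cm(&self) -> &Manager<L> { self }
}

// Helpers can now take "any manager" without naming every parameter:
fn do_test<M: AManager>(m: &M) {
    m.get_cm().logger.log("reachable through the trait");
}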
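
`process_events_body!` removes the near-verbatim duplication between the synchronous `process_pending_events` and the async `process_pending_events_async`: the macro owns the skeleton (take the consistency lock, drain monitor events and the pending-event queue, notify the persister when anything happened), while the caller supplies the per-event expression, so one body serves both `handler.handle_event(ev)` and `handler(ev).await`. A minimal standalone sketch of the macro-sharing trick, with a toy event queue and none of the locking or persistence logic:

use std::sync::Mutex;

struct EventQueue {
    pending: Mutex<Vec<u32>>,
}

// The caller names the per-event binding and supplies the handling
// expression; `.await` is legal in the expression when the caller is async.
macro_rules! process_events_body {
    ($self: expr, $event: ident, $handle_event: expr) => {
        let drained = std::mem::take(&mut *$self.pending.lock().unwrap());
        for $event in drained {
            $handle_event;
        }
    }
}

impl EventQueue {
    fn process_sync(&self, handler: impl Fn(u32)) {
        process_events_body!(self, ev, handler(ev));
    }

    async fn process_async<Fut: std::future::Future<Output = ()>>(
        &self, handler: impl Fn(u32) -> Fut,
    ) {
        process_events_body!(self, ev, handler(ev).await);
    }
}

The diff's version threads the binding through a caller-declared `let mut ev;` rather than a `for` pattern, but the effect is the same: the event-handling expression is chosen at the call site.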
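
Two related changes ride together in the forwarding path. First, the `flags` byte of a generated `channel_update` is now derived from the staged gossip status rather than from raw `is_live()`, so a briefly-disconnected peer does not instantly flip the advertised disabled bit. Second, when a forwarding channel is not ready, the onion failure distinguishes `channel_disabled` (0x1000|20, returned only when the attached update already carries the disabled bit) from `temporary_channel_failure` (0x1000|7) for a merely transient outage. Per BOLT 7, bit 0 of `channel_flags` is the direction and bit 1 is the disabled flag; both failure codes carry the BOLT 4 UPDATE flag (0x1000). A condensed sketch of both decisions (helper names are hypothetical):

// channel_update `channel_flags` layout per BOLT 7:
// bit 0 = direction (set when we are not node_one), bit 1 = disabled.
fn channel_flags(were_node_one: bool, enabled: bool) -> u8 {
    (!were_node_one) as u8 | ((!enabled) as u8) << 1
}

// Failure code for a forwarding channel that isn't ready, mirroring the
// diff: `channel_disabled` only when the update we return says disabled.
fn not_ready_failure_code(update_flags: Option<u8>) -> u16 {
    match update_flags {
        Some(flags) if flags & 2 == 2 => 0x1000 | 20, // channel_disabled
        _ => 0x1000 | 7, // temporary_channel_failure
    }
}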
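
`DISABLE_GOSSIP_TICKS` and `ENABLE_GOSSIP_TICKS` turn the previous two-tick disable/enable staging in `timer_tick_occurred` into real hysteresis: a disabling `channel_update` is only gossiped after the peer has stayed disconnected for 10 consecutive ticks, and a re-enabling one after 5 connected ticks, so short reconnect cycles no longer spray disable/enable churn across the gossip network. The counter rides inside the `ChannelUpdateStatus::DisabledStaged(u8)` / `EnabledStaged(u8)` variants. The state machine in isolation (stand-in types; `tick` is an illustrative helper, not LDK API):

const DISABLE_GOSSIP_TICKS: u8 = 10;
const ENABLE_GOSSIP_TICKS: u8 = 5;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum UpdateStatus {
    Enabled,
    EnabledStaged(u8),  // consecutive ticks the peer has been connected
    Disabled,
    DisabledStaged(u8), // consecutive ticks the peer has been disconnected
}

// Advance one timer tick; `true` means "broadcast a fresh channel_update".
fn tick(status: &mut UpdateStatus, is_live: bool) -> bool {
    use UpdateStatus::*;
    let (next, broadcast) = match (*status, is_live) {
        (Enabled, false) => (DisabledStaged(0), false),
        (Disabled, true) => (EnabledStaged(0), false),
        // Reverting to the currently-advertised state broadcasts nothing.
        (DisabledStaged(_), true) => (Enabled, false),
        (EnabledStaged(_), false) => (Disabled, false),
        (DisabledStaged(n), false) if n + 1 >= DISABLE_GOSSIP_TICKS => (Disabled, true),
        (DisabledStaged(n), false) => (DisabledStaged(n + 1), false),
        (EnabledStaged(n), true) if n + 1 >= ENABLE_GOSSIP_TICKS => (Enabled, true),
        (EnabledStaged(n), true) => (EnabledStaged(n + 1), false),
        (s, _) => (s, false),
    };
    *status = next;
    broadcast
}

Note that `DisabledStaged -> Enabled` on reconnect broadcasts nothing, matching the diff: gossip still advertises the channel as enabled, so there is nothing to correct.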
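
On the receive side, the parsed recipient-facing onion data (`payment_secret`, plus the newly-plumbed `payment_metadata`) is now kept as `RecipientOnionFields` on each `ClaimablePayment` and handed to the user in `Event::PaymentClaimable` through the new `onion_fields` field. For a multi-part payment, the first HTLC seeds the stored fields and every later part must agree; that is what the `check_merge` call enforces, and a disagreeing part is failed back via `fail_htlc!` before it can join the payment. The shape of that check, with a stand-in type (LDK's real `check_merge` lives on `RecipientOnionFields` and may reconcile fields rather than demand strict equality):

#[derive(Clone, PartialEq, Eq)]
struct RecipientOnionFields {
    payment_secret: Option<[u8; 32]>,
    payment_metadata: Option<Vec<u8>>,
}

struct ClaimablePayment {
    onion_fields: Option<RecipientOnionFields>,
    // purpose and htlcs elided
}

impl ClaimablePayment {
    // Err(()) means the new part disagrees with what earlier parts committed
    // to; the caller then fails the incoming HTLC back instead of adding it.
    fn merge_onion_fields(&mut self, new: RecipientOnionFields) -> Result<(), ()> {
        if let Some(earlier) = &self.onion_fields {
            if *earlier == new { Ok(()) } else { Err(()) }
        } else {
            self.onion_fields = Some(new);
            Ok(())
        }
    }
}

When persisted, these per-payment fields ride in the new TLV entry `(13, htlc_onion_fields, optional_vec)`; the read path checks that, when present, the vector length matches the claimable-payment list, and fills `onion_fields: None` for data written by older versions.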