diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index e97d6a5e..0cadbd41 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -77,7 +77,7 @@ use core::time::Duration;
 use core::ops::Deref;
 
 // Re-export this for use in the public API.
-pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
+pub use crate::ln::outbound_payment::{PaymentSendFailure, ProbeSendFailure, Retry, RetryableSendFailure, RecipientOnionFields};
 use crate::ln::script::ShutdownScript;
 
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
@@ -177,7 +177,7 @@ pub(super) enum HTLCForwardInfo {
 }
 
 /// Tracks the inbound corresponding to an outbound HTLC
-#[derive(Clone, Hash, PartialEq, Eq)]
+#[derive(Clone, Debug, Hash, PartialEq, Eq)]
 pub(crate) struct HTLCPreviousHopData {
 	// Note that this may be an outbound SCID alias for the associated channel.
 	short_channel_id: u64,
@@ -233,7 +233,8 @@ impl From<&ClaimableHTLC> for events::ClaimedHTLC {
 	}
 }
 
-/// A payment identifier used to uniquely identify a payment to LDK.
+/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+/// a payment and ensure idempotency in LDK.
 ///
 /// This is not exported to bindings users as we just use [u8; 32] directly
 #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
@@ -282,7 +283,7 @@ impl Readable for InterceptId {
 	}
 }
 
-#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 /// Uniquely describes an HTLC by its source. Just the guaranteed-unique subset of [`HTLCSource`].
 pub(crate) enum SentHTLCId {
 	PreviousHopData { short_channel_id: u64, htlc_id: u64 },
@@ -313,7 +314,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId,
 /// Tracks the inbound corresponding to an outbound HTLC
 #[allow(clippy::derive_hash_xor_eq)] // Our Hash is faithful to the data, we just don't have SecretKey::hash
-#[derive(Clone, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub(crate) enum HTLCSource {
 	PreviousHopData(HTLCPreviousHopData),
 	OutboundRoute {
@@ -494,6 +495,10 @@ impl MsgHandleErrInternal {
 			channel_capacity: None,
 		}
 	}
+
+	fn closes_channel(&self) -> bool {
+		self.chan_id.is_some()
+	}
 }
 
 /// We hold back HTLCs we intend to relay for a random interval greater than this (see
@@ -655,7 +660,6 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
 }
 
 impl RAAMonitorUpdateBlockingAction {
-	#[allow(unused)]
 	fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
 		Self::ForwardedPaymentInboundClaim {
 			channel_id: prev_hop.outpoint.to_channel_id(),
@@ -835,33 +839,46 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> =
 		&'g L
 	>;
 
-macro_rules! define_test_pub_trait { ($vis: vis) => {
-/// A trivial trait which describes any [`ChannelManager`] used in testing.
-$vis trait AChannelManager {
+/// A trivial trait which describes any [`ChannelManager`].
+pub trait AChannelManager {
+	/// A type implementing [`chain::Watch`].
 	type Watch: chain::Watch<Self::Signer> + ?Sized;
+	/// A type that may be dereferenced to [`Self::Watch`].
 	type M: Deref<Target = Self::Watch>;
+	/// A type implementing [`BroadcasterInterface`].
 	type Broadcaster: BroadcasterInterface + ?Sized;
+	/// A type that may be dereferenced to [`Self::Broadcaster`].
 	type T: Deref<Target = Self::Broadcaster>;
+	/// A type implementing [`EntropySource`].
 	type EntropySource: EntropySource + ?Sized;
+	/// A type that may be dereferenced to [`Self::EntropySource`].
 	type ES: Deref<Target = Self::EntropySource>;
+	/// A type implementing [`NodeSigner`].
 	type NodeSigner: NodeSigner + ?Sized;
+	/// A type that may be dereferenced to [`Self::NodeSigner`].
 	type NS: Deref<Target = Self::NodeSigner>;
+	/// A type implementing [`WriteableEcdsaChannelSigner`].
 	type Signer: WriteableEcdsaChannelSigner + Sized;
+	/// A type implementing [`SignerProvider`] for [`Self::Signer`].
 	type SignerProvider: SignerProvider<Signer = Self::Signer> + ?Sized;
+	/// A type that may be dereferenced to [`Self::SignerProvider`].
 	type SP: Deref<Target = Self::SignerProvider>;
+	/// A type implementing [`FeeEstimator`].
 	type FeeEstimator: FeeEstimator + ?Sized;
+	/// A type that may be dereferenced to [`Self::FeeEstimator`].
 	type F: Deref<Target = Self::FeeEstimator>;
+	/// A type implementing [`Router`].
 	type Router: Router + ?Sized;
+	/// A type that may be dereferenced to [`Self::Router`].
 	type R: Deref<Target = Self::Router>;
+	/// A type implementing [`Logger`].
 	type Logger: Logger + ?Sized;
+	/// A type that may be dereferenced to [`Self::Logger`].
 	type L: Deref<Target = Self::Logger>;
+	/// Returns a reference to the actual [`ChannelManager`] object.
 	fn get_cm(&self) -> &ChannelManager<Self::M, Self::T, Self::ES, Self::NS, Self::SP, Self::F, Self::R, Self::L>;
 }
-} }
-#[cfg(any(test, feature = "_test_utils"))]
-define_test_pub_trait!(pub);
-#[cfg(not(any(test, feature = "_test_utils")))]
-define_test_pub_trait!(pub(crate));
+
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> AChannelManager
 for ChannelManager<M, T, ES, NS, SP, F, R, L>
 where
@@ -1229,7 +1246,7 @@ enum NotifyOption {
 /// We allow callers to either always notify by constructing with `notify_on_drop` or choose to
 /// notify or not based on whether relevant changes have been made, providing a closure to
 /// `optionally_notify` which returns a `NotifyOption`.
-struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
+struct PersistenceNotifierGuard<'a, F: FnMut() -> NotifyOption> {
 	event_persist_notifier: &'a Notifier,
 	needs_persist_flag: &'a AtomicBool,
 	should_persist: F,
@@ -1238,12 +1255,18 @@ struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> {
 }
 
 impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused
-	fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+	/// Notifies any waiters and indicates that we need to persist, in addition to possibly having
+	/// events to handle.
+	///
+	/// This must always be called if the changes included a `ChannelMonitorUpdate`, as well as in
+	/// other cases where losing the changes on restart may result in a force-close or otherwise
+	/// isn't ideal.
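+	///
+	/// A usage sketch (illustrative, not part of the diff; `cm` stands in for any type
+	/// implementing [`AChannelManager`]):
+	///
+	/// ```ignore
+	/// let _guard = PersistenceNotifierGuard::notify_on_drop(cm);
+	/// // ... mutate channel state, queue a `ChannelMonitorUpdate` ...
+	/// // When `_guard` drops, the persist flag is set and any waiters are notified.
+	/// ```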
+	fn notify_on_drop<C: AChannelManager>(cm: &'a C) -> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
 		Self::optionally_notify(cm, || -> NotifyOption { NotifyOption::DoPersist })
 	}
 
-	fn optionally_notify<F: Fn() -> NotifyOption, C: AChannelManager>(cm: &'a C, persist_check: F)
-	-> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> {
+	fn optionally_notify<F: FnMut() -> NotifyOption, C: AChannelManager>(cm: &'a C, mut persist_check: F)
+	-> PersistenceNotifierGuard<'a, impl FnMut() -> NotifyOption> {
 		let read_guard = cm.get_cm().total_consistency_lock.read().unwrap();
 		let force_notify = cm.get_cm().process_background_events();
 
@@ -1282,7 +1305,7 @@ impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care w
 	}
 }
 
-impl<'a, F: Fn() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
+impl<'a, F: FnMut() -> NotifyOption> Drop for PersistenceNotifierGuard<'a, F> {
 	fn drop(&mut self) {
 		match (self.should_persist)() {
 			NotifyOption::DoPersist => {
@@ -1697,11 +1720,15 @@ pub enum ChannelShutdownState {
 pub enum RecentPaymentDetails {
 	/// When an invoice was requested and thus a payment has not yet been sent.
 	AwaitingInvoice {
-		/// Identifier for the payment to ensure idempotency.
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
 		payment_id: PaymentId,
 	},
 	/// When a payment is still being sent and awaiting successful delivery.
 	Pending {
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
+		payment_id: PaymentId,
 		/// Hash of the payment that is currently being sent but has yet to be fulfilled or
 		/// abandoned.
 		payment_hash: PaymentHash,
@@ -1713,6 +1740,9 @@ pub enum RecentPaymentDetails {
 	/// been resolved. Upon receiving [`Event::PaymentSent`], we delay for a few minutes before the
 	/// payment is removed from tracking.
 	Fulfilled {
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
+		payment_id: PaymentId,
 		/// Hash of the payment that was claimed. `None` for serializations of [`ChannelManager`]
 		/// made before LDK version 0.0.104.
 		payment_hash: Option<PaymentHash>,
@@ -1721,6 +1751,9 @@ pub enum RecentPaymentDetails {
 	/// abandoned via [`ChannelManager::abandon_payment`], it is marked as abandoned until all
 	/// pending HTLCs for this payment resolve and an [`Event::PaymentFailed`] is generated.
 	Abandoned {
+		/// A user-provided identifier in [`ChannelManager::send_payment`] used to uniquely identify
+		/// a payment and ensure idempotency in LDK.
+		payment_id: PaymentId,
 		/// Hash of the payment that we have given up trying to send.
 		payment_hash: PaymentHash,
 	},
@@ -2152,9 +2185,14 @@ macro_rules! process_events_body {
 					processed_all_events = false;
 				}
 
-				if result == NotifyOption::DoPersist {
-					$self.needs_persist_flag.store(true, Ordering::Release);
-					$self.event_persist_notifier.notify();
+				match result {
+					NotifyOption::DoPersist => {
+						$self.needs_persist_flag.store(true, Ordering::Release);
+						$self.event_persist_notifier.notify();
+					},
+					NotifyOption::SkipPersistHandleEvents =>
+						$self.event_persist_notifier.notify(),
+					NotifyOption::SkipPersistNoEvents => {},
 				}
 			}
 		}
@@ -2460,15 +2498,16 @@ where
 				},
 				PendingOutboundPayment::Retryable { payment_hash, total_msat, ..
} => { Some(RecentPaymentDetails::Pending { + payment_id: *payment_id, payment_hash: *payment_hash, total_msat: *total_msat, }) }, PendingOutboundPayment::Abandoned { payment_hash, .. } => { - Some(RecentPaymentDetails::Abandoned { payment_hash: *payment_hash }) + Some(RecentPaymentDetails::Abandoned { payment_id: *payment_id, payment_hash: *payment_hash }) }, PendingOutboundPayment::Fulfilled { payment_hash, .. } => { - Some(RecentPaymentDetails::Fulfilled { payment_hash: *payment_hash }) + Some(RecentPaymentDetails::Fulfilled { payment_id: *payment_id, payment_hash: *payment_hash }) }, PendingOutboundPayment::Legacy { .. } => None }) @@ -2769,7 +2808,7 @@ where let (short_channel_id, amt_to_forward, outgoing_cltv_value) = match hop_data { msgs::InboundOnionPayload::Forward { short_channel_id, amt_to_forward, outgoing_cltv_value } => (short_channel_id, amt_to_forward, outgoing_cltv_value), - msgs::InboundOnionPayload::Receive { .. } => + msgs::InboundOnionPayload::Receive { .. } | msgs::InboundOnionPayload::BlindedReceive { .. } => return Err(InboundOnionErr { msg: "Final Node OnionHopData provided for us as an intermediary node", err_code: 0x4000 | 22, @@ -2801,12 +2840,19 @@ where payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata, .. } => (payment_data, keysend_preimage, custom_tlvs, amt_msat, outgoing_cltv_value, payment_metadata), - _ => + msgs::InboundOnionPayload::BlindedReceive { + amt_msat, total_msat, outgoing_cltv_value, payment_secret, .. + } => { + let payment_data = msgs::FinalOnionHopData { payment_secret, total_msat }; + (Some(payment_data), None, Vec::new(), amt_msat, outgoing_cltv_value, None) + } + msgs::InboundOnionPayload::Forward { .. } => { return Err(InboundOnionErr { err_code: 0x4000|22, err_data: Vec::new(), msg: "Got non final data with an HMAC of 0", - }), + }) + }, }; // final_incorrect_cltv_expiry if outgoing_cltv_value > cltv_expiry { @@ -2946,7 +2992,10 @@ where } } - let next_hop = match onion_utils::decode_next_payment_hop(shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, msg.payment_hash) { + let next_hop = match onion_utils::decode_next_payment_hop( + shared_secret, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac, + msg.payment_hash, &self.node_signer + ) { Ok(res) => res, Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => { return_malformed_err!(err_msg, err_code); @@ -2968,7 +3017,9 @@ where // We'll do receive checks in [`Self::construct_pending_htlc_info`] so we have access to the // inbound channel's state. onion_utils::Hop::Receive { .. } => return Ok((next_hop, shared_secret, None)), - onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } => { + onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::Receive { .. }, .. } | + onion_utils::Hop::Forward { next_hop_data: msgs::InboundOnionPayload::BlindedReceive { .. }, .. } => + { return_err!("Final Node OnionHopData provided for us as an intermediary node", 0x4000 | 22, &[0; 0]); } }; @@ -3508,6 +3559,116 @@ where outbound_payment::payment_is_probe(payment_hash, payment_id, self.probing_cookie_secret) } + /// Sends payment probes over all paths of a route that would be used to pay the given + /// amount to the given `node_id`. + /// + /// See [`ChannelManager::send_preflight_probes`] for more information. 
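+	///
+	/// A sketch of the intended call (illustrative only; `channel_manager` and `node_id` are
+	/// assumed to be in scope, and `144` is just an example CLTV expiry delta):
+	///
+	/// ```ignore
+	/// // Probe the paths that would carry a 50_000 msat spontaneous payment, keeping the
+	/// // default (3x) liquidity limit multiplier.
+	/// let probes = channel_manager
+	///     .send_spontaneous_preflight_probes(node_id, 50_000, 144, None)?;
+	/// ```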
+	pub fn send_spontaneous_preflight_probes(
+		&self, node_id: PublicKey, amount_msat: u64, final_cltv_expiry_delta: u32,
+		liquidity_limit_multiplier: Option<u64>,
+	) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+		let payment_params =
+			PaymentParameters::from_node_id(node_id, final_cltv_expiry_delta);
+
+		let route_params = RouteParameters { payment_params, final_value_msat: amount_msat };
+
+		self.send_preflight_probes(route_params, liquidity_limit_multiplier)
+	}
+
+	/// Sends payment probes over all paths of a route that would be used to pay a route found
+	/// according to the given [`RouteParameters`].
+	///
+	/// This may be used to send "pre-flight" probes, i.e., to train our scorer before conducting
+	/// the actual payment. Note this is only useful if there likely is sufficient time for the
+	/// probe to settle before sending out the actual payment, e.g., when waiting for user
+	/// confirmation in a wallet UI.
+	///
+	/// Otherwise, there is a chance the probe could take up some liquidity needed to complete the
+	/// actual payment. Users should therefore be cautious and might avoid sending probes if
+	/// liquidity is scarce and/or they don't expect the probe to return before they send the
+	/// payment. To mitigate this issue, channels with available liquidity less than the required
+	/// amount times the given `liquidity_limit_multiplier` won't be used to send pre-flight
+	/// probes. If `None` is given as `liquidity_limit_multiplier`, it defaults to `3`.
+	pub fn send_preflight_probes(
+		&self, route_params: RouteParameters, liquidity_limit_multiplier: Option<u64>,
+	) -> Result<Vec<(PaymentHash, PaymentId)>, ProbeSendFailure> {
+		let liquidity_limit_multiplier = liquidity_limit_multiplier.unwrap_or(3);
+
+		let payer = self.get_our_node_id();
+		let usable_channels = self.list_usable_channels();
+		let first_hops = usable_channels.iter().collect::<Vec<_>>();
+		let inflight_htlcs = self.compute_inflight_htlcs();
+
+		let route = self
+			.router
+			.find_route(&payer, &route_params, Some(&first_hops), inflight_htlcs)
+			.map_err(|e| {
+				log_error!(self.logger, "Failed to find path for payment probe: {:?}", e);
+				ProbeSendFailure::RouteNotFound
+			})?;
+
+		let mut used_liquidity_map = HashMap::with_capacity(first_hops.len());
+
+		let mut res = Vec::new();
+
+		for mut path in route.paths {
+			// If the last hop is probably an unannounced channel we refrain from probing all the
+			// way through to the end and instead probe up to the second-to-last channel.
+			while let Some(last_path_hop) = path.hops.last() {
+				if last_path_hop.maybe_announced_channel {
+					// We found a potentially announced last hop.
+					break;
+				} else {
+					// Drop the last hop, as it's likely unannounced.
+					log_debug!(
+						self.logger,
+						"Avoided sending payment probe all the way to last hop {} as it is likely unannounced.",
+						last_path_hop.short_channel_id
+					);
+					let final_value_msat = path.final_value_msat();
+					path.hops.pop();
+					if let Some(new_last) = path.hops.last_mut() {
+						new_last.fee_msat += final_value_msat;
+					}
+				}
+			}
+
+			if path.hops.len() < 2 {
+				log_debug!(
+					self.logger,
+					"Skipped sending payment probe over path with less than two hops."
+ ); + continue; + } + + if let Some(first_path_hop) = path.hops.first() { + if let Some(first_hop) = first_hops.iter().find(|h| { + h.get_outbound_payment_scid() == Some(first_path_hop.short_channel_id) + }) { + let path_value = path.final_value_msat() + path.fee_msat(); + let used_liquidity = + used_liquidity_map.entry(first_path_hop.short_channel_id).or_insert(0); + + if first_hop.next_outbound_htlc_limit_msat + < (*used_liquidity + path_value) * liquidity_limit_multiplier + { + log_debug!(self.logger, "Skipped sending payment probe to avoid putting channel {} under the liquidity limit.", first_path_hop.short_channel_id); + continue; + } else { + *used_liquidity += path_value; + } + } + } + + res.push(self.send_probe(path).map_err(|e| { + log_error!(self.logger, "Failed to send pre-flight probe: {:?}", e); + ProbeSendFailure::SendingFailed(e) + })?); + } + + Ok(res) + } + /// Handles the generation of a funding transaction, optionally (for tests) with a function /// which checks the correctness of the funding transaction given the associated channel. fn funding_transaction_generated_intern, &Transaction) -> Result>( @@ -3955,7 +4116,10 @@ where let phantom_pubkey_res = self.node_signer.get_node_id(Recipient::PhantomNode); if phantom_pubkey_res.is_ok() && fake_scid::is_valid_phantom(&self.fake_scid_rand_bytes, short_chan_id, &self.genesis_hash) { let phantom_shared_secret = self.node_signer.ecdh(Recipient::PhantomNode, &onion_packet.public_key.unwrap(), None).unwrap().secret_bytes(); - let next_hop = match onion_utils::decode_next_payment_hop(phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, payment_hash) { + let next_hop = match onion_utils::decode_next_payment_hop( + phantom_shared_secret, &onion_packet.hop_data, onion_packet.hmac, + payment_hash, &self.node_signer + ) { Ok(res) => res, Err(onion_utils::OnionDecodeErr::Malformed { err_msg, err_code }) => { let sha256_of_onion = Sha256::hash(&onion_packet.hop_data).into_inner(); @@ -5194,11 +5358,17 @@ where self.pending_outbound_payments.finalize_claims(sources, &self.pending_events); } - fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option, from_onchain: bool, next_channel_outpoint: OutPoint) { + fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, + forwarded_htlc_value_msat: Option, from_onchain: bool, + next_channel_counterparty_node_id: Option, next_channel_outpoint: OutPoint + ) { match source { HTLCSource::OutboundRoute { session_priv, payment_id, path, .. 
} => { debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire), "We don't support claim_htlc claims during startup - monitors may not be available yet"); + if let Some(pubkey) = next_channel_counterparty_node_id { + debug_assert_eq!(pubkey, path.hops[0].pubkey); + } let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate { channel_funding_outpoint: next_channel_outpoint, counterparty_node_id: path.hops[0].pubkey, @@ -5209,6 +5379,7 @@ where }, HTLCSource::PreviousHopData(hop_data) => { let prev_outpoint = hop_data.outpoint; + let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data); let res = self.claim_funds_from_hop(hop_data, payment_preimage, |htlc_claim_value_msat| { if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { @@ -5224,7 +5395,17 @@ where next_channel_id: Some(next_channel_outpoint.to_channel_id()), outbound_amount_forwarded_msat: forwarded_htlc_value_msat, }, - downstream_counterparty_and_funding_outpoint: None, + downstream_counterparty_and_funding_outpoint: + if let Some(node_id) = next_channel_counterparty_node_id { + Some((node_id, next_channel_outpoint, completed_blocker)) + } else { + // We can only get `None` here if we are processing a + // `ChannelMonitor`-originated event, in which case we + // don't care about ensuring we wake the downstream + // channel's monitor updating - the channel is already + // closed. + None + }, }) } else { None } }); @@ -5560,6 +5741,8 @@ where } fn internal_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! if msg.chain_hash != self.genesis_hash { return Err(MsgHandleErrInternal::send_err_msg_no_close("Unknown genesis block hash".to_owned(), msg.temporary_channel_id.clone())); } @@ -5659,6 +5842,8 @@ where } fn internal_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this, so any changes are + // likely to be lost on restart! let (value, output_script, user_id) = { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -5819,6 +6004,8 @@ where } fn internal_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -5997,6 +6184,9 @@ where //encrypted with the same key. It's not immediately obvious how to usefully exploit that, //but we should prevent it anyway. + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! 
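+		// (Accordingly, `handle_update_add_htlc` below maps an `Ok(())` result from this method
+		// to `NotifyOption::SkipPersistNoEvents`.)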
+ let decoded_hop_res = self.decode_update_add_htlc_onion(msg); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -6063,6 +6253,17 @@ where hash_map::Entry::Occupied(mut chan_phase_entry) => { if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() { let res = try_chan_phase_entry!(self, chan.update_fulfill_htlc(&msg), chan_phase_entry); + if let HTLCSource::PreviousHopData(prev_hop) = &res.0 { + peer_state.actions_blocking_raa_monitor_updates.entry(msg.channel_id) + .or_insert_with(Vec::new) + .push(RAAMonitorUpdateBlockingAction::from_prev_hop_data(&prev_hop)); + } + // Note that we do not need to push an `actions_blocking_raa_monitor_updates` + // entry here, even though we *do* need to block the next RAA monitor update. + // We do this instead in the `claim_funds_internal` by attaching a + // `ReleaseRAAChannelMonitorUpdate` action to the event generated when the + // outbound HTLC is claimed. This is guaranteed to all complete before we + // process the RAA as messages are processed from single peers serially. funding_txo = chan.context.get_funding_txo().expect("We won't accept a fulfill until funded"); res } else { @@ -6073,11 +6274,13 @@ where hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } }; - self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo); + self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, Some(*counterparty_node_id), funding_txo); Ok(()) } fn internal_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -6101,6 +6304,8 @@ where } fn internal_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) -> Result<(), MsgHandleErrInternal> { + // Note that the ChannelManager is NOT re-persisted on disk after this (unless we error + // closing a channel), so any changes are likely to be lost on restart! 
 		let per_peer_state = self.per_peer_state.read().unwrap();
 		let peer_state_mutex = per_peer_state.get(counterparty_node_id)
 			.ok_or_else(|| {
@@ -6275,6 +6480,23 @@ where
 		})
 	}
 
+	#[cfg(any(test, feature = "_test_utils"))]
+	pub(crate) fn test_raa_monitor_updates_held(&self,
+		counterparty_node_id: PublicKey, channel_id: ChannelId
+	) -> bool {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+			let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+			let peer_state = &mut *peer_state_lck;
+
+			if let Some(chan) = peer_state.channel_by_id.get(&channel_id) {
+				return self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
+					chan.context().get_funding_txo().unwrap(), counterparty_node_id);
+			}
+		}
+		false
+	}
+
 	fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
 		let (htlcs_to_fail, res) = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6418,7 +6640,7 @@ where
 		Ok(NotifyOption::DoPersist)
 	}
 
-	fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<(), MsgHandleErrInternal> {
+	fn internal_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) -> Result<NotifyOption, MsgHandleErrInternal> {
 		let htlc_forwards;
 		let need_lnd_workaround = {
 			let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6474,14 +6696,16 @@ where
 			}
 		};
 
+		let mut persist = NotifyOption::SkipPersistHandleEvents;
 		if let Some(forwards) = htlc_forwards {
 			self.forward_htlcs(&mut [forwards][..]);
+			persist = NotifyOption::DoPersist;
 		}
 
 		if let Some(channel_ready_msg) = need_lnd_workaround {
 			self.internal_channel_ready(counterparty_node_id, &channel_ready_msg)?;
 		}
-		Ok(())
+		Ok(persist)
 	}
 
 	/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
@@ -6496,8 +6720,8 @@ where
 				match monitor_event {
 					MonitorEvent::HTLCEvent(htlc_update) => {
 						if let Some(preimage) = htlc_update.payment_preimage {
-							log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", &preimage);
-							self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint);
+							log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", preimage);
+							self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, counterparty_node_id, funding_outpoint);
 						} else {
 							log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", &htlc_update.payment_hash);
 							let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
@@ -7476,8 +7700,21 @@ where
 	L::Target: Logger,
 {
 	fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) {
-		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
-		let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id);
+		// Note that we never need to persist the updated ChannelManager for an inbound
+		// open_channel message - pre-funded channels are never written so there should be no
+		// change to the contents.
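+		// A channel-closing error is the only case that would require a persist here, and we
+		// never expect one for a brand-new inbound channel - hence the `debug_assert!` below.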
+ let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_open_channel(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => { + debug_assert!(false, "We shouldn't close a new channel"); + NotifyOption::DoPersist + }, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { @@ -7487,8 +7724,13 @@ where } fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // accept_channel message - pre-funded channels are never written so there should be no + // change to the contents. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); + NotifyOption::SkipPersistHandleEvents + }); } fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { @@ -7508,8 +7750,19 @@ where } fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // channel_ready message - while the channel's state will change, any channel_ready message + // will ultimately be re-sent on startup and the `ChannelMonitor` won't be updated so we + // will not force-close the channel on startup. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_ready(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + _ => NotifyOption::SkipPersistHandleEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { @@ -7523,8 +7776,19 @@ where } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_add_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. 
+ let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_add_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { @@ -7533,13 +7797,35 @@ where } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_htlc message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fail_malformed_htlc message - the message itself doesn't change our channel state + // only the `commitment_signed` message afterwards will. + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fail_malformed_htlc(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { @@ -7553,8 +7839,19 @@ where } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); + // Note that we never need to persist the updated ChannelManager for an inbound + // update_fee message - the message itself doesn't change our channel state only the + // `commitment_signed` message afterwards will. 
+ let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_update_fee(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(()) => NotifyOption::SkipPersistNoEvents, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { @@ -7567,18 +7864,28 @@ where if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { persist } else { - NotifyOption::SkipPersistNoEvents + NotifyOption::DoPersist } }); } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || { + let res = self.internal_channel_reestablish(counterparty_node_id, msg); + let persist = match &res { + Err(e) if e.closes_channel() => NotifyOption::DoPersist, + Err(_) => NotifyOption::SkipPersistHandleEvents, + Ok(persist) => *persist, + }; + let _ = handle_error!(self, res, *counterparty_node_id); + persist + }); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify( + self, || NotifyOption::SkipPersistHandleEvents); + let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { @@ -7677,76 +7984,82 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + let mut res = Ok(()); - // If we have too many peers connected which don't have funded channels, disconnect the - // peer immediately (as long as it doesn't have funded channels). If we have a bunch of - // unfunded channels taking up space in memory for disconnected peers, we still let new - // peers connect, but we'll reject new channels from them. - let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); - let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; + PersistenceNotifierGuard::optionally_notify(self, || { + // If we have too many peers connected which don't have funded channels, disconnect the + // peer immediately (as long as it doesn't have funded channels). If we have a bunch of + // unfunded channels taking up space in memory for disconnected peers, we still let new + // peers connect, but we'll reject new channels from them. 
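+			// We can't early-return from inside this closure, so failed checks below set `res`
+			// and bail out with `NotifyOption::SkipPersistNoEvents` instead.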
+ let connected_peers_without_funded_channels = self.peers_without_funded_channels(|node| node.is_connected); + let inbound_peer_limited = inbound && connected_peers_without_funded_channels >= MAX_NO_CHANNEL_PEERS; - { - let mut peer_state_lock = self.per_peer_state.write().unwrap(); - match peer_state_lock.entry(counterparty_node_id.clone()) { - hash_map::Entry::Vacant(e) => { - if inbound_peer_limited { - return Err(()); - } - e.insert(Mutex::new(PeerState { - channel_by_id: HashMap::new(), - inbound_channel_request_by_id: HashMap::new(), - latest_features: init_msg.features.clone(), - pending_msg_events: Vec::new(), - in_flight_monitor_updates: BTreeMap::new(), - monitor_update_blocked_actions: BTreeMap::new(), - actions_blocking_raa_monitor_updates: BTreeMap::new(), - is_connected: true, - })); - }, - hash_map::Entry::Occupied(e) => { - let mut peer_state = e.get().lock().unwrap(); - peer_state.latest_features = init_msg.features.clone(); - - let best_block_height = self.best_block.read().unwrap().height(); - if inbound_peer_limited && - Self::unfunded_channel_count(&*peer_state, best_block_height) == - peer_state.channel_by_id.len() - { - return Err(()); - } + { + let mut peer_state_lock = self.per_peer_state.write().unwrap(); + match peer_state_lock.entry(counterparty_node_id.clone()) { + hash_map::Entry::Vacant(e) => { + if inbound_peer_limited { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } + e.insert(Mutex::new(PeerState { + channel_by_id: HashMap::new(), + inbound_channel_request_by_id: HashMap::new(), + latest_features: init_msg.features.clone(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: true, + })); + }, + hash_map::Entry::Occupied(e) => { + let mut peer_state = e.get().lock().unwrap(); + peer_state.latest_features = init_msg.features.clone(); + + let best_block_height = self.best_block.read().unwrap().height(); + if inbound_peer_limited && + Self::unfunded_channel_count(&*peer_state, best_block_height) == + peer_state.channel_by_id.len() + { + res = Err(()); + return NotifyOption::SkipPersistNoEvents; + } - debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); - peer_state.is_connected = true; - }, + debug_assert!(!peer_state.is_connected, "A peer shouldn't be connected twice"); + peer_state.is_connected = true; + }, + } } - } - log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); + log_debug!(self.logger, "Generating channel_reestablish events for {}", log_pubkey!(counterparty_node_id)); - let per_peer_state = self.per_peer_state.read().unwrap(); - if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { - let mut peer_state_lock = peer_state_mutex.lock().unwrap(); - let peer_state = &mut *peer_state_lock; - let pending_msg_events = &mut peer_state.pending_msg_events; + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; - peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| - if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { - // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted - // (so 
won't be recovered after a crash), they shouldn't exist here and we would never need to - // worry about closing and removing them. - debug_assert!(false); - None - } - ).for_each(|chan| { - pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { - node_id: chan.context.get_counterparty_node_id(), - msg: chan.get_channel_reestablish(&self.logger), + peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)| + if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { + // Since unfunded channel maps are cleared upon disconnecting a peer, and they're not persisted + // (so won't be recovered after a crash), they shouldn't exist here and we would never need to + // worry about closing and removing them. + debug_assert!(false); + None + } + ).for_each(|chan| { + pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish { + node_id: chan.context.get_counterparty_node_id(), + msg: chan.get_channel_reestablish(&self.logger), + }); }); - }); - } - //TODO: Also re-broadcast announcement_signatures - Ok(()) + } + + return NotifyOption::SkipPersistHandleEvents; + //TODO: Also re-broadcast announcement_signatures + }); + res } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { @@ -9328,6 +9641,7 @@ where // downstream chan is closed (because we don't have a // channel_id -> peer map entry). counterparty_opt.is_none(), + counterparty_opt.cloned().or(monitor.get_counterparty_node_id()), monitor.get_funding_txo().0)) } else { None } } else { @@ -9608,12 +9922,12 @@ where channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } - for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay { + for (source, preimage, downstream_value, downstream_closed, downstream_node_id, downstream_funding) in pending_claims_to_replay { // We use `downstream_closed` in place of `from_onchain` here just as a guess - we // don't remember in the `ChannelMonitor` where we got a preimage from, but if the // channel is closed we just assume that it probably came from an on-chain claim. channel_manager.claim_funds_internal(source, preimage, Some(downstream_value), - downstream_closed, downstream_funding); + downstream_closed, downstream_node_id, downstream_funding); } //TODO: Broadcast channel update for closed channels, but only after we've made a @@ -9868,7 +10182,7 @@ mod tests { // To start (1), send a regular payment but don't claim it. let expected_route = [&nodes[1]]; - let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &expected_route, 100_000); + let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &expected_route, 100_000); // Next, attempt a keysend payment and make sure it fails. let route_params = RouteParameters::from_payment_params_and_value(