X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=f6cb81376e2490a205127760d5d6f14f0798bf0c;hb=6aca7e1c4db17f43b79504fd44b942b4bc08db9d;hp=9f403f85b2f0aec6b841961fa47fd7e6bce424e7;hpb=15050895fd409ff8b01092dc2db30d8222063631;p=rust-lightning

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 9f403f85..f6cb8137 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -45,8 +45,8 @@ use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, No
 #[cfg(any(feature = "_test_utils", test))]
 use crate::ln::features::InvoiceFeatures;
 use crate::routing::gossip::NetworkGraph;
-use crate::routing::router::{DefaultRouter, InFlightHtlcs, PaymentParameters, Route, RouteHop, RouteParameters, RoutePath, Router};
-use crate::routing::scoring::ProbabilisticScorer;
+use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, Payee, PaymentParameters, Route, RouteHop, RouteParameters, Router};
+use crate::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
 use crate::ln::msgs;
 use crate::ln::onion_utils;
 use crate::ln::onion_utils::HTLCFailReason;
@@ -55,7 +55,7 @@ use crate::ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, MAX_VA
 use crate::ln::outbound_payment;
 use crate::ln::outbound_payment::{OutboundPayments, PaymentAttempts, PendingOutboundPayment};
 use crate::ln::wire::Encode;
-use crate::chain::keysinterface::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner};
+use crate::sign::{EntropySource, KeysManager, NodeSigner, Recipient, SignerProvider, ChannelSigner, WriteableEcdsaChannelSigner};
 use crate::util::config::{UserConfig, ChannelConfig};
 use crate::util::wakers::{Future, Notifier};
 use crate::util::scid_utils::fake_scid;
@@ -72,12 +72,13 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;

 // Re-export this for use in the public API.
pub use crate::ln::outbound_payment::{PaymentSendFailure, Retry, RetryableSendFailure, RecipientOnionFields}; +use crate::ln::script::ShutdownScript; // We hold various information about HTLC relay in the HTLC objects in Channel itself: // @@ -106,11 +107,13 @@ pub(super) enum PendingHTLCRouting { }, Receive { payment_data: msgs::FinalOnionHopData, + payment_metadata: Option>, incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed phantom_shared_secret: Option<[u8; 32]>, }, ReceiveKeysend { payment_preimage: PaymentPreimage, + payment_metadata: Option>, incoming_cltv_expiry: u32, // Used to track when we should expire pending HTLCs that go unclaimed }, } @@ -280,7 +283,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId, pub(crate) enum HTLCSource { PreviousHopData(HTLCPreviousHopData), OutboundRoute { - path: Vec, + path: Path, session_priv: SecretKey, /// Technically we can recalculate this from the route, but we cache it here to avoid /// doing a double-pass on route when we get a failure back @@ -311,7 +314,7 @@ impl HTLCSource { #[cfg(test)] pub fn dummy() -> Self { HTLCSource::OutboundRoute { - path: Vec::new(), + path: Path { hops: Vec::new(), blinded_tail: None }, session_priv: SecretKey::from_slice(&[1; 32]).unwrap(), first_hop_htlc_msat: 0, payment_id: PaymentId([2; 32]), @@ -470,6 +473,12 @@ impl_writeable_tlv_based!(ClaimingPayment, { (4, receiver_node_id, required), }); +struct ClaimablePayment { + purpose: events::PaymentPurpose, + onion_fields: Option, + htlcs: Vec, +} + /// Information about claimable or being-claimed payments struct ClaimablePayments { /// Map from payment hash to the payment data and any HTLCs which are to us and can be @@ -480,7 +489,7 @@ struct ClaimablePayments { /// /// When adding to the map, [`Self::pending_claiming_payments`] must also be checked to ensure /// we don't get a duplicate payment. - claimable_htlcs: HashMap)>, + claimable_payments: HashMap, /// Map from payment hash to the payment data for HTLCs which we have begun claiming, but which /// are waiting on a [`ChannelMonitorUpdate`] to complete in order to be surfaced to the user @@ -492,9 +501,11 @@ struct ClaimablePayments { /// for some reason. They are handled in timer_tick_occurred, so may be processed with /// quite some time lag. enum BackgroundEvent { - /// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder - /// commitment transaction. - ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)), + /// Handle a ChannelMonitorUpdate + /// + /// Note that any such events are lost on shutdown, so in general they must be updates which + /// are regenerated on startup. + MonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), } #[derive(Debug)] @@ -513,6 +524,20 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (2, EmitEvent) => { (0, event, upgradable_required) }, ); +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum EventCompletionAction { + ReleaseRAAChannelMonitorUpdate { + counterparty_node_id: PublicKey, + channel_funding_outpoint: OutPoint, + }, +} +impl_writeable_tlv_based_enum!(EventCompletionAction, + (0, ReleaseRAAChannelMonitorUpdate) => { + (0, channel_funding_outpoint, required), + (2, counterparty_node_id, required), + }; +); + /// State we hold per-peer. pub(super) struct PeerState { /// `temporary_channel_id` or `channel_id` -> `channel`. 
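// A minimal sketch of the new `Path` type that `HTLCSource::OutboundRoute` (and
// `HTLCSource::dummy()`) above now carry in place of a bare `Vec<RouteHop>`: the
// unblinded hops are paired with an optional blinded tail. `wrap_hops` is a
// hypothetical helper, not LDK API, shown only to illustrate the struct's shape.
use crate::routing::router::{Path, RouteHop};

fn wrap_hops(hops: Vec<RouteHop>) -> Path {
	// Fully-unblinded payments leave `blinded_tail` unset; a blinded path would set
	// it to a `BlindedTail` describing the hops past the introduction point.
	Path { hops, blinded_tail: None }
}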
@@ -600,7 +625,9 @@ pub type SimpleArcChannelManager = ChannelManager< Arc>>, Arc, - Arc>>, Arc>>> + Arc>>, Arc>>>, + ProbabilisticScoringFeeParameters, + ProbabilisticScorer>>, Arc>, >>, Arc >; @@ -616,7 +643,7 @@ pub type SimpleArcChannelManager = ChannelManager< /// of [`KeysManager`] and [`DefaultRouter`]. /// /// This is not exported to bindings users as Arcs don't make sense in bindings -pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>>, &'g L>; +pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>, ProbabilisticScoringFeeParameters, ProbabilisticScorer<&'f NetworkGraph<&'g L>, &'g L>>, &'g L>; /// A trivial trait which describes any [`ChannelManager`] used in testing. #[cfg(any(test, feature = "_test_utils"))] @@ -924,8 +951,19 @@ where #[cfg(any(test, feature = "_test_utils"))] pub(super) per_peer_state: FairRwLock::Signer>>>>, + /// The set of events which we need to give to the user to handle. In some cases an event may + /// require some further action after the user handles it (currently only blocking a monitor + /// update from being handed to the user to ensure the included changes to the channel state + /// are handled by the user before they're persisted durably to disk). In that case, the second + /// element in the tuple is set to `Some` with further details of the action. + /// + /// Note that events MUST NOT be removed from pending_events after deserialization, as they + /// could be in the middle of being processed without the direct mutex held. + /// /// See `ChannelManager` struct-level documentation for lock order requirements. - pending_events: Mutex>, + pending_events: Mutex)>>, + /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously. + pending_events_processor: AtomicBool, /// See `ChannelManager` struct-level documentation for lock order requirements. pending_background_events: Mutex>, /// Used when we have to take a BIG lock to make sure everything is self-consistent. @@ -1070,6 +1108,14 @@ pub(crate) const MPP_TIMEOUT_TICKS: u8 = 3; /// [`OutboundPayments::remove_stale_resolved_payments`]. pub(crate) const IDEMPOTENCY_TIMEOUT_TICKS: u8 = 7; +/// The number of ticks of [`ChannelManager::timer_tick_occurred`] where a peer is disconnected +/// until we mark the channel disabled and gossip the update. +pub(crate) const DISABLE_GOSSIP_TICKS: u8 = 10; + +/// The number of ticks of [`ChannelManager::timer_tick_occurred`] where a peer is connected until +/// we mark the channel enabled and gossip the update. +pub(crate) const ENABLE_GOSSIP_TICKS: u8 = 5; + /// The maximum number of unfunded channels we can have per-peer before we start rejecting new /// (inbound) ones. The number of peers with unfunded channels is limited separately in /// [`MAX_UNFUNDED_CHANNEL_PEERS`]. @@ -1396,7 +1442,7 @@ pub enum RecentPaymentDetails { /// Route hints used in constructing invoices for [phantom node payents]. /// -/// [phantom node payments]: crate::chain::keysinterface::PhantomKeysManager +/// [phantom node payments]: crate::sign::PhantomKeysManager #[derive(Clone)] pub struct PhantomRouteHints { /// The list of channels to be included in the invoice route hints. 
@@ -1428,10 +1474,10 @@ macro_rules! handle_error { }); } if let Some((channel_id, user_channel_id)) = chan_id { - $self.pending_events.lock().unwrap().push(events::Event::ChannelClosed { + $self.pending_events.lock().unwrap().push_back((events::Event::ChannelClosed { channel_id, user_channel_id, reason: ClosureReason::ProcessingError { err: err.err.clone() } - }); + }, None)); } } @@ -1563,13 +1609,13 @@ macro_rules! send_channel_ready { macro_rules! emit_channel_pending_event { ($locked_events: expr, $channel: expr) => { if $channel.should_emit_channel_pending_event() { - $locked_events.push(events::Event::ChannelPending { + $locked_events.push_back((events::Event::ChannelPending { channel_id: $channel.channel_id(), former_temporary_channel_id: $channel.temporary_channel_id(), counterparty_node_id: $channel.get_counterparty_node_id(), user_channel_id: $channel.get_user_id(), funding_txo: $channel.get_funding_txo().unwrap().into_bitcoin_outpoint(), - }); + }, None)); $channel.set_channel_pending_event_emitted(); } } @@ -1579,12 +1625,12 @@ macro_rules! emit_channel_ready_event { ($locked_events: expr, $channel: expr) => { if $channel.should_emit_channel_ready_event() { debug_assert!($channel.channel_pending_event_emitted()); - $locked_events.push(events::Event::ChannelReady { + $locked_events.push_back((events::Event::ChannelReady { channel_id: $channel.channel_id(), user_channel_id: $channel.get_user_id(), counterparty_node_id: $channel.get_counterparty_node_id(), channel_type: $channel.get_channel_type().clone(), - }); + }, None)); $channel.set_channel_ready_event_emitted(); } } @@ -1662,11 +1708,8 @@ macro_rules! handle_new_monitor_update { res }, ChannelMonitorUpdateStatus::Completed => { - if ($update_id == 0 || $chan.get_next_monitor_update() - .expect("We can't be processing a monitor update if it isn't queued") - .update_id == $update_id) && - $chan.get_latest_monitor_update_id() == $update_id - { + $chan.complete_one_mon_update($update_id); + if $chan.no_monitor_updates_pending() { handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); } Ok(()) @@ -1680,30 +1723,58 @@ macro_rules! handle_new_monitor_update { macro_rules! process_events_body { ($self: expr, $event_to_handle: expr, $handle_event: expr) => { - // We'll acquire our total consistency lock until the returned future completes so that - // we can be sure no other persists happen while processing events. - let _read_guard = $self.total_consistency_lock.read().unwrap(); + let mut processed_all_events = false; + while !processed_all_events { + if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; + } - let mut result = NotifyOption::SkipPersist; + let mut result = NotifyOption::SkipPersist; - // TODO: This behavior should be documented. It's unintuitive that we query - // ChannelMonitors when clearing other events. - if $self.process_pending_monitor_events() { - result = NotifyOption::DoPersist; - } + { + // We'll acquire our total consistency lock so that we can be sure no other + // persists happen while processing monitor events. + let _read_guard = $self.total_consistency_lock.read().unwrap(); + + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. 
+ if $self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + } - let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]); - if !pending_events.is_empty() { - result = NotifyOption::DoPersist; - } + let pending_events = $self.pending_events.lock().unwrap().clone(); + let num_events = pending_events.len(); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } - for event in pending_events { - $event_to_handle = event; - $handle_event; - } + let mut post_event_actions = Vec::new(); + + for (event, action_opt) in pending_events { + $event_to_handle = event; + $handle_event; + if let Some(action) = action_opt { + post_event_actions.push(action); + } + } + + { + let mut pending_events = $self.pending_events.lock().unwrap(); + pending_events.drain(..num_events); + processed_all_events = pending_events.is_empty(); + $self.pending_events_processor.store(false, Ordering::Release); + } + + if !post_event_actions.is_empty() { + $self.handle_post_event_actions(post_event_actions); + // If we had some actions, go around again as we may have more events now + processed_all_events = false; + } - if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + if result == NotifyOption::DoPersist { + $self.persistence_notifier.notify(); + } } } } @@ -1753,7 +1824,7 @@ where pending_inbound_payments: Mutex::new(HashMap::new()), pending_outbound_payments: OutboundPayments::new(), forward_htlcs: Mutex::new(HashMap::new()), - claimable_payments: Mutex::new(ClaimablePayments { claimable_htlcs: HashMap::new(), pending_claiming_payments: HashMap::new() }), + claimable_payments: Mutex::new(ClaimablePayments { claimable_payments: HashMap::new(), pending_claiming_payments: HashMap::new() }), pending_intercepted_htlcs: Mutex::new(HashMap::new()), id_to_peer: Mutex::new(HashMap::new()), short_to_chan_info: FairRwLock::new(HashMap::new()), @@ -1770,7 +1841,8 @@ where per_peer_state: FairRwLock::new(HashMap::new()), - pending_events: Mutex::new(Vec::new()), + pending_events: Mutex::new(VecDeque::new()), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), persistence_notifier: Notifier::new(), @@ -1818,6 +1890,10 @@ where /// Raises [`APIError::APIMisuseError`] when `channel_value_satoshis` > 2**24 or `push_msat` is /// greater than `channel_value_satoshis * 1k` or `channel_value_satoshis < 1000`. /// + /// Raises [`APIError::ChannelUnavailable`] if the channel cannot be opened due to failing to + /// generate a shutdown scriptpubkey or destination script set by + /// [`SignerProvider::get_shutdown_scriptpubkey`] or [`SignerProvider::get_destination_script`]. + /// /// Note that we do not check if you are currently connected to the given peer. If no /// connection is available, the outbound `open_channel` message may fail to send, resulting in /// the channel eventually being silently forgotten (dropped on reload). 
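// A self-contained sketch of the snapshot-then-drain pattern `process_events_body`
// above relies on, assuming nothing beyond `std`: take a snapshot of the queue, run
// the handler without holding the lock, then drain exactly the handled prefix so
// anything pushed concurrently stays queued for the next pass. `EventQueue` and the
// `String` events are illustrative stand-ins, not LDK types.
use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

struct EventQueue {
	pending: Mutex<VecDeque<String>>,
	// Mirrors `pending_events_processor`: only one caller may process at a time.
	processing: AtomicBool,
}

impl EventQueue {
	fn process(&self, mut handle: impl FnMut(&str)) {
		if self.processing.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
			return;
		}
		let snapshot: Vec<String> = self.pending.lock().unwrap().iter().cloned().collect();
		let num_events = snapshot.len();
		for ev in &snapshot { handle(ev.as_str()); }
		// Only drop what was actually handled; later pushes survive.
		self.pending.lock().unwrap().drain(..num_events);
		self.processing.store(false, Ordering::Release);
	}
}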
@@ -1977,18 +2053,20 @@ where let mut pending_events_lock = self.pending_events.lock().unwrap(); match channel.unbroadcasted_funding() { Some(transaction) => { - pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction }) + pending_events_lock.push_back((events::Event::DiscardFunding { + channel_id: channel.channel_id(), transaction + }, None)); }, None => {}, } - pending_events_lock.push(events::Event::ChannelClosed { + pending_events_lock.push_back((events::Event::ChannelClosed { channel_id: channel.channel_id(), user_channel_id: channel.get_user_id(), reason: closure_reason - }); + }, None)); } - fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option) -> Result<(), APIError> { + fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option, override_shutdown_script: Option) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; @@ -2005,7 +2083,7 @@ where let funding_txo_opt = chan_entry.get().get_funding_txo(); let their_features = &peer_state.latest_features; let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut() - .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?; + .get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?; failed_htlcs = htlcs; // We can send the `shutdown` message before updating the `ChannelMonitor` @@ -2062,12 +2140,17 @@ where /// /// May generate a [`SendShutdown`] message event on success, which should be relayed. /// + /// Raises [`APIError::ChannelUnavailable`] if the channel cannot be closed due to failing to + /// generate a shutdown scriptpubkey or destination script set by + /// [`SignerProvider::get_shutdown_scriptpubkey`]. A force-closure may be needed to close the + /// channel. + /// /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal /// [`SendShutdown`]: crate::events::MessageSendEvent::SendShutdown pub fn close_channel(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey) -> Result<(), APIError> { - self.close_channel_internal(channel_id, counterparty_node_id, None) + self.close_channel_internal(channel_id, counterparty_node_id, None, None) } /// Begins the process of closing a channel. After this call (plus some timeout), no new HTLCs @@ -2084,14 +2167,24 @@ where /// transaction feerate below `target_feerate_sat_per_1000_weight` (or the feerate which /// will appear on a force-closure transaction, whichever is lower). /// + /// The `shutdown_script` provided will be used as the `scriptPubKey` for the closing transaction. + /// Will fail if a shutdown script has already been set for this channel by + /// ['ChannelHandshakeConfig::commit_upfront_shutdown_pubkey`]. The given shutdown script must + /// also be compatible with our and the counterparty's features. + /// /// May generate a [`SendShutdown`] message event on success, which should be relayed. 
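// A usage sketch of the new `close_channel_with_feerate_and_script` call documented
// here (its signature follows below). This fragment assumes it runs inside a
// function with `channel_manager` (an initialized `ChannelManager`), `channel_id:
// [u8; 32]` and `counterparty_node_id: PublicKey` in scope; passing `None` for both
// optional arguments matches the behaviour of plain `close_channel`.
use lightning::util::errors::APIError;

let feerate_sat_per_1000_weight = Some(253); // illustrative, roughly the 1 sat/vB floor
let shutdown_script = None;                  // fall back to the SignerProvider's script
match channel_manager.close_channel_with_feerate_and_script(
	&channel_id, &counterparty_node_id, feerate_sat_per_1000_weight, shutdown_script,
) {
	// On success a `SendShutdown` message event is queued and should be relayed.
	Ok(()) => {},
	// E.g. no shutdown scriptpubkey could be generated, or the provided script is
	// incompatible with the negotiated features; a force-close may be needed.
	Err(APIError::ChannelUnavailable { err }) => eprintln!("cooperative close failed: {}", err),
	Err(e) => eprintln!("close failed: {:?}", e),
}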
/// + /// Raises [`APIError::ChannelUnavailable`] if the channel cannot be closed due to failing to + /// generate a shutdown scriptpubkey or destination script set by + /// [`SignerProvider::get_shutdown_scriptpubkey`]. A force-closure may be needed to close the + /// channel. + /// /// [`ChannelConfig::force_close_avoidance_max_fee_satoshis`]: crate::util::config::ChannelConfig::force_close_avoidance_max_fee_satoshis /// [`Background`]: crate::chain::chaininterface::ConfirmationTarget::Background /// [`Normal`]: crate::chain::chaininterface::ConfirmationTarget::Normal /// [`SendShutdown`]: crate::events::MessageSendEvent::SendShutdown - pub fn close_channel_with_target_feerate(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: u32) -> Result<(), APIError> { - self.close_channel_internal(channel_id, counterparty_node_id, Some(target_feerate_sats_per_1000_weight)) + pub fn close_channel_with_feerate_and_script(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option, shutdown_script: Option) -> Result<(), APIError> { + self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script) } #[inline] @@ -2248,7 +2341,7 @@ where msg: "Got non final data with an HMAC of 0", }); }, - msgs::OnionHopDataFormat::FinalNode { payment_data, keysend_preimage } => { + msgs::OnionHopDataFormat::FinalNode { payment_data, keysend_preimage, payment_metadata } => { if payment_data.is_some() && keysend_preimage.is_some() { return Err(ReceiveError { err_code: 0x4000|22, @@ -2258,6 +2351,7 @@ where } else if let Some(data) = payment_data { PendingHTLCRouting::Receive { payment_data: data, + payment_metadata, incoming_cltv_expiry: hop_data.outgoing_cltv_value, phantom_shared_secret, } @@ -2278,6 +2372,7 @@ where PendingHTLCRouting::ReceiveKeysend { payment_preimage, + payment_metadata, incoming_cltv_expiry: hop_data.outgoing_cltv_value, } } else { @@ -2457,7 +2552,14 @@ where // hopefully an attacker trying to path-trace payments cannot make this occur // on a small/per-node/per-channel scale. if !chan.is_live() { // channel_disabled - break Some(("Forwarding channel is not in a ready state.", 0x1000 | 20, chan_update_opt)); + // If the channel_update we're going to return is disabled (i.e. the + // peer has been disabled for some time), return `channel_disabled`, + // otherwise return `temporary_channel_failure`. + if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { + break Some(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); + } else { + break Some(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); + } } if *outgoing_amt_msat < chan.get_counterparty_htlc_minimum_msat() { // amount_below_minimum break Some(("HTLC amount was below the htlc_minimum_msat", 0x1000 | 11, chan_update_opt)); @@ -2582,11 +2684,18 @@ where log_trace!(self.logger, "Generating channel update for channel {}", log_bytes!(chan.channel_id())); let were_node_one = self.our_network_pubkey.serialize()[..] 
< chan.get_counterparty_node_id().serialize()[..]; + let enabled = chan.is_usable() && match chan.channel_update_status() { + ChannelUpdateStatus::Enabled => true, + ChannelUpdateStatus::DisabledStaged(_) => true, + ChannelUpdateStatus::Disabled => false, + ChannelUpdateStatus::EnabledStaged(_) => false, + }; + let unsigned = msgs::UnsignedChannelUpdate { chain_hash: self.genesis_hash, short_channel_id, timestamp: chan.get_update_time_counter(), - flags: (!were_node_one) as u8 | ((!chan.is_live() as u8) << 1), + flags: (!were_node_one) as u8 | ((!enabled as u8) << 1), cltv_expiry_delta: chan.get_cltv_expiry_delta(), htlc_minimum_msat: chan.get_counterparty_htlc_minimum_msat(), htlc_maximum_msat: chan.get_announced_htlc_max_msat(), @@ -2607,29 +2716,28 @@ where } #[cfg(test)] - pub(crate) fn test_send_payment_along_path(&self, path: &Vec, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { + pub(crate) fn test_send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { let _lck = self.total_consistency_lock.read().unwrap(); self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes) } - fn send_payment_along_path(&self, path: &Vec, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { + fn send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { // The top-level caller should hold the total_consistency_lock read lock. 
debug_assert!(self.total_consistency_lock.try_write().is_err()); - log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id); + log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.hops.first().unwrap().short_channel_id); let prng_seed = self.entropy_source.get_secure_random_bytes(); let session_priv = SecretKey::from_slice(&session_priv_bytes[..]).expect("RNG is busted"); let onion_keys = onion_utils::construct_onion_keys(&self.secp_ctx, &path, &session_priv) .map_err(|_| APIError::InvalidRoute{err: "Pubkey along hop was maliciously selected".to_owned()})?; let (onion_payloads, htlc_msat, htlc_cltv) = onion_utils::build_onion_payloads(path, total_value, recipient_onion, cur_height, keysend_preimage)?; - if onion_utils::route_size_insane(&onion_payloads) { - return Err(APIError::InvalidRoute{err: "Route size too large considering onion data".to_owned()}); - } - let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); + + let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash) + .map_err(|_| APIError::InvalidRoute { err: "Route size too large considering onion data".to_owned()})?; let err: Result<(), _> = loop { - let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.first().unwrap().short_channel_id) { + let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.hops.first().unwrap().short_channel_id) { None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}), Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), }; @@ -2680,7 +2788,7 @@ where return Ok(()); }; - match handle_error!(self, err, path.first().unwrap().pubkey) { + match handle_error!(self, err, path.hops.first().unwrap().pubkey) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) @@ -2777,6 +2885,11 @@ where self.pending_outbound_payments.test_add_new_pending_payment(payment_hash, recipient_onion, payment_id, route, None, &self.entropy_source, best_block_height) } + #[cfg(test)] + pub(crate) fn test_set_payment_metadata(&self, payment_id: PaymentId, new_payment_metadata: Option>) { + self.pending_outbound_payments.test_set_payment_metadata(payment_id, new_payment_metadata); + } + /// Signals that no further retries for the given payment should occur. Useful if you have a /// pending outbound payment with retries remaining, but wish to stop retrying the payment before @@ -2845,10 +2958,10 @@ where /// Send a payment that is probing the given route for liquidity. We calculate the /// [`PaymentHash`] of probes based on a static secret and a random [`PaymentId`], which allows /// us to easily discern them from real payments. 
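// A brief sketch of probing with `send_probe` as described above (the updated
// signature, taking a `Path` instead of a `Vec<RouteHop>`, appears just below).
// `channel_manager` and `route` (a `Route` previously returned by the router) are
// assumed to be in scope; this is illustrative, not a complete probing loop.
for path in route.paths {
	match channel_manager.send_probe(path) {
		Ok((probe_hash, probe_id)) => {
			// The result arrives later as `Event::ProbeSuccessful` or
			// `Event::ProbeFailed`, matched against this (hash, id) pair.
			println!("probe sent: id {:?}, hash {:?}", probe_id, probe_hash);
		},
		Err(e) => eprintln!("probe could not be sent: {:?}", e),
	}
}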
- pub fn send_probe(&self, hops: Vec) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { + pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - self.pending_outbound_payments.send_probe(hops, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, + self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } @@ -2969,16 +3082,23 @@ where } { let height = self.best_block.read().unwrap().height(); - // Transactions are evaluated as final by network mempools at the next block. However, the modules - // constituting our Lightning node might not have perfect sync about their blockchain views. Thus, if - // the wallet module is in advance on the LDK view, allow one more block of headroom. - if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 2 { + // Transactions are evaluated as final by network mempools if their locktime is strictly + // lower than the next block height. However, the modules constituting our Lightning + // node might not have perfect sync about their blockchain views. Thus, if the wallet + // module is ahead of LDK, only allow one more block of headroom. + if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 { return Err(APIError::APIMisuseError { err: "Funding transaction absolute timelock is non-final".to_owned() }); } } self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| { + if tx.output.len() > u16::max_value() as usize { + return Err(APIError::APIMisuseError { + err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() + }); + } + let mut output_index = None; let expected_spk = chan.get_funding_redeemscript().to_v0_p2wsh(); for (idx, outp) in tx.output.iter().enumerate() { @@ -2988,11 +3108,6 @@ where err: "Multiple outputs matched the expected script and value".to_owned() }); } - if idx > u16::max_value() as usize { - return Err(APIError::APIMisuseError { - err: "Transaction had more than 2^16 outputs, which is not supported".to_owned() - }); - } output_index = Some(idx as u16); } } @@ -3178,7 +3293,7 @@ where pub fn process_pending_htlc_forwards(&self) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let mut new_events = Vec::new(); + let mut new_events = VecDeque::new(); let mut failed_forwards = Vec::new(); let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new(); { @@ -3361,7 +3476,7 @@ where } } } else { - for forward_info in pending_forwards.drain(..) { + 'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) 
{ match forward_info { HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id, @@ -3369,13 +3484,19 @@ where routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, .. } }) => { - let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret) = match routing { - PendingHTLCRouting::Receive { payment_data, incoming_cltv_expiry, phantom_shared_secret } => { + let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret, mut onion_fields) = match routing { + PendingHTLCRouting::Receive { payment_data, payment_metadata, incoming_cltv_expiry, phantom_shared_secret } => { let _legacy_hop_data = Some(payment_data.clone()); - (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data }, Some(payment_data), phantom_shared_secret) + let onion_fields = + RecipientOnionFields { payment_secret: Some(payment_data.payment_secret), payment_metadata }; + (incoming_cltv_expiry, OnionPayload::Invoice { _legacy_hop_data }, + Some(payment_data), phantom_shared_secret, onion_fields) + }, + PendingHTLCRouting::ReceiveKeysend { payment_preimage, payment_metadata, incoming_cltv_expiry } => { + let onion_fields = RecipientOnionFields { payment_secret: None, payment_metadata }; + (incoming_cltv_expiry, OnionPayload::Spontaneous(payment_preimage), + None, None, onion_fields) }, - PendingHTLCRouting::ReceiveKeysend { payment_preimage, incoming_cltv_expiry } => - (incoming_cltv_expiry, OnionPayload::Spontaneous(payment_preimage), None, None), _ => { panic!("short_channel_id == 0 should imply any pending_forward entries are of type Receive"); } @@ -3400,8 +3521,11 @@ where onion_payload, }; + let mut committed_to_claimable = false; + macro_rules! fail_htlc { ($htlc: expr, $payment_hash: expr) => { + debug_assert!(!committed_to_claimable); let mut htlc_msat_height_data = $htlc.value.to_be_bytes().to_vec(); htlc_msat_height_data.extend_from_slice( &self.best_block.read().unwrap().height().to_be_bytes(), @@ -3416,6 +3540,7 @@ where HTLCFailReason::reason(0x4000 | 15, htlc_msat_height_data), HTLCDestination::FailedPayment { payment_hash: $payment_hash }, )); + continue 'next_forwardable_htlc; } } let phantom_shared_secret = claimable_htlc.prev_hop.phantom_shared_secret; @@ -3437,15 +3562,28 @@ where let mut claimable_payments = self.claimable_payments.lock().unwrap(); if claimable_payments.pending_claiming_payments.contains_key(&payment_hash) { fail_htlc!(claimable_htlc, payment_hash); - continue } - let (_, ref mut htlcs) = claimable_payments.claimable_htlcs.entry(payment_hash) - .or_insert_with(|| (purpose(), Vec::new())); + let ref mut claimable_payment = claimable_payments.claimable_payments + .entry(payment_hash) + // Note that if we insert here we MUST NOT fail_htlc!() + .or_insert_with(|| { + committed_to_claimable = true; + ClaimablePayment { + purpose: purpose(), htlcs: Vec::new(), onion_fields: None, + } + }); + if let Some(earlier_fields) = &mut claimable_payment.onion_fields { + if earlier_fields.check_merge(&mut onion_fields).is_err() { + fail_htlc!(claimable_htlc, payment_hash); + } + } else { + claimable_payment.onion_fields = Some(onion_fields); + } + let ref mut htlcs = &mut claimable_payment.htlcs; if htlcs.len() == 1 { if let OnionPayload::Spontaneous(_) = htlcs[0].onion_payload { log_trace!(self.logger, "Failing new HTLC with payment_hash {} as we already had an existing keysend HTLC with the same payment hash", log_bytes!(payment_hash.0)); fail_htlc!(claimable_htlc, payment_hash); - 
continue } } let mut total_value = claimable_htlc.sender_intended_value; @@ -3474,11 +3612,14 @@ where log_bytes!(payment_hash.0)); fail_htlc!(claimable_htlc, payment_hash); } else if total_value >= $payment_data.total_msat { + #[allow(unused_assignments)] { + committed_to_claimable = true; + } let prev_channel_id = prev_funding_outpoint.to_channel_id(); htlcs.push(claimable_htlc); let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum(); htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat)); - new_events.push(events::Event::PaymentClaimable { + new_events.push_back((events::Event::PaymentClaimable { receiver_node_id: Some(receiver_node_id), payment_hash, purpose: purpose(), @@ -3486,13 +3627,17 @@ where via_channel_id: Some(prev_channel_id), via_user_channel_id: Some(prev_user_channel_id), claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER), - }); + onion_fields: claimable_payment.onion_fields.clone(), + }, None)); payment_claimable_generated = true; } else { // Nothing to do - we haven't reached the total // payment value yet, wait until we receive more // MPP parts. htlcs.push(claimable_htlc); + #[allow(unused_assignments)] { + committed_to_claimable = true; + } } payment_claimable_generated }} @@ -3515,7 +3660,6 @@ where Err(()) => { log_trace!(self.logger, "Failing new HTLC with payment_hash {} as payment verification failed", log_bytes!(payment_hash.0)); fail_htlc!(claimable_htlc, payment_hash); - continue } }; if let Some(min_final_cltv_expiry_delta) = min_final_cltv_expiry_delta { @@ -3524,7 +3668,6 @@ where log_trace!(self.logger, "Failing new HTLC with payment_hash {} as its CLTV expiry was too soon (had {}, earliest expected {})", log_bytes!(payment_hash.0), cltv_expiry, expected_min_expiry_height); fail_htlc!(claimable_htlc, payment_hash); - continue; } } check_total_value!(payment_data, payment_preimage); @@ -3533,17 +3676,20 @@ where let mut claimable_payments = self.claimable_payments.lock().unwrap(); if claimable_payments.pending_claiming_payments.contains_key(&payment_hash) { fail_htlc!(claimable_htlc, payment_hash); - continue } - match claimable_payments.claimable_htlcs.entry(payment_hash) { + match claimable_payments.claimable_payments.entry(payment_hash) { hash_map::Entry::Vacant(e) => { let amount_msat = claimable_htlc.value; claimable_htlc.total_value_received = Some(amount_msat); let claim_deadline = Some(claimable_htlc.cltv_expiry - HTLC_FAIL_BACK_BUFFER); let purpose = events::PaymentPurpose::SpontaneousPayment(preimage); - e.insert((purpose.clone(), vec![claimable_htlc])); + e.insert(ClaimablePayment { + purpose: purpose.clone(), + onion_fields: Some(onion_fields.clone()), + htlcs: vec![claimable_htlc], + }); let prev_channel_id = prev_funding_outpoint.to_channel_id(); - new_events.push(events::Event::PaymentClaimable { + new_events.push_back((events::Event::PaymentClaimable { receiver_node_id: Some(receiver_node_id), payment_hash, amount_msat, @@ -3551,7 +3697,8 @@ where via_channel_id: Some(prev_channel_id), via_user_channel_id: Some(prev_user_channel_id), claim_deadline, - }); + onion_fields: Some(onion_fields), + }, None)); }, hash_map::Entry::Occupied(_) => { log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} for a duplicative payment hash", log_bytes!(payment_hash.0)); @@ -3565,7 +3712,6 @@ where if payment_data.is_none() { log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} because we already have an inbound payment with the same payment hash", log_bytes!(payment_hash.0)); 
fail_htlc!(claimable_htlc, payment_hash); - continue }; let payment_data = payment_data.unwrap(); if inbound_payment.get().payment_secret != payment_data.payment_secret { @@ -3631,7 +3777,7 @@ where for event in background_events.drain(..) { match event { - BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => { + BackgroundEvent::MonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { // The channel has already been closed, so no use bothering to care about the // monitor updating completing. let _ = self.chain_monitor.update_channel(funding_txo, &update); @@ -3736,27 +3882,39 @@ where } match chan.channel_update_status() { - ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged), - ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged), - ChannelUpdateStatus::DisabledStaged if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), - ChannelUpdateStatus::EnabledStaged if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), - ChannelUpdateStatus::DisabledStaged if !chan.is_live() => { - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + ChannelUpdateStatus::Enabled if !chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(0)), + ChannelUpdateStatus::Disabled if chan.is_live() => chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(0)), + ChannelUpdateStatus::DisabledStaged(_) if chan.is_live() + => chan.set_channel_update_status(ChannelUpdateStatus::Enabled), + ChannelUpdateStatus::EnabledStaged(_) if !chan.is_live() + => chan.set_channel_update_status(ChannelUpdateStatus::Disabled), + ChannelUpdateStatus::DisabledStaged(mut n) if !chan.is_live() => { + n += 1; + if n >= DISABLE_GOSSIP_TICKS { + chan.set_channel_update_status(ChannelUpdateStatus::Disabled); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + } else { + chan.set_channel_update_status(ChannelUpdateStatus::DisabledStaged(n)); } - should_persist = NotifyOption::DoPersist; - chan.set_channel_update_status(ChannelUpdateStatus::Disabled); }, - ChannelUpdateStatus::EnabledStaged if chan.is_live() => { - if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); + ChannelUpdateStatus::EnabledStaged(mut n) if chan.is_live() => { + n += 1; + if n >= ENABLE_GOSSIP_TICKS { + chan.set_channel_update_status(ChannelUpdateStatus::Enabled); + if let Ok(update) = self.get_channel_update_for_broadcast(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update + }); + } + should_persist = NotifyOption::DoPersist; + } else { + chan.set_channel_update_status(ChannelUpdateStatus::EnabledStaged(n)); } - should_persist = NotifyOption::DoPersist; - chan.set_channel_update_status(ChannelUpdateStatus::Enabled); }, _ => {}, } @@ -3798,24 +3956,27 @@ where } } - self.claimable_payments.lock().unwrap().claimable_htlcs.retain(|payment_hash, (_, htlcs)| { - if htlcs.is_empty() { + self.claimable_payments.lock().unwrap().claimable_payments.retain(|payment_hash, payment| { + if payment.htlcs.is_empty() { 
// This should be unreachable debug_assert!(false); return false; } - if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload { + if let OnionPayload::Invoice { .. } = payment.htlcs[0].onion_payload { // Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat). // In this case we're not going to handle any timeouts of the parts here. // This condition determining whether the MPP is complete here must match // exactly the condition used in `process_pending_htlc_forwards`. - if htlcs[0].total_msat <= htlcs.iter().fold(0, |total, htlc| total + htlc.sender_intended_value) { + if payment.htlcs[0].total_msat <= payment.htlcs.iter() + .fold(0, |total, htlc| total + htlc.sender_intended_value) + { return true; - } else if htlcs.into_iter().any(|htlc| { + } else if payment.htlcs.iter_mut().any(|htlc| { htlc.timer_ticks += 1; return htlc.timer_ticks >= MPP_TIMEOUT_TICKS }) { - timed_out_mpp_htlcs.extend(htlcs.drain(..).map(|htlc: ClaimableHTLC| (htlc.prev_hop, *payment_hash))); + timed_out_mpp_htlcs.extend(payment.htlcs.drain(..) + .map(|htlc: ClaimableHTLC| (htlc.prev_hop, *payment_hash))); return false; } } @@ -3870,9 +4031,9 @@ where pub fn fail_htlc_backwards_with_reason(&self, payment_hash: &PaymentHash, failure_code: FailureCode) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - let removed_source = self.claimable_payments.lock().unwrap().claimable_htlcs.remove(payment_hash); - if let Some((_, mut sources)) = removed_source { - for htlc in sources.drain(..) { + let removed_source = self.claimable_payments.lock().unwrap().claimable_payments.remove(payment_hash); + if let Some(payment) = removed_source { + for htlc in payment.htlcs { let reason = self.get_htlc_fail_reason_from_failure_code(failure_code, &htlc); let source = HTLCSource::PreviousHopData(htlc.prev_hop); let receiver = HTLCDestination::FailedPayment { payment_hash: *payment_hash }; @@ -4015,10 +4176,10 @@ where mem::drop(forward_htlcs); if push_forward_ev { self.push_pending_forwards_ev(); } let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::HTLCHandlingFailed { + pending_events.push_back((events::Event::HTLCHandlingFailed { prev_channel_id: outpoint.to_channel_id(), failed_next_destination: destination, - }); + }, None)); }, } } @@ -4049,9 +4210,9 @@ where let mut sources = { let mut claimable_payments = self.claimable_payments.lock().unwrap(); - if let Some((payment_purpose, sources)) = claimable_payments.claimable_htlcs.remove(&payment_hash) { + if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) { let mut receiver_node_id = self.our_network_pubkey; - for htlc in sources.iter() { + for htlc in payment.htlcs.iter() { if htlc.prev_hop.phantom_shared_secret.is_some() { let phantom_pubkey = self.node_signer.get_node_id(Recipient::PhantomNode) .expect("Failed to get node_id for phantom node recipient"); @@ -4061,15 +4222,15 @@ where } let dup_purpose = claimable_payments.pending_claiming_payments.insert(payment_hash, - ClaimingPayment { amount_msat: sources.iter().map(|source| source.value).sum(), - payment_purpose, receiver_node_id, + ClaimingPayment { amount_msat: payment.htlcs.iter().map(|source| source.value).sum(), + payment_purpose: payment.purpose, receiver_node_id, }); if dup_purpose.is_some() { debug_assert!(false, "Shouldn't get a duplicate pending claim event ever"); log_error!(self.logger, "Got a duplicate pending 
claimable event on payment hash {}! Please report this bug", log_bytes!(payment_hash.0)); } - sources + payment.htlcs } else { return; } }; debug_assert!(!sources.is_empty()); @@ -4281,13 +4442,13 @@ where MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => { let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash); if let Some(ClaimingPayment { amount_msat, payment_purpose: purpose, receiver_node_id }) = payment { - self.pending_events.lock().unwrap().push(events::Event::PaymentClaimed { + self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed { payment_hash, purpose, amount_msat, receiver_node_id: Some(receiver_node_id), - }); + }, None)); } }, MonitorUpdateCompletionAction::EmitEvent { event } => { - self.pending_events.lock().unwrap().push(event); + self.pending_events.lock().unwrap().push_back((event, None)); }, } } @@ -4611,15 +4772,13 @@ where }); } else { let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push( - events::Event::OpenChannelRequest { - temporary_channel_id: msg.temporary_channel_id.clone(), - counterparty_node_id: counterparty_node_id.clone(), - funding_satoshis: msg.funding_satoshis, - push_msat: msg.push_msat, - channel_type: channel.get_channel_type().clone(), - } - ); + pending_events.push_back((events::Event::OpenChannelRequest { + temporary_channel_id: msg.temporary_channel_id.clone(), + counterparty_node_id: counterparty_node_id.clone(), + funding_satoshis: msg.funding_satoshis, + push_msat: msg.push_msat, + channel_type: channel.get_channel_type().clone(), + }, None)); } entry.insert(channel); @@ -4647,13 +4806,13 @@ where } }; let mut pending_events = self.pending_events.lock().unwrap(); - pending_events.push(events::Event::FundingGenerationReady { + pending_events.push_back((events::Event::FundingGenerationReady { temporary_channel_id: msg.temporary_channel_id, counterparty_node_id: *counterparty_node_id, channel_value_satoshis: value, output_script, user_channel_id: user_id, - }); + }, None)); Ok(()) } @@ -5027,11 +5186,13 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().get_funding_txo(); - let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, - peer_state, per_peer_state, chan) + let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan); + if let Some(monitor_update) = monitor_update_opt { + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, + peer_state, per_peer_state, chan) + } else { Ok(()) } }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) } @@ -5041,7 +5202,7 @@ where fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) { for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards { let mut push_forward_event = false; - let mut new_intercept_events = Vec::new(); + let mut new_intercept_events = VecDeque::new(); let mut failed_intercept_forwards = Vec::new(); if !pending_forwards.is_empty() { for (forward_info, prev_htlc_id) in pending_forwards.drain(..) { @@ -5068,13 +5229,13 @@ where let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap(); match pending_intercepts.entry(intercept_id) { hash_map::Entry::Vacant(entry) => { - new_intercept_events.push(events::Event::HTLCIntercepted { + new_intercept_events.push_back((events::Event::HTLCIntercepted { requested_next_hop_scid: scid, payment_hash: forward_info.payment_hash, inbound_amount_msat: forward_info.incoming_amt_msat.unwrap(), expected_outbound_amount_msat: forward_info.outgoing_amt_msat, intercept_id - }); + }, None)); entry.insert(PendingAddHTLCInfo { prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info }); }, @@ -5124,13 +5285,13 @@ where fn push_pending_forwards_ev(&self) { let mut pending_events = self.pending_events.lock().unwrap(); let forward_ev_exists = pending_events.iter() - .find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) + .find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false }) .is_some(); if !forward_ev_exists { - pending_events.push(events::Event::PendingHTLCsForwardable { + pending_events.push_back((events::Event::PendingHTLCsForwardable { time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS), - }); + }, None)); } } @@ -5146,11 +5307,13 @@ where match peer_state.channel_by_id.entry(msg.channel_id) { hash_map::Entry::Occupied(mut chan) => { let funding_txo = chan.get().get_funding_txo(); - let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); - let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); - let update_id = monitor_update.update_id; - let res = handle_new_monitor_update!(self, update_res, update_id, - peer_state_lock, peer_state, per_peer_state, chan); + let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan); + let res = if let Some(monitor_update) = monitor_update_opt { + let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update); + let update_id = monitor_update.update_id; + handle_new_monitor_update!(self, update_res, update_id, + peer_state_lock, peer_state, per_peer_state, chan) + } else { Ok(()) }; (htlcs_to_fail, res) }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5534,7 +5697,7 @@ where if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] { assert!(should_broadcast); } else { unreachable!(); } - self.pending_background_events.lock().unwrap().push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, update))); + self.pending_background_events.lock().unwrap().push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup((funding_txo, update))); } self.finish_force_close_channel(failure); } @@ -5705,7 +5868,7 @@ where /// Gets a fake short channel id for use in receiving [phantom node payments]. These fake scids /// are used when constructing the phantom invoice's route hints. /// - /// [phantom node payments]: crate::chain::keysinterface::PhantomKeysManager + /// [phantom node payments]: crate::sign::PhantomKeysManager pub fn get_phantom_scid(&self) -> u64 { let best_block_height = self.best_block.read().unwrap().height(); let short_to_chan_info = self.short_to_chan_info.read().unwrap(); @@ -5721,7 +5884,7 @@ where /// Gets route hints for use in receiving [phantom node payments]. /// - /// [phantom node payments]: crate::chain::keysinterface::PhantomKeysManager + /// [phantom node payments]: crate::sign::PhantomKeysManager pub fn get_phantom_route_hints(&self) -> PhantomRouteHints { PhantomRouteHints { channels: self.list_usable_channels(), @@ -5779,13 +5942,13 @@ where #[cfg(feature = "_test_utils")] pub fn push_pending_event(&self, event: events::Event) { let mut events = self.pending_events.lock().unwrap(); - events.push(event); + events.push_back((event, None)); } #[cfg(test)] pub fn pop_pending_event(&self) -> Option { let mut events = self.pending_events.lock().unwrap(); - if events.is_empty() { None } else { Some(events.remove(0)) } + events.pop_front().map(|(e, _)| e) } #[cfg(test)] @@ -5798,6 +5961,72 @@ where self.pending_outbound_payments.clear_pending_payments() } + fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) { + let mut errors = Vec::new(); + loop { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lck = peer_state_mtx.lock().unwrap(); + let peer_state = &mut *peer_state_lck; + if self.pending_events.lock().unwrap().iter() + .any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate { + channel_funding_outpoint, counterparty_node_id + })) + { + // Check that, while holding the peer lock, we don't have another event + // blocking any monitor updates for this channel. If we do, let those + // events be the ones that ultimately release the monitor update(s). 
+ log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending", + log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); + break; + } + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) { + debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint); + if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { + log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor", + log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); + let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update); + let update_id = monitor_update.update_id; + if let Err(e) = handle_new_monitor_update!(self, update_res, update_id, + peer_state_lck, peer_state, per_peer_state, chan) + { + errors.push((e, counterparty_node_id)); + } + if further_update_exists { + // If there are more `ChannelMonitorUpdate`s to process, restart at the + // top of the loop. + continue; + } + } else { + log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update", + log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); + } + } + } else { + log_debug!(self.logger, + "Got a release post-RAA monitor update for peer {} but the channel is gone", + log_pubkey!(counterparty_node_id)); + } + break; + } + for (err, counterparty_node_id) in errors { + let res = Err::<(), _>(err); + let _ = handle_error!(self, res, counterparty_node_id); + } + } + + fn handle_post_event_actions(&self, actions: Vec) { + for action in actions { + match action { + EventCompletionAction::ReleaseRAAChannelMonitorUpdate { + channel_funding_outpoint, counterparty_node_id + } => { + self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint); + } + } + } + } + /// Processes any events asynchronously in the order they were generated since the last call /// using the given event handler. /// @@ -6141,8 +6370,8 @@ where } if let Some(height) = height_opt { - self.claimable_payments.lock().unwrap().claimable_htlcs.retain(|payment_hash, (_, htlcs)| { - htlcs.retain(|htlc| { + self.claimable_payments.lock().unwrap().claimable_payments.retain(|payment_hash, payment| { + payment.htlcs.retain(|htlc| { // If height is approaching the number of blocks we think it takes us to get // our commitment transaction confirmed before the HTLC expires, plus the // number of blocks we generally consider it to take to do a commitment update, @@ -6157,7 +6386,7 @@ where false } else { true } }); - !htlcs.is_empty() // Only retain this entry if htlcs has at least one entry. + !payment.htlcs.is_empty() // Only retain this entry if htlcs has at least one entry. 
}); let mut intercepted_htlcs = self.pending_intercepted_htlcs.lock().unwrap(); @@ -6263,11 +6492,23 @@ where let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id); } + fn handle_open_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannelV2) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.temporary_channel_id.clone())), *counterparty_node_id); + } + fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); } + fn handle_accept_channel_v2(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.temporary_channel_id.clone())), *counterparty_node_id); + } + fn handle_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); let _ = handle_error!(self, self.internal_funding_created(counterparty_node_id, msg), *counterparty_node_id); @@ -6370,23 +6611,40 @@ where }); pending_msg_events.retain(|msg| { match msg { + // V1 Channel Establishment &events::MessageSendEvent::SendAcceptChannel { .. } => false, &events::MessageSendEvent::SendOpenChannel { .. } => false, &events::MessageSendEvent::SendFundingCreated { .. } => false, &events::MessageSendEvent::SendFundingSigned { .. } => false, + // V2 Channel Establishment + &events::MessageSendEvent::SendAcceptChannelV2 { .. } => false, + &events::MessageSendEvent::SendOpenChannelV2 { .. } => false, + // Common Channel Establishment &events::MessageSendEvent::SendChannelReady { .. } => false, &events::MessageSendEvent::SendAnnouncementSignatures { .. } => false, + // Interactive Transaction Construction + &events::MessageSendEvent::SendTxAddInput { .. } => false, + &events::MessageSendEvent::SendTxAddOutput { .. } => false, + &events::MessageSendEvent::SendTxRemoveInput { .. } => false, + &events::MessageSendEvent::SendTxRemoveOutput { .. } => false, + &events::MessageSendEvent::SendTxComplete { .. } => false, + &events::MessageSendEvent::SendTxSignatures { .. } => false, + &events::MessageSendEvent::SendTxInitRbf { .. } => false, + &events::MessageSendEvent::SendTxAckRbf { .. } => false, + &events::MessageSendEvent::SendTxAbort { .. } => false, + // Channel Operations &events::MessageSendEvent::UpdateHTLCs { .. } => false, &events::MessageSendEvent::SendRevokeAndACK { .. } => false, &events::MessageSendEvent::SendClosingSigned { .. } => false, &events::MessageSendEvent::SendShutdown { .. } => false, &events::MessageSendEvent::SendChannelReestablish { .. } => false, + &events::MessageSendEvent::HandleError { .. } => false, + // Gossip &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. 
} => false, - &events::MessageSendEvent::HandleError { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, &events::MessageSendEvent::SendShortIdsQuery { .. } => false, &events::MessageSendEvent::SendReplyChannelRange { .. } => false, @@ -6543,6 +6801,60 @@ where fn provided_init_features(&self, _their_init_features: &PublicKey) -> InitFeatures { provided_init_features(&self.default_configuration) } + + fn handle_tx_add_input(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAddInput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_add_output(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAddOutput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_remove_input(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxRemoveInput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_remove_output(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxRemoveOutput) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_complete(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxComplete) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxSignatures) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_init_rbf(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxInitRbf) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_ack_rbf(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAckRbf) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } + + fn handle_tx_abort(&self, counterparty_node_id: &PublicKey, msg: &msgs::TxAbort) { + let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( + "Dual-funded channels not supported".to_owned(), + msg.channel_id.clone())), *counterparty_node_id); + } } /// Fetches the set of [`NodeFeatures`] flags which are provided by or required by @@ -6580,7 +6892,7 @@ pub fn provided_init_features(_config: &UserConfig) -> InitFeatures { // should also add the corresponding (optional) bit to the [`ChannelMessageHandler`] impl for // [`ErroringMessageHandler`]. 
let mut features = InitFeatures::empty(); - features.set_data_loss_protect_optional(); + features.set_data_loss_protect_required(); features.set_upfront_shutdown_script_optional(); features.set_variable_length_onion_required(); features.set_static_remote_key_required(); @@ -6742,10 +7054,12 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting, (0, payment_data, required), (1, phantom_shared_secret, option), (2, incoming_cltv_expiry, required), + (3, payment_metadata, option), }, (2, ReceiveKeysend) => { (0, payment_preimage, required), (2, incoming_cltv_expiry, required), + (3, payment_metadata, option), }, ;); @@ -6919,28 +7233,32 @@ impl Readable for HTLCSource { 0 => { let mut session_priv: crate::util::ser::RequiredWrapper = crate::util::ser::RequiredWrapper(None); let mut first_hop_htlc_msat: u64 = 0; - let mut path: Option> = Some(Vec::new()); + let mut path_hops: Option> = Some(Vec::new()); let mut payment_id = None; let mut payment_params: Option = None; + let mut blinded_tail: Option = None; read_tlv_fields!(reader, { (0, session_priv, required), (1, payment_id, option), (2, first_hop_htlc_msat, required), - (4, path, vec_type), + (4, path_hops, vec_type), (5, payment_params, (option: ReadableArgs, 0)), + (6, blinded_tail, option), }); if payment_id.is_none() { // For backwards compat, if there was no payment_id written, use the session_priv bytes // instead. payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref())); } - if path.is_none() || path.as_ref().unwrap().is_empty() { + let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail }; + if path.hops.len() == 0 { return Err(DecodeError::InvalidValue); } - let path = path.unwrap(); if let Some(params) = payment_params.as_mut() { - if params.final_cltv_expiry_delta == 0 { - params.final_cltv_expiry_delta = path.last().unwrap().cltv_expiry_delta; + if let Payee::Clear { ref mut final_cltv_expiry_delta, .. } = params.payee { + if final_cltv_expiry_delta == &0 { + *final_cltv_expiry_delta = path.final_cltv_expiry_delta().ok_or(DecodeError::InvalidValue)?; + } } } Ok(HTLCSource::OutboundRoute { @@ -6967,8 +7285,9 @@ impl Writeable for HTLCSource { (1, payment_id_opt, option), (2, first_hop_htlc_msat, required), // 3 was previously used to write a PaymentSecret for the payment. 
- (4, *path, vec_type), + (4, path.hops, vec_type), (5, None::, option), // payment_params in LDK versions prior to 0.0.115 + (6, path.blinded_tail, option), }); } HTLCSource::PreviousHopData(ref field) => { @@ -7078,14 +7397,16 @@ where let pending_outbound_payments = self.pending_outbound_payments.pending_outbound_payments.lock().unwrap(); let mut htlc_purposes: Vec<&events::PaymentPurpose> = Vec::new(); - (claimable_payments.claimable_htlcs.len() as u64).write(writer)?; - for (payment_hash, (purpose, previous_hops)) in claimable_payments.claimable_htlcs.iter() { + let mut htlc_onion_fields: Vec<&_> = Vec::new(); + (claimable_payments.claimable_payments.len() as u64).write(writer)?; + for (payment_hash, payment) in claimable_payments.claimable_payments.iter() { payment_hash.write(writer)?; - (previous_hops.len() as u64).write(writer)?; - for htlc in previous_hops.iter() { + (payment.htlcs.len() as u64).write(writer)?; + for htlc in payment.htlcs.iter() { htlc.write(writer)?; } - htlc_purposes.push(purpose); + htlc_purposes.push(&payment.purpose); + htlc_onion_fields.push(&payment.onion_fields); } let mut monitor_update_blocked_actions_per_peer = None; @@ -7115,23 +7436,28 @@ where } let events = self.pending_events.lock().unwrap(); - (events.len() as u64).write(writer)?; - for event in events.iter() { - event.write(writer)?; - } - - let background_events = self.pending_background_events.lock().unwrap(); - (background_events.len() as u64).write(writer)?; - for event in background_events.iter() { - match event { - BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => { - 0u8.write(writer)?; - funding_txo.write(writer)?; - monitor_update.write(writer)?; - }, + // LDK versions prior to 0.0.115 don't support post-event actions, thus if there are no + // actions at all, skip writing the required TLV. Otherwise, pre-0.0.115 versions will + // refuse to read the new ChannelManager. + let events_not_backwards_compatible = events.iter().any(|(_, action)| action.is_some()); + if events_not_backwards_compatible { + // If we're going to write an even TLV that will overwrite our events anyway we might as + // well save the space and not write any events here. + 0u64.write(writer)?; + } else { + (events.len() as u64).write(writer)?; + for (event, _) in events.iter() { + event.write(writer)?; } } + // LDK versions prior to 0.0.116 wrote the `pending_background_events` + // `MonitorUpdateRegeneratedOnStartup`s here, however there was never a reason to do so - + // the closing monitor updates were always effectively replayed on startup (either directly + // by calling `broadcast_latest_holder_commitment_txn` on a `ChannelMonitor` during + // deserialization or, in 0.0.115, by regenerating the monitor update itself). + 0u64.write(writer)?; + // Prior to 0.0.111 we tracked node_announcement serials here, however that now happens in // `PeerManager`, and thus we simply write the `highest_seen_timestamp` twice, which is // likely to be identical.
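The backwards-compatibility reasoning in the comments above leans on the even/odd TLV rule: a reader may skip unknown odd-typed records, but must refuse an object containing an unknown even-typed record. The following is a minimal illustrative sketch of a reader following that rule, not LDK's serialization code; it assumes single-byte type and length fields rather than the real BigSize encoding.

fn read_tlv_stream(mut data: &[u8], known_types: &[u8]) -> Result<Vec<(u8, Vec<u8>)>, ()> {
    let mut records = Vec::new();
    while !data.is_empty() {
        // Each record is encoded (in this simplified sketch) as: type byte, length byte, value.
        if data.len() < 2 { return Err(()); }
        let (typ, len) = (data[0], data[1] as usize);
        if data.len() < 2 + len { return Err(()); }
        let value = data[2..2 + len].to_vec();
        data = &data[2 + len..];
        if known_types.contains(&typ) {
            records.push((typ, value));
        } else if typ % 2 == 0 {
            // Unknown even type: the record is required, so refuse the whole object.
            return Err(());
        }
        // Unknown odd type: optional, silently skipped.
    }
    Ok(records)
}

Under that rule, a pre-0.0.115 reader that does not know the even type 8 written here fails to load the new ChannelManager instead of silently dropping the post-event actions, while the odd type 13 onion-fields record can safely be ignored by older readers.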
@@ -7198,14 +7524,57 @@ where (5, self.our_network_pubkey, required), (6, monitor_update_blocked_actions_per_peer, option), (7, self.fake_scid_rand_bytes, required), + (8, if events_not_backwards_compatible { Some(&*events) } else { None }, option), (9, htlc_purposes, vec_type), (11, self.probing_cookie_secret, required), + (13, htlc_onion_fields, optional_vec), }); Ok(()) } } +impl Writeable for VecDeque<(Event, Option)> { + fn write(&self, w: &mut W) -> Result<(), io::Error> { + (self.len() as u64).write(w)?; + for (event, action) in self.iter() { + event.write(w)?; + action.write(w)?; + #[cfg(debug_assertions)] { + // Events are MaybeReadable, in some cases indicating that they shouldn't actually + // be persisted and are regenerated on restart. However, if such an event has a + // post-event-handling action we'll write nothing for the event and would have to + // either forget the action or fail on deserialization (which we do below). Thus, + // check that the event is sane here. + let event_encoded = event.encode(); + let event_read: Option = + MaybeReadable::read(&mut &event_encoded[..]).unwrap(); + if action.is_some() { assert!(event_read.is_some()); } + } + } + Ok(()) + } +} +impl Readable for VecDeque<(Event, Option)> { + fn read(reader: &mut R) -> Result { + let len: u64 = Readable::read(reader)?; + const MAX_ALLOC_SIZE: u64 = 1024 * 16; + let mut events: Self = VecDeque::with_capacity(cmp::min( + MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option)>() as u64, + len) as usize); + for _ in 0..len { + let ev_opt = MaybeReadable::read(reader)?; + let action = Readable::read(reader)?; + if let Some(ev) = ev_opt { + events.push_back((ev, action)); + } else if action.is_some() { + return Err(DecodeError::InvalidValue); + } + } + Ok(events) + } +} + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -7372,7 +7741,7 @@ where let mut peer_channels: HashMap::Signer>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128)); - let mut channel_closures = Vec::new(); + let mut channel_closures = VecDeque::new(); let mut pending_background_events = Vec::new(); for _ in 0..channel_count { let mut channel: Channel<::Signer> = Channel::read(reader, ( @@ -7381,14 +7750,11 @@ where let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?; funding_txo_set.insert(funding_txo.clone()); if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) { - if channel.get_cur_holder_commitment_transaction_number() < monitor.get_cur_holder_commitment_number() || - channel.get_revoked_counterparty_commitment_transaction_number() < monitor.get_min_seen_secret() || - channel.get_cur_counterparty_commitment_transaction_number() < monitor.get_cur_counterparty_commitment_number() || - channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() { + if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() { // If the channel is ahead of the monitor, return InvalidValue: log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! 
This indicates a potentially-critical violation of the chain::Watch API!"); log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", - log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id()); + log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id()); log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!"); log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds."); @@ -7405,14 +7771,14 @@ where log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id()); let (monitor_update, mut new_failed_htlcs) = channel.force_shutdown(true); if let Some(monitor_update) = monitor_update { - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update)); + pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup(monitor_update)); } failed_htlcs.append(&mut new_failed_htlcs); - channel_closures.push(events::Event::ChannelClosed { + channel_closures.push_back((events::Event::ChannelClosed { channel_id: channel.channel_id(), user_channel_id: channel.get_user_id(), reason: ClosureReason::OutdatedChannelManager - }); + }, None)); for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() { let mut found_htlc = false; for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() { @@ -7457,11 +7823,11 @@ where // was in-progress, we never broadcasted the funding transaction and can still // safely discard the channel. let _ = channel.force_shutdown(false); - channel_closures.push(events::Event::ChannelClosed { + channel_closures.push_back((events::Event::ChannelClosed { channel_id: channel.channel_id(), user_channel_id: channel.get_user_id(), reason: ClosureReason::DisconnectedPeer, - }); + }, None)); } else { log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.channel_id())); log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,"); @@ -7474,11 +7840,13 @@ where for (funding_txo, _) in args.channel_monitors.iter() { if !funding_txo_set.contains(funding_txo) { + log_info!(args.logger, "Queueing monitor update to ensure missing channel {} is force closed", + log_bytes!(funding_txo.to_channel_id())); let monitor_update = ChannelMonitorUpdate { update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((*funding_txo, monitor_update))); + pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } @@ -7522,10 +7890,11 @@ where } let event_count: u64 = Readable::read(reader)?; - let mut pending_events_read: Vec = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::())); + let mut pending_events_read: VecDeque<(events::Event, Option)> = + VecDeque::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option)>())); for _ in 0..event_count { match MaybeReadable::read(reader)? 
{ - Some(event) => pending_events_read.push(event), + Some(event) => pending_events_read.push_back((event, None)), None => continue, } } @@ -7534,13 +7903,11 @@ where for _ in 0..background_event_count { match ::read(reader)? { 0 => { - let (funding_txo, monitor_update): (OutPoint, ChannelMonitorUpdate) = (Readable::read(reader)?, Readable::read(reader)?); - if pending_background_events.iter().find(|e| { - let BackgroundEvent::ClosingMonitorUpdate((pending_funding_txo, pending_monitor_update)) = e; - *pending_funding_txo == funding_txo && *pending_monitor_update == monitor_update - }).is_none() { - pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update))); - } + // LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here, + // however we really don't (and never did) need them - we regenerate all + // on-startup monitor updates. + let _: OutPoint = Readable::read(reader)?; + let _: ChannelMonitorUpdate = Readable::read(reader)?; } _ => return Err(DecodeError::InvalidValue), } @@ -7578,8 +7945,10 @@ where let mut fake_scid_rand_bytes: Option<[u8; 32]> = None; let mut probing_cookie_secret: Option<[u8; 32]> = None; let mut claimable_htlc_purposes = None; + let mut claimable_htlc_onion_fields = None; let mut pending_claiming_payments = Some(HashMap::new()); let mut monitor_update_blocked_actions_per_peer = Some(Vec::new()); + let mut events_override = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs, option), @@ -7588,8 +7957,10 @@ where (5, received_network_pubkey, option), (6, monitor_update_blocked_actions_per_peer, option), (7, fake_scid_rand_bytes, option), + (8, events_override, option), (9, claimable_htlc_purposes, vec_type), (11, probing_cookie_secret, option), + (13, claimable_htlc_onion_fields, optional_vec), }); if fake_scid_rand_bytes.is_none() { fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes()); @@ -7599,6 +7970,10 @@ where probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes()); } + if let Some(events) = events_override { + pending_events_read = events; + } + if !channel_closures.is_empty() { pending_events_read.append(&mut channel_closures); } @@ -7630,12 +8005,12 @@ where if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. 
} = htlc_source { - if path.is_empty() { + if path.hops.is_empty() { log_error!(args.logger, "Got an empty path for a pending payment"); return Err(DecodeError::InvalidValue); } - let path_amt = path.last().unwrap().fee_msat; + let path_amt = path.final_value_msat(); let mut session_priv_bytes = [0; 32]; session_priv_bytes[..].copy_from_slice(&session_priv[..]); match pending_outbounds.pending_outbound_payments.lock().unwrap().entry(payment_id) { @@ -7645,7 +8020,7 @@ where if newly_added { "Added" } else { "Had" }, path_amt, log_bytes!(session_priv_bytes), log_bytes!(htlc.payment_hash.0)); }, hash_map::Entry::Vacant(entry) => { - let path_fee = path.get_path_fees(); + let path_fee = path.fee_msat(); entry.insert(PendingOutboundPayment::Retryable { retry_strategy: None, attempts: PaymentAttempts::new(), @@ -7653,6 +8028,7 @@ where session_privs: [session_priv_bytes].iter().map(|a| *a).collect(), payment_hash: htlc.payment_hash, payment_secret: None, // only used for retries, and we'll never retry on startup + payment_metadata: None, // only used for retries, and we'll never retry on startup keysend_preimage: None, // only used for retries, and we'll never retry on startup pending_amt_msat: path_amt, pending_fee_msat: Some(path_fee), @@ -7693,7 +8069,7 @@ where if pending_forward_matches_htlc(&htlc_info) { log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}", log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id())); - pending_events_read.retain(|event| { + pending_events_read.retain(|(event, _)| { if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event { intercepted_id != ev_id } else { true } @@ -7729,30 +8105,47 @@ where // shut down before the timer hit. Either way, set the time_forwardable to a small // constant as enough time has likely passed that we should simply handle the forwards // now, or at least after the user gets a chance to reconnect to our peers. 
- pending_events_read.push(events::Event::PendingHTLCsForwardable { + pending_events_read.push_back((events::Event::PendingHTLCsForwardable { time_forwardable: Duration::from_secs(2), - }); + }, None)); } let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material(); let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material); - let mut claimable_htlcs = HashMap::with_capacity(claimable_htlcs_list.len()); - if let Some(mut purposes) = claimable_htlc_purposes { + let mut claimable_payments = HashMap::with_capacity(claimable_htlcs_list.len()); + if let Some(purposes) = claimable_htlc_purposes { if purposes.len() != claimable_htlcs_list.len() { return Err(DecodeError::InvalidValue); } - for (purpose, (payment_hash, previous_hops)) in purposes.drain(..).zip(claimable_htlcs_list.drain(..)) { - claimable_htlcs.insert(payment_hash, (purpose, previous_hops)); + if let Some(onion_fields) = claimable_htlc_onion_fields { + if onion_fields.len() != claimable_htlcs_list.len() { + return Err(DecodeError::InvalidValue); + } + for (purpose, (onion, (payment_hash, htlcs))) in + purposes.into_iter().zip(onion_fields.into_iter().zip(claimable_htlcs_list.into_iter())) + { + let existing_payment = claimable_payments.insert(payment_hash, ClaimablePayment { + purpose, htlcs, onion_fields: onion, + }); + if existing_payment.is_some() { return Err(DecodeError::InvalidValue); } + } + } else { + for (purpose, (payment_hash, htlcs)) in purposes.into_iter().zip(claimable_htlcs_list.into_iter()) { + let existing_payment = claimable_payments.insert(payment_hash, ClaimablePayment { + purpose, htlcs, onion_fields: None, + }); + if existing_payment.is_some() { return Err(DecodeError::InvalidValue); } + } } } else { // LDK versions prior to 0.0.107 did not write a `pending_htlc_purposes`, but do // include a `_legacy_hop_data` in the `OnionPayload`. - for (payment_hash, previous_hops) in claimable_htlcs_list.drain(..) { - if previous_hops.is_empty() { + for (payment_hash, htlcs) in claimable_htlcs_list.drain(..) 
{ + if htlcs.is_empty() { return Err(DecodeError::InvalidValue); } - let purpose = match &previous_hops[0].onion_payload { + let purpose = match &htlcs[0].onion_payload { OnionPayload::Invoice { _legacy_hop_data } => { if let Some(hop_data) = _legacy_hop_data { events::PaymentPurpose::InvoicePayment { @@ -7773,7 +8166,9 @@ where OnionPayload::Spontaneous(payment_preimage) => events::PaymentPurpose::SpontaneousPayment(*payment_preimage), }; - claimable_htlcs.insert(payment_hash, (purpose, previous_hops)); + claimable_payments.insert(payment_hash, ClaimablePayment { + purpose, htlcs, onion_fields: None, + }); } } @@ -7825,17 +8220,17 @@ where for (_, monitor) in args.channel_monitors.iter() { for (payment_hash, payment_preimage) in monitor.get_stored_preimages() { - if let Some((payment_purpose, claimable_htlcs)) = claimable_htlcs.remove(&payment_hash) { + if let Some(payment) = claimable_payments.remove(&payment_hash) { log_info!(args.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", log_bytes!(payment_hash.0)); let mut claimable_amt_msat = 0; let mut receiver_node_id = Some(our_network_pubkey); - let phantom_shared_secret = claimable_htlcs[0].prev_hop.phantom_shared_secret; + let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret; if phantom_shared_secret.is_some() { let phantom_pubkey = args.node_signer.get_node_id(Recipient::PhantomNode) .expect("Failed to get node_id for phantom node recipient"); receiver_node_id = Some(phantom_pubkey) } - for claimable_htlc in claimable_htlcs { + for claimable_htlc in payment.htlcs { claimable_amt_msat += claimable_htlc.value; // Add a holding-cell claim of the payment to the Channel, which should be @@ -7866,12 +8261,12 @@ where previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &bounded_fee_estimator, &args.logger); } } - pending_events_read.push(events::Event::PaymentClaimed { + pending_events_read.push_back((events::Event::PaymentClaimed { receiver_node_id, payment_hash, - purpose: payment_purpose, + purpose: payment.purpose, amount_msat: claimable_amt_msat, - }); + }, None)); } } } @@ -7900,7 +8295,7 @@ where pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()), forward_htlcs: Mutex::new(forward_htlcs), - claimable_payments: Mutex::new(ClaimablePayments { claimable_htlcs, pending_claiming_payments: pending_claiming_payments.unwrap() }), + claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, pending_claiming_payments: pending_claiming_payments.unwrap() }), outbound_scid_aliases: Mutex::new(outbound_scid_aliases), id_to_peer: Mutex::new(id_to_peer), short_to_chan_info: FairRwLock::new(short_to_chan_info), @@ -7916,6 +8311,7 @@ where per_peer_state: FairRwLock::new(per_peer_state), pending_events: Mutex::new(pending_events_read), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), persistence_notifier: Notifier::new(), @@ -7947,8 +8343,6 @@ mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; - #[cfg(feature = "std")] - use core::time::Duration; use core::sync::atomic::Ordering; use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; @@ -7960,7 +8354,7 @@ mod tests { use 
crate::util::errors::APIError; use crate::util::test_utils; use crate::util::config::ChannelConfig; - use crate::chain::keysinterface::EntropySource; + use crate::sign::EntropySource; #[test] fn test_notify_limits() { @@ -8191,7 +8585,7 @@ mod tests { }; let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &random_seed_bytes + None, nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -8225,7 +8619,7 @@ mod tests { let payment_preimage = PaymentPreimage([42; 32]); let route = find_route( &nodes[0].node.get_our_node_id(), &route_params, &nodes[0].network_graph, - None, nodes[0].logger, &scorer, &random_seed_bytes + None, nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let payment_hash = nodes[0].node.send_spontaneous_payment(&route, Some(payment_preimage), RecipientOnionFields::spontaneous_empty(), PaymentId(payment_preimage.0)).unwrap(); @@ -8288,7 +8682,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &random_seed_bytes + nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -8332,7 +8726,7 @@ mod tests { let random_seed_bytes = chanmon_cfgs[1].keys_manager.get_secure_random_bytes(); let route = find_route( &payer_pubkey, &route_params, &network_graph, Some(&first_hops.iter().collect::>()), - nodes[0].logger, &scorer, &random_seed_bytes + nodes[0].logger, &scorer, &(), &random_seed_bytes ).unwrap(); let test_preimage = PaymentPreimage([42; 32]); @@ -8372,12 +8766,12 @@ mod tests { let (mut route, payment_hash, _, _) = get_route_and_payment_hash!(&nodes[0], nodes[3], 100000); let path = route.paths[0].clone(); route.paths.push(path); - route.paths[0][0].pubkey = nodes[1].node.get_our_node_id(); - route.paths[0][0].short_channel_id = chan_1_id; - route.paths[0][1].short_channel_id = chan_3_id; - route.paths[1][0].pubkey = nodes[2].node.get_our_node_id(); - route.paths[1][0].short_channel_id = chan_2_id; - route.paths[1][1].short_channel_id = chan_4_id; + route.paths[0].hops[0].pubkey = nodes[1].node.get_our_node_id(); + route.paths[0].hops[0].short_channel_id = chan_1_id; + route.paths[0].hops[1].short_channel_id = chan_3_id; + route.paths[1].hops[0].pubkey = nodes[2].node.get_our_node_id(); + route.paths[1].hops[0].short_channel_id = chan_2_id; + route.paths[1].hops[1].short_channel_id = chan_4_id; match nodes[0].node.send_payment_with_route(&route, payment_hash, RecipientOnionFields::spontaneous_empty(), PaymentId(payment_hash.0)) @@ -8876,7 +9270,7 @@ mod tests { pub mod bench { use crate::chain::Listen; use crate::chain::chainmonitor::{ChainMonitor, Persist}; - use crate::chain::keysinterface::{KeysManager, InMemorySigner}; + use crate::sign::{KeysManager, InMemorySigner}; use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use crate::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentId, RecipientOnionFields, Retry}; use crate::ln::functional_test_utils::*; @@ -8925,7 +9319,7 @@ pub mod bench { // calls per node. 
let network = bitcoin::Network::Testnet; - let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; + let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); let scorer = Mutex::new(test_utils::TestScorer::new()); @@ -8989,10 +9383,7 @@ pub mod bench { assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]); - let block = Block { - header: BlockHeader { version: 0x20000000, prev_blockhash: BestBlock::from_network(network).block_hash(), merkle_root: TxMerkleNode::all_zeros(), time: 42, bits: 42, nonce: 42 }, - txdata: vec![tx], - }; + let block = create_dummy_block(BestBlock::from_network(network).block_hash(), 42, vec![tx]); Listen::block_connected(&node_a, &block, 1); Listen::block_connected(&node_b, &block, 1); @@ -9033,7 +9424,7 @@ pub mod bench { macro_rules! send_payment { ($node_a: expr, $node_b: expr) => { let payment_params = PaymentParameters::from_node_id($node_b.get_our_node_id(), TEST_FINAL_CLTV) - .with_features($node_b.invoice_features()); + .with_bolt11_features($node_b.invoice_features()).unwrap(); let mut payment_preimage = PaymentPreimage([0; 32]); payment_preimage.0[0..8].copy_from_slice(&payment_count.to_le_bytes()); payment_count += 1;