X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=498813de7a11900e2dbdcffa9adf8de9fd037d6c;hb=20cd856aa5d2834a557663ea67ee0139ad522586;hp=3fa8fa5ba947080d52cb888f505f40b568adb2a1;hpb=b8ed4d2608e32128dd5a1dee92911638a4301138;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 3fa8fa5b..498813de 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -45,7 +45,7 @@ use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, No #[cfg(any(feature = "_test_utils", test))] use crate::ln::features::InvoiceFeatures; use crate::routing::gossip::NetworkGraph; -use crate::routing::router::{DefaultRouter, InFlightHtlcs, PaymentParameters, Route, RouteHop, RouteParameters, RoutePath, Router}; +use crate::routing::router::{BlindedTail, DefaultRouter, InFlightHtlcs, Path, PaymentParameters, Route, RouteHop, RouteParameters, Router}; use crate::routing::scoring::ProbabilisticScorer; use crate::ln::msgs; use crate::ln::onion_utils; @@ -72,7 +72,7 @@ use core::{cmp, mem}; use core::cell::RefCell; use crate::io::Read; use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState}; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; use core::time::Duration; use core::ops::Deref; @@ -282,7 +282,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId, pub(crate) enum HTLCSource { PreviousHopData(HTLCPreviousHopData), OutboundRoute { - path: Vec, + path: Path, session_priv: SecretKey, /// Technically we can recalculate this from the route, but we cache it here to avoid /// doing a double-pass on route when we get a failure back @@ -313,7 +313,7 @@ impl HTLCSource { #[cfg(test)] pub fn dummy() -> Self { HTLCSource::OutboundRoute { - path: Vec::new(), + path: Path { hops: Vec::new(), blinded_tail: None }, session_priv: SecretKey::from_slice(&[1; 32]).unwrap(), first_hop_htlc_msat: 0, payment_id: PaymentId([2; 32]), @@ -934,6 +934,8 @@ where /// See `ChannelManager` struct-level documentation for lock order requirements. pending_events: Mutex>, + /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously. + pending_events_processor: AtomicBool, /// See `ChannelManager` struct-level documentation for lock order requirements. pending_background_events: Mutex>, /// Used when we have to take a BIG lock to make sure everything is self-consistent. @@ -1696,30 +1698,47 @@ macro_rules! handle_new_monitor_update { macro_rules! process_events_body { ($self: expr, $event_to_handle: expr, $handle_event: expr) => { - // We'll acquire our total consistency lock until the returned future completes so that - // we can be sure no other persists happen while processing events. - let _read_guard = $self.total_consistency_lock.read().unwrap(); + let mut processed_all_events = false; + while !processed_all_events { + if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + return; + } - let mut result = NotifyOption::SkipPersist; + let mut result = NotifyOption::SkipPersist; - // TODO: This behavior should be documented. It's unintuitive that we query - // ChannelMonitors when clearing other events. - if $self.process_pending_monitor_events() { - result = NotifyOption::DoPersist; - } + { + // We'll acquire our total consistency lock so that we can be sure no other + // persists happen while processing monitor events. + let _read_guard = $self.total_consistency_lock.read().unwrap(); + + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if $self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + } - let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]); - if !pending_events.is_empty() { - result = NotifyOption::DoPersist; - } + let pending_events = $self.pending_events.lock().unwrap().clone(); + let num_events = pending_events.len(); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } - for event in pending_events { - $event_to_handle = event; - $handle_event; - } + for event in pending_events { + $event_to_handle = event; + $handle_event; + } - if result == NotifyOption::DoPersist { - $self.persistence_notifier.notify(); + { + let mut pending_events = $self.pending_events.lock().unwrap(); + pending_events.drain(..num_events); + processed_all_events = pending_events.is_empty(); + $self.pending_events_processor.store(false, Ordering::Release); + } + + if result == NotifyOption::DoPersist { + $self.persistence_notifier.notify(); + } } } } @@ -1787,6 +1806,7 @@ where per_peer_state: FairRwLock::new(HashMap::new()), pending_events: Mutex::new(Vec::new()), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), persistence_notifier: Notifier::new(), @@ -2639,16 +2659,16 @@ where } #[cfg(test)] - pub(crate) fn test_send_payment_along_path(&self, path: &Vec, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { + pub(crate) fn test_send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { let _lck = self.total_consistency_lock.read().unwrap(); self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv_bytes) } - fn send_payment_along_path(&self, path: &Vec, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { + fn send_payment_along_path(&self, path: &Path, payment_hash: &PaymentHash, recipient_onion: RecipientOnionFields, total_value: u64, cur_height: u32, payment_id: PaymentId, keysend_preimage: &Option, session_priv_bytes: [u8; 32]) -> Result<(), APIError> { // The top-level caller should hold the total_consistency_lock read lock. debug_assert!(self.total_consistency_lock.try_write().is_err()); - log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.first().unwrap().short_channel_id); + log_trace!(self.logger, "Attempting to send payment for path with next hop {}", path.hops.first().unwrap().short_channel_id); let prng_seed = self.entropy_source.get_secure_random_bytes(); let session_priv = SecretKey::from_slice(&session_priv_bytes[..]).expect("RNG is busted"); @@ -2661,7 +2681,7 @@ where let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash); let err: Result<(), _> = loop { - let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.first().unwrap().short_channel_id) { + let (counterparty_node_id, id) = match self.short_to_chan_info.read().unwrap().get(&path.hops.first().unwrap().short_channel_id) { None => return Err(APIError::ChannelUnavailable{err: "No channel available with first hop!".to_owned()}), Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()), }; @@ -2712,7 +2732,7 @@ where return Ok(()); }; - match handle_error!(self, err, path.first().unwrap().pubkey) { + match handle_error!(self, err, path.hops.first().unwrap().pubkey) { Ok(_) => unreachable!(), Err(e) => { Err(APIError::ChannelUnavailable { err: e.err }) @@ -2882,10 +2902,10 @@ where /// Send a payment that is probing the given route for liquidity. We calculate the /// [`PaymentHash`] of probes based on a static secret and a random [`PaymentId`], which allows /// us to easily discern them from real payments. - pub fn send_probe(&self, hops: Vec) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { + pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - self.pending_outbound_payments.send_probe(hops, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, + self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) } @@ -3006,10 +3026,11 @@ where } { let height = self.best_block.read().unwrap().height(); - // Transactions are evaluated as final by network mempools at the next block. However, the modules - // constituting our Lightning node might not have perfect sync about their blockchain views. Thus, if - // the wallet module is in advance on the LDK view, allow one more block of headroom. - if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 2 { + // Transactions are evaluated as final by network mempools if their locktime is strictly + // lower than the next block height. However, the modules constituting our Lightning + // node might not have perfect sync about their blockchain views. Thus, if the wallet + // module is ahead of LDK, only allow one more block of headroom. + if !funding_transaction.input.iter().all(|input| input.sequence == Sequence::MAX) && LockTime::from(funding_transaction.lock_time).is_block_height() && funding_transaction.lock_time.0 > height + 1 { return Err(APIError::APIMisuseError { err: "Funding transaction absolute timelock is non-final".to_owned() }); @@ -7004,28 +7025,30 @@ impl Readable for HTLCSource { 0 => { let mut session_priv: crate::util::ser::RequiredWrapper = crate::util::ser::RequiredWrapper(None); let mut first_hop_htlc_msat: u64 = 0; - let mut path: Option> = Some(Vec::new()); + let mut path_hops: Option> = Some(Vec::new()); let mut payment_id = None; let mut payment_params: Option = None; + let mut blinded_tail: Option = None; read_tlv_fields!(reader, { (0, session_priv, required), (1, payment_id, option), (2, first_hop_htlc_msat, required), - (4, path, vec_type), + (4, path_hops, vec_type), (5, payment_params, (option: ReadableArgs, 0)), + (6, blinded_tail, option), }); if payment_id.is_none() { // For backwards compat, if there was no payment_id written, use the session_priv bytes // instead. payment_id = Some(PaymentId(*session_priv.0.unwrap().as_ref())); } - if path.is_none() || path.as_ref().unwrap().is_empty() { + let path = Path { hops: path_hops.ok_or(DecodeError::InvalidValue)?, blinded_tail }; + if path.hops.len() == 0 { return Err(DecodeError::InvalidValue); } - let path = path.unwrap(); if let Some(params) = payment_params.as_mut() { if params.final_cltv_expiry_delta == 0 { - params.final_cltv_expiry_delta = path.last().unwrap().cltv_expiry_delta; + params.final_cltv_expiry_delta = path.final_cltv_expiry_delta().ok_or(DecodeError::InvalidValue)?; } } Ok(HTLCSource::OutboundRoute { @@ -7052,8 +7075,9 @@ impl Writeable for HTLCSource { (1, payment_id_opt, option), (2, first_hop_htlc_msat, required), // 3 was previously used to write a PaymentSecret for the payment. - (4, *path, vec_type), + (4, path.hops, vec_type), (5, None::, option), // payment_params in LDK versions prior to 0.0.115 + (6, path.blinded_tail, option), }); } HTLCSource::PreviousHopData(ref field) => { @@ -7720,12 +7744,12 @@ where if id_to_peer.get(&monitor.get_funding_txo().0.to_channel_id()).is_none() { for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs() { if let HTLCSource::OutboundRoute { payment_id, session_priv, path, .. } = htlc_source { - if path.is_empty() { + if path.hops.is_empty() { log_error!(args.logger, "Got an empty path for a pending payment"); return Err(DecodeError::InvalidValue); } - let path_amt = path.last().unwrap().fee_msat; + let path_amt = path.final_value_msat(); let mut session_priv_bytes = [0; 32]; session_priv_bytes[..].copy_from_slice(&session_priv[..]); match pending_outbounds.pending_outbound_payments.lock().unwrap().entry(payment_id) { @@ -7735,7 +7759,7 @@ where if newly_added { "Added" } else { "Had" }, path_amt, log_bytes!(session_priv_bytes), log_bytes!(htlc.payment_hash.0)); }, hash_map::Entry::Vacant(entry) => { - let path_fee = path.get_path_fees(); + let path_fee = path.fee_msat(); entry.insert(PendingOutboundPayment::Retryable { retry_strategy: None, attempts: PaymentAttempts::new(), @@ -8026,6 +8050,7 @@ where per_peer_state: FairRwLock::new(per_peer_state), pending_events: Mutex::new(pending_events_read), + pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), persistence_notifier: Notifier::new(), @@ -8057,8 +8082,6 @@ mod tests { use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; - #[cfg(feature = "std")] - use core::time::Duration; use core::sync::atomic::Ordering; use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason}; use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret}; @@ -8482,12 +8505,12 @@ mod tests { let (mut route, payment_hash, _, _) = get_route_and_payment_hash!(&nodes[0], nodes[3], 100000); let path = route.paths[0].clone(); route.paths.push(path); - route.paths[0][0].pubkey = nodes[1].node.get_our_node_id(); - route.paths[0][0].short_channel_id = chan_1_id; - route.paths[0][1].short_channel_id = chan_3_id; - route.paths[1][0].pubkey = nodes[2].node.get_our_node_id(); - route.paths[1][0].short_channel_id = chan_2_id; - route.paths[1][1].short_channel_id = chan_4_id; + route.paths[0].hops[0].pubkey = nodes[1].node.get_our_node_id(); + route.paths[0].hops[0].short_channel_id = chan_1_id; + route.paths[0].hops[1].short_channel_id = chan_3_id; + route.paths[1].hops[0].pubkey = nodes[2].node.get_our_node_id(); + route.paths[1].hops[0].short_channel_id = chan_2_id; + route.paths[1].hops[1].short_channel_id = chan_4_id; match nodes[0].node.send_payment_with_route(&route, payment_hash, RecipientOnionFields::spontaneous_empty(), PaymentId(payment_hash.0)) @@ -9035,7 +9058,7 @@ pub mod bench { // calls per node. let network = bitcoin::Network::Testnet; - let tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}; + let tx_broadcaster = test_utils::TestBroadcaster::new(network); let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let logger_a = test_utils::TestLogger::with_id("node a".to_owned()); let scorer = Mutex::new(test_utils::TestScorer::new());